Merge branch 'features/validators-event' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is built from the "to",
# "cc" and "bcc" fields (each defaulting to `[]` when absent).
#
# For "Create" activities the actor is added too and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # `List.wrap/1` turns a missing actor into `[]`, so no stray empty-list
  # element ends up among the recipients.
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Any other activity type: plain concatenation, duplicates are kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
52
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# For a binary AP id: true only when the cached lookup returns a `%User{}`
# whose `deactivated` flag is false; any other lookup result yields false.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
61
# Checks the embedded object's content against the configured
# `[:instance, :remote_limit]`; data without non-nil content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
68
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
72
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76
# For a "Create" whose object carries an "inReplyTo" key, increments the
# replies counter of the replied-to object, but only if the new object is
# public (otherwise the result is `nil`).
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
87
@object_types ~w[ChatMessage Question Answer Audio Event]

@doc """
Stores `object` and hands `meta` back alongside the stored record.

Data whose "type" is one of `#{inspect(@object_types)}` is stored through
`Object.create/1`. Anything else is inserted as an `Activity`, which requires
the `:local` key in `meta` (`Keyword.fetch!/2` raises when it is missing).

Returns `{:ok, record, meta}` on success. On failure the non-matching result
of the underlying create/insert call is returned as-is.
"""
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
109
# Inserts an activity built from `map`.
#
# Steps, in order: return an already-normalizable activity as-is, apply the
# activity defaults, check the actor (unless bypassed), enforce the remote
# content limit, run MRF, compute recipients, stop early for fake inserts,
# check containment and store the embedded object. Then the activity row is
# inserted (with an optional expiration) and a background job is enqueued.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    new_activity = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    {:ok, stored} =
      new_activity
      |> Repo.insert()
      |> maybe_create_activity_expiration()

    # Splice in the child object if we have one.
    stored = Maps.put_if_present(stored, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => stored.id})

    {:ok, stored}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build an in-memory activity with a placeholder id.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
157
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
166
# When a freshly inserted activity carries "expires_at" in its data, an
# expiration record is created; a failure there replaces the result.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

# Anything else (including insert errors) passes through unchanged.
defp maybe_create_activity_expiration(result), do: result
174
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`, or whichever intermediate
# result did not match.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    other ->
      other
  end
end
182
# Force-reloads and returns the participations of a conversation; any input
# other than `{:ok, conversation}` yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
190
# Streams the given participations (with users preloaded) on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation that `object`'s context
# belongs to, but only when a latest direct activity id can still be found for
# `user` in that context.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
216
# Streams "Create", "Announce" and "Delete" activities to their topics; every
# other input is a no-op.
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
227
# Runs `do_create/2` inside a transaction and unwraps the transaction result;
# a rolled-back transaction is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
234
# Builds and inserts a "Create" activity, then updates counters, notifies,
# streams and federates. Fake inserts and benchmark-mode ("quick") inserts
# stop early; `{:error, _}` results roll the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: published, context: context, object: object}
    |> make_create_data(additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} -> {:ok, activity}
    {:fake, true, activity} -> {:ok, activity}
    {:error, message} -> Repo.rollback(message)
  end
end
267
# Builds and inserts a listen activity, then notifies, streams and federates.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    %{to: to, actor: actor, published: published, context: context, object: object}
    |> make_listen_data(additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
287
# Runs `do_unfollow/4` inside a transaction and unwraps the transaction
# result; a rolled-back transaction is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
296
# Marks the latest follow activity as "cancelled" and inserts the matching
# unfollow activity. Returns `nil` when an intermediate step yields `nil`
# (e.g. no follow activity is found); `{:error, _}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
310
# Creates a report ("flag") activity.
#
# The activity is addressed to nobody in "to"; "cc" holds the reported
# account only when forwarding is enabled. The federated copy has its status
# data stripped. Every superuser with an email address gets a report mail.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  base_additional = params[:additional] || %{}
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(base_additional, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(&is_nil(&1.email))
    |> Enum.each(fn admin ->
      admin
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
350
# Inserts a "Move" activity from `origin` to `target` and enqueues the
# "move_following" background job. The target must list the origin's AP id in
# `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
376
# Builds the query for "Create" activities of `context`, newest first,
# visible to `opts[:user]` (or to the public when no user is given).
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  reader = opts[:user]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, reader)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
406
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
413
# Returns the id of the newest direct-visibility activity in `context`, or
# `nil` when there is none. Preloading is skipped unless `opts` sets
# `:skip_preload` itself.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, :skip_preload, true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
424
# Fetches a page of activities addressed to the public collection. The
# `:user` option is dropped before the query is built.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], opts)

  public_query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
434
# Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted`
# forced on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
441
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility matches the
# `:visibility` option (a single value or a list of values).
#
# An invalid value is logged and the query is returned unrestricted, the same
# way `exclude_visibility/2` handles it. Returning the logger result instead
# would break the rest of the query pipeline. A `nil` or absent option leaves
# the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` keeps the log call from raising on values that do not
    # implement `String.Chars`.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
478
# Drops activities whose computed visibility matches the
# `:exclude_visibilities` option (a single value or a list). Invalid values
# are logged and the query is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
521
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped in the config or for that user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
536
# Fetches activities authored by `user` as seen by `reading_user`, with no
# type restriction. The result of `fetch_activities/2` is reversed.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
551
# Fetches "Create"/"Announce" activities authored by `user` as seen by
# `reading_user`. Block and mute filtering for the reader is applied only
# when the reader does not block `user`.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
577
# Fetches "Create"/"Announce" activities visible to `reading_user`, using
# offset pagination. The result of `fetch_activities/3` is reversed.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
589
# Recipient list used to scope a user's timeline: empty in godmode, the
# public collection for anonymous readers, and public + the reader + the
# reader's followings otherwise.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reader}) when reader in [nil, false] do
  [Constants.as_public()]
end

defp user_activities_recipients(%{reading_user: reader}) do
  [Constants.as_public(), reader.ap_id | User.following(reader)]
end
599
# Filters out "Announce" activities whose announced object was authored by
# the `:announce_filtering_user`. Needs the object join, hence the raise when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
619
# Keeps only activities with an id greater than `:since_id`; an empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
627
# Drops activities whose object carries any tag from a non-empty
# `:tag_reject` list. Needs the object join.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
640
# Keeps only activities whose object carries every tag from a non-empty
# `:tag_all` list. Needs the object join.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
653
# Keeps only activities whose object carries the given tag (binary) or any
# of the given tags (list). Needs the object join.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
673
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are added via `or_where`. An empty
# recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
687
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
693
# Keeps only activities whose actor equals `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
699
# Keeps only activities of the given type: equality for a binary, `ANY` for
# every other `:type` value.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
709
# Keeps only activities whose data "state" equals `:state`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
715
# Keeps only activities whose object lists `:favorited_by` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
724
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment" value. Needs the object join.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
738
# Reply filtering:
#   * `exclude_replies: true`         - only objects without "inReplyTo";
#   * `reply_visibility: "self"`      - non-replies, or replies that have the
#                                       filtering user among the recipients;
#   * `reply_visibility: "following"` - non-replies, replies whose recipients
#                                       (minus the author) overlap with the
#                                       user and their cached friends, or the
#                                       user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
782
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
788
# Hides activities authored by, or addressed ("to") to, users muted by
# `:muting_user`. Unless preloading is skipped, activities with a thread-mute
# row are hidden too. `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
808
# Hides content related to users and domains blocked by `:blocking_user`:
# activities by or addressed to blocked users, activities with recipients on
# blocked domains, announces addressed to blocked users, and activities or
# objects authored on a blocked domain unless the author is followed.
# Joins the object when the query does not have an `:object` binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
855
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in their "cc" field.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
869
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
875
# Hides "Announce" activities made by users whose reblogs `:muting_user` has
# muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
892
# Keeps only activities whose actor is a known user with a nickname ending
# in "@<instance>". The matching AP ids are loaded first, then used in an
# `in` filter.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
906
# Hides activities whose object content matches the user's composed filter
# regex; the user's own activities are never hidden. Without a regex the
# query is returned unchanged. `:blocking_user` is used when `:user` is absent.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
926
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true`. Only applies when the object is joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
938
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true`. Only applies when the object is joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
950
# Drops activities authored by users flagged as invisible, unless
# `invisible_actors: true`. The invisible AP ids are loaded up front.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
961
# Drops the activity with the given binary `:exclude_id`.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
967
# Preloads the object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
974
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
defp maybe_preload_bookmarks(query, opts), do: Activity.with_preloaded_bookmark(query, opts[:user])
981
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
988
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
995
# Orders by id when `:order` is `:asc` or `:desc`; otherwise no ordering is
# added here.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1007
# Loads the muting user's outgoing relationship AP ids with one call and
# returns three option maps (blocked, muted, reblog-muted) with the ids
# filled in. Blocks are included only when the blocking user is the muting
# user. Values already present in `opts` are kept.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  same_blocking_user? = opts[:blocking_user] && opts[:blocking_user] == source_user

  relationships =
    if same_blocking_user? do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
1029
# Builds the activity query for `recipients`, applying every preload,
# ordering, restriction and exclusion that `opts` asks for.
#
# Relationship AP ids (blocks, mutes, reblog mutes) are fetched once up front
# and handed to the matching restrictors through dedicated option maps.
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second application only repeated the same WHERE
  # condition and the filter-regex lookup.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1073
# Fetches a page of activities for `recipients` plus the list memberships of
# `opts[:user]`. The page is reversed, and list-addressed activities get the
# user added to "cc".
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  reader = opts[:user]
  list_memberships = Pleroma.List.memberships(reader)
  all_recipients = recipients ++ list_memberships

  all_recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, reader)
end
1082
@doc """
Fetches the activities favourited by `user`, ordered by the id of the "Like"
activity (descending), i.e. by when they were added to the favourites.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # The explicit ordering below replaces the pagination default.
  pagination_params = Map.merge(params, %{skip_order: true})

  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  likes_with_targets
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1100
# Adds the user's AP id to the "cc" of every activity whose "bcc" contains one
# of `list_memberships`.
#
# Only runs when `list_memberships` is non-empty and a `%User{}` is given;
# otherwise (second clause) the activities are returned untouched.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `update_in/2` passes `nil` when "cc" is absent; `List.wrap/1` keeps the
        # result a proper list in that case (and when "cc" is a single value)
        # instead of building the improper list `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1116
# Restricts `query` to activities whose recipients either overlap `recipients`
# (SQL `&&` on the recipients array), or overlap `recipients_with_public`
# while also containing the public address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1125
@doc """
Fetches one page of activities using the bounded recipient restriction.

The base query is built with an empty recipient list and `opts`; the
recipient condition is applied by `fetch_activities_bounded_query/3` using
`recipients` and `recipients_with_public`. The paginated result is reversed
before it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1137
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# returned data. When `opts[:actor]` is present it is recorded under "actor".
# Any non-`{:ok, data}` result of the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1146
# Extracts a URL string from an actor "url" value.
#
# Accepts a plain string, a map with a binary "href", or a list of such values
# (only the first element is considered). Anything else yields `nil`.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1158
# Builds the atom-keyed attribute map for a remote user from a decoded
# ActivityPub actor document (`data`, string keys).
#
# Avatar, banner, profile fields, emoji, `locked` and `accepts_chat_messages`
# are read from the document as given; the document is then passed through
# `Transmogrifier.maybe_fix_user_object/1` and the remaining attributes are
# read from its result.
#
# Malformed optional parts are skipped rather than raising:
#   * "icon"/"image" may be a map, a list (first element is used) or absent;
#   * "attachment"/"tag" may be a list, a single object or absent;
#   * attachments that are not "PropertyValue" maps are ignored;
#   * Emoji tags without an icon URL or a binary name are ignored.
#
# `:nickname` is `nil` when the actor has no "preferredUsername".
defp object_to_user_data(data) do
  avatar = object_to_user_image(data["icon"])
  banner = object_to_user_image(data["image"])

  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Shortcodes are stored without the surrounding colons; on duplicates the
  # later tag wins.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end

# Turns an actor "icon"/"image" value into the stored image map, or `nil` when
# no usable URL is present. Lists are resolved through their first element.
defp object_to_user_image(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp object_to_user_image([first | _]), do: object_to_user_image(first)
defp object_to_user_image(_), do: nil
1247
@doc """
Fetches the remote following and followers collections of `user` and derives
follow statistics from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`. The counters come from each
collection's "totalItems" (0 when that is not an integer). Any failing step
is returned as an `{:error, reason}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    {:ok, follow_info}
  else
    {:error, _reason} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1267
# Returns `counter` when it is an integer; any other value counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1270
@doc """
Merges remote follow statistics into `user_data` under `:info` when possible.

Statistics are only fetched when the `[:instance, :external_user_synchronization]`
setting is `true`, `user_data[:type]` is "Person" or "Service", and both
collection addresses are present. When one of these checks is `false`,
`user_data` is returned unchanged. Any other failure is logged as an error and
`user_data` is returned unchanged as well.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): object_to_user_data/1 in this module stores the actor type under
  # :actor_type, not :type — confirm this check can pass for data built there.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to exactly `false`: nothing to do, stay silent.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1301
# Decides whether a follow collection counts as private (hidden).
#
#   * an embedded "first" page of a collection-page type  -> {:ok, false}
#   * a "first" reference that can be fetched as a page    -> {:ok, false}
#   * fetching "first" is answered with HTTP 401/403       -> {:ok, true}
#   * no "first" at all                                    -> {:ok, true}
#
# Every other fetch outcome is returned as an `{:error, reason}` tuple.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1318
@doc """
Runs an actor document through `MRF.filter/1` and converts the accepted
result into user attributes.

Returns `{:ok, user_data}`; any other filter result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1326
@doc """
Fetches the actor document at `ap_id` and turns it into user attributes.

On success the attributes are passed through
`maybe_update_follow_information/1` and returned as `{:ok, user_data}`.
On failure the `{:error, reason}` tuple is returned after logging: deleted
objects at debug level, rejections at info level, everything else as an error.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, failure} ->
      # The log level depends on the kind of failure; the tuple handed back to
      # the caller is the same in every case.
      case failure do
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

        {:reject, reason} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      end

      {:error, failure}
  end
end
1345
@doc """
Frees up `data[:nickname]` when it is already held by a different user.

If a user with that nickname exists and has another AP id, that user is
renamed to `"<id>.<nickname>"`. If the AP ids are equal, only a log line is
written. When no user holds the nickname, `nil` is returned.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  new_ap_id = data[:ap_id]

  case User.get_by_nickname(nickname) do
    %User{ap_id: old_ap_id} when old_ap_id == new_ap_id ->
      Logger.info(
        "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    %User{} = old_user ->
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      old_user
      |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
      |> User.update_and_set_cache()

    _ ->
      nil
  end
end
1370
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
a cached user is updated with the fetched data, while an unknown user is
inserted (after resolving a possible nickname clash) and cached. A failed
fetch is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(user_data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1393
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no
AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1401
# Broken-thread filter: delegates to `entire_thread_visible_for_user?/2` for
# the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1406
# Post-processing hook for a single activity; currently this only applies the
# broken-thread check for `user`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1411
@doc """
Builds the query for direct messages: `Activity` restricted to type "Create"
and visibility "direct", ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1418 end