ActivityPub: Remove ActivityPub.accept
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is the concatenation of the
# "to", "cc" and "bcc" lists (each defaulting to `[]` when absent). For "Create"
# activities the actor is added as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 maps a missing actor to [] so that no bogus `[]` entry is
  # appended to the recipients (the previous default of `[]` was wrapped in
  # another list and leaked into the result).
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
52
# An activity without an actor passes the check. Otherwise the actor must
# resolve (via User.get_cached_by_ap_id/1) to a user that is not deactivated.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(ap_id) when is_binary(ap_id) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(ap_id) do
    not deactivated
  else
    _ -> false
  end
end
61
# Returns true when the object's content does not exceed the configured
# `[:instance, :remote_limit]`; data without object content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
68
@doc """
Increases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
72
@doc """
Decreases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76
# For a "Create" whose object has an "inReplyTo", bumps the replies count of the
# replied-to object, but only when the new object is public. Any other data is
# a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
87
# Types that are stored as plain objects rather than as activities.
@object_types ["ChatMessage", "Question", "Answer"]

@doc """
Persists `object` and hands `meta` back to the caller.

Data whose "type" is one of the object types is stored through
`Object.create/1`; everything else is inserted as an `Activity`, using the
`:local` entry of `meta` (which must be present) and the recipients computed
from the data.

Returns `{:ok, stored, meta}` on success; a non-matching result of the
underlying create/insert call is returned as-is.
"""
# The spec previously declared a 2-tuple, but both clauses return the meta as
# a third element and may pass an error through.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
109
@doc """
Inserts activity data as a new `Activity`.

If `Activity.normalize/1` already finds an activity for `map`, that activity is
returned. Otherwise the data is given its defaults, the actor check (unless
`bypass_actor_check` is set), the remote length limit, the MRF filter and the
containment check are applied, the embedded object is inserted, and the
activity itself is stored. With `fake` set, nothing is stored and an activity
with the id `"pleroma:fakeid"` is returned instead.

Any failing step is returned wrapped as `{:error, reason}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _, _} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    inserted =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    {:ok, activity} = maybe_create_activity_expiration(inserted)

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct without touching the database.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
157
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
166
# When a successfully inserted activity carries an "expires_at", registers an
# ActivityExpiration for it. Every other input is passed through untouched.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
174
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`, or whichever intermediate result
# did not match.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    failure ->
      failure
  end
end
182
# Returns the freshly (force-)preloaded participations of a conversation, or an
# empty list when no `{:ok, conversation}` was given.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
190
@doc """
Preloads the users of `participations` and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
198
@doc """
Streams the participations of the conversation that `object`'s context belongs
to, provided `user` can still see a direct activity in that context.

Returns `:noop` for arguments that carry no context.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

def stream_out_participations(_, _), do: :noop
216
@doc """
Streams "Create", "Announce" and "Delete" activities to their topics; any
other input is a no-op.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
227
@doc """
Creates a "Create" activity from `params` inside a database transaction.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
234
# Transaction body of create/2: builds the "Create" data, inserts it and, for
# real (non-fake, non-benchmark) inserts, updates counters, notifies, streams
# and federates. An `{:error, _}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # Only an explicit `false` turns `local` off.
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} -> {:ok, inserted}
    {:fake, true, faked} -> {:ok, faked}
    {:error, message} -> Repo.rollback(message)
  end
end
267
@doc """
Builds listen data from `params`, inserts it as an activity, then notifies,
streams and federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # Only an explicit `false` turns `local` off.
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
287
@doc """
Inserts and federates a "Reject" activity built from `params`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
292
# Builds an activity of the given `type` ("Reject" is the only caller visible
# here) from `to`, `actor` and `object`, optionally with the given
# `:activity_id`, then inserts, notifies, streams and federates it.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)

  data =
    Maps.put_if_present(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      Map.get(params, :activity_id)
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
308
@doc """
Undoes `follower`'s follow of `followed` inside a database transaction.

Returns `nil` when no follow activity exists.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
317
# Transaction body of unfollow/4: marks the latest follow activity as
# "cancelled", inserts the unfollow activity, then notifies, streams and
# federates it. Rolls back on `{:error, _}`; returns nil without a follow.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow <- fetch_latest_follow(follower, followed),
       {:ok, follow} <- update_follow_state(follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
331
@doc """
Creates a flag (report) activity.

The activity is addressed to the reported account only when `:forward` is not
explicitly `false`. After inserting and federating (with report status data
stripped), every superuser with an email address is mailed the report.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # Only an explicit `false` counts as false for these two flags.
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    mailable_superusers = Enum.reject(User.all_superusers(), &is_nil(&1.email))

    Enum.each(mailable_superusers, fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
371
@doc """
Inserts a "Move" activity from `origin` to `target` and enqueues the
"move_following" background job.

Fails unless `origin`'s AP id is listed in `target.also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
397
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Visibility is limited to public recipients plus, when `opts[:user]` is given,
that user and the users they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    if opts[:user] do
      [opts[:user].ap_id | User.following(opts[:user])] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  in_context =
    where(
      base_query,
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  in_context
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
427
@doc """
Runs the context query and returns the matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
434
@doc """
Returns the id of the newest direct-visibility activity in `context`, or `nil`.

Preloading is skipped by default; `opts` may override that.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  direct_only
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
445
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` option is dropped before querying.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
455
@doc """
Like `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted` forced
to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only, pagination)
end
462
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility
# (`activity_visibility` SQL function) equals the given visibility, or is one of
# the given list of visibilities.
#
# An unknown visibility is logged and the query is returned unchanged, the same
# way exclude_visibility/2 handles it. Previously the result of Logger.error/1
# was returned in place of the query, which broke every later pipeline step. A
# `nil` visibility is treated as "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
499
# Drops activities whose computed visibility (`activity_visibility` SQL
# function) equals the given visibility or is in the given list. Unknown
# visibilities are logged and leave the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  else
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
542
# Applies the `thread_visibility` SQL check for the reading user, unless thread
# containment is skipped by the config or by the user, or there is no user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
557
@doc """
Fetches the activities authored by `user` as visible to `reading_user`,
without restricting the activity type.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
572
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as visible
to `reading_user`.

Block and mute filtering on behalf of `reading_user` is applied unless
`reading_user` blocks `user`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
598
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using
offset pagination.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
610
# Recipient list used when listing a user's activities: empty in godmode (an
# empty list means "no recipient restriction" in restrict_recipients/3),
# otherwise the public collection plus, for a logged-in reader, the reader and
# the users they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user do
    [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
  else
    [Constants.as_public()]
  end
end
620
# Drops "Announce" activities whose announced object was authored by the given
# user. Requires the object to be joined, hence the raise with skip_preload.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
640
# Keeps only activities with an id greater than `since_id`; an empty or missing
# `since_id` leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
648
# Drops activities whose object carries any of the rejected tags. Requires the
# object to be joined, hence the raise with skip_preload.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
661
# Keeps only activities whose object carries all of the given tags. Requires
# the object to be joined, hence the raise with skip_preload.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
674
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Requires the object to be joined, hence the raise
# with skip_preload.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
694
# Keeps activities whose recipients overlap with `recipients`. With a user, the
# user's own activities are OR-ed in as well. An empty recipient list means no
# restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
708
# With `local_only: true`, keeps only local activities.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
714
# With an `:actor_id`, keeps only activities by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
720
# Keeps only activities of the given type (binary) or of any of the given
# types (anything else, bound as an array for `ANY`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
730
# With a `:state`, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
736
# With `:favorited_by`, keeps only activities whose object lists that AP id in
# its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
745
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Requires the object to be joined, hence the raise
# with skip_preload.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
759
# Reply filtering:
#   * `exclude_replies: true`         - keep only objects without "inReplyTo"
#   * `reply_visibility: "self"`      - additionally keep replies addressed to
#                                       the filtering user
#   * `reply_visibility: "following"` - additionally keep replies addressed to
#                                       the user or their friends, and the
#                                       user's own activities
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  visible_recipients = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^visible_recipients,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
803
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
809
# Hides activities by, or addressed ("to") to, users muted by `muting_user`,
# unless `with_muted: true` is given. When preloading is not skipped, muted
# threads (rows with a thread_mute binding) are hidden too.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
829
# Hides, on behalf of `blocking_user`, activities that are by or addressed to
# blocked users, addressed to blocked domains, announces addressed to blocked
# users, and activities/objects authored from blocked domains (unless the
# author is followed). Joins the object when it is not already bound.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity, object: o], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where(
    [activity, object: o],
    fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids)
  )
  |> where(
    [activity, object: o],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
876
# With `restrict_unlisted: true`, drops activities that have the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
890
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
896
# Hides "Announce" activities by users whose reblogs `muting_user` has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
913
# With an `:instance`, keeps only activities by users whose nickname ends in
# "@<instance>". The matching AP ids are looked up first and then bound into
# the activity query.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _opts), do: query
927
# Hides activities whose object content matches the user's composed filter
# regex, except for the user's own activities. Falls back to `:blocking_user`
# when no `:user` is given; does nothing without either.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _opts), do: query
947
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true` is given or the object is not bound in the query.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
959
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true` is given or the object is not bound in the
# query.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
971
# Drops activities by invisible users, unless `invisible_actors: true` is
# given. The invisible AP ids are fetched first and bound into the query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
982
# With a binary `:exclude_id`, drops the activity with that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
988
# Preloads the activity's object unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
995
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
defp maybe_preload_bookmarks(query, opts), do: Activity.with_preloaded_bookmark(query, opts[:user])
1002
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1009
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1016
# Orders by id when `:order` is `:desc` or `:asc`; otherwise leaves the query
# unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1028
# Loads the muting user's outgoing relationship AP ids in a single call and
# returns three option maps (for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2) with those ids merged in. Block ids are only
# requested when the blocking user is the muting user. Keys already present in
# `opts` take precedence over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []
  same_blocking_user? = opts[:blocking_user] && opts[:blocking_user] == source_user

  relationships =
    if same_blocking_user? do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  }
end
1050
@doc """
Builds the activity query for the given `recipients`, applying every
restriction and exclusion selected through `opts`.

Each step in the pipeline is a no-op unless its option is present, so the
returned query only contains the filters that were asked for.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # restrict_filtered/2 is applied exactly once; it used to appear twice in
  # this pipeline, which added the same content-filter predicate to the query
  # a second time.
  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1094
@doc """
Fetches a page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to `recipients` before the
query is built. The paginated result is reversed and then passed through
`maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  query = fetch_activities_query(recipients ++ list_memberships, opts)

  query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1103
@doc """
Fetches the activities favourited by `user`, newest favourite first.

The query starts from the user's `Like` activities and selects the joined
activity with its `object` set and `pagination_id` set to the id of the
`Like`, so pagination follows the order in which favourites were added.
`params` are forwarded to the paginator with `skip_order: true`, because the
ordering is already applied here.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.merge(params, %{skip_order: true})

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1121
# For every activity whose "bcc" contains one of the given list memberships,
# prepends the user's ap_id to the activity's "cc". Activities without a
# non-empty "bcc" list are returned unchanged.
#
# "cc" may be absent (nil) or a single string rather than a list, so it is
# normalised with List.wrap/1 before prepending; prepending to nil would
# otherwise produce an improper list.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

# No list memberships, or no user: nothing to rewrite.
defp maybe_update_cc(activities, _, _), do: activities
1137
# Narrows `query` to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` and also include the
# public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1146
@doc """
Fetches a page of activities bounded by two recipient lists.

The base query is built with an empty recipient list and then narrowed by
`fetch_activities_bounded_query/3`. The paginated result is reversed before
it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1158
# Stores `file` through `Upload.store/2` and inserts an Object holding the
# resulting data, with "actor" added when `opts[:actor]` is present.
# Any non-`{:ok, _}` result from the store is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object = %Object{data: Maps.put_if_present(data, "actor", opts[:actor])}
      Repo.insert(object)

    failure ->
      failure
  end
end
1167
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map carrying a string "href", or a list of such values (only the
# first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [] -> nil
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1179
# Converts a remote actor object (string-keyed map) into the atom-keyed map
# of user attributes built at the end of this function.
#
# Remote data is untrusted, so malformed pieces are skipped rather than
# raising: "attachment"/"tag" may be missing, nil, a single object or contain
# entries of unexpected shape, and "icon"/"image" may not be maps.
defp object_to_user_data(data) do
  avatar = build_actor_image(data["icon"])
  banner = build_actor_image(data["image"])

  # Only PropertyValue attachments become profile fields; entries without a
  # matching "type" are ignored.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Emoji tags map shortcode (surrounding colons trimmed) to image URL.
  # Tags lacking an icon URL or a string name are ignored.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  # Everything below reads from the object as fixed up by the Transmogrifier.
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end

# Wraps the "url" of an icon/image object into an Image map.
# Returns nil when the value is not a map or carries no usable "url".
defp build_actor_image(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp build_actor_image(_), do: nil
1268
@doc """
Fetches the remote following and followers collections of `user`.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`; the counts come from each
collection's "totalItems" and default to 0 when that is not an integer.
Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  fetch_collection = &Fetcher.fetch_and_contain_remote_object_from_id/1

  with {:ok, following} <- fetch_collection.(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <- fetch_collection.(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1288
# Returns `counter` when it is an integer, otherwise 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1291
@doc """
Merges remote follower/following information into `user_data` under `:info`.

This only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is "Person" or "Service", and both collection
addresses are present. In every other case, including a failed fetch (which
is logged), `user_data` is returned unchanged.
"""
# NOTE(review): this reads `user_data[:type]`, while `object_to_user_data/1`
# in this module emits `:actor_type` - confirm which key callers provide,
# and whether `:info` is still consumed downstream.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to false: silently keep the data as is.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1322
# Decides whether a remote collection is private.
#
# An embedded first page of a known page type means the collection is public.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

# Otherwise "first" is fetched: a page of a known type means public, a
# 401/403 response means private, and any other outcome is reported as an
# error tuple.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

# A collection without a "first" entry is treated as private.
defp collection_private(_data), do: {:ok, true}
1339
@doc """
Runs the actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`; any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1347
@doc """
Fetches the remote actor at `ap_id` and converts it into user data.

Returns `{:ok, user_data}` on success, with follow information merged in by
`maybe_update_follow_information/1`. Every failure is logged and returned as
`{:error, reason}`: deleted objects at debug level, MRF rejections at info
level and everything else at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    {:error, "Object has been deleted" = e} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    {:error, {:reject, reason} = e} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, e}

    {:error, e} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    # The fetcher may return something other than an {:error, _} tuple;
    # normalise it instead of raising WithClauseError.
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
1366
@doc """
Renames an existing user whose nickname clashes with the one in `data`.

When a different user (another ap_id) already holds `data[:nickname]`, that
old user is renamed to `"<id>.<nickname>"` and the result of the update is
returned. When the ap_ids are equal only a log line is written. Returns
`nil` when there is no clash or when `data` carries no nickname.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]

  # The nickname may be nil (virtual actors); skip the lookup in that case.
  with true <- is_binary(nickname),
       %User{} = old_user <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, " <>
        "new one is #{data[:ap_id]}, renaming."
    )

    old_user
    |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1391
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the remote actor is
fetched: an existing user is updated with the fetched data, and a new user
is inserted after resolving a possible nickname clash. A failed fetch is
returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          case user do
            missing when missing in [nil, false] ->
              maybe_handle_clashing_nickname(data)

              data
              |> User.remote_user_changeset()
              |> Repo.insert()
              |> User.set_cache()

            existing ->
              existing
              |> User.remote_user_changeset(data)
              |> User.update_and_set_cache()
          end

        failure ->
          failure
      end
  end
end
1414
@doc """
Resolves `nickname` through WebFinger and builds the user from its ap_id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1422
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1427
# Post-processing for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1432
@doc """
Builds a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1439 end