Apply suggestion to lib/pleroma/web/controller_helper.ex
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Maps
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
18 alias Pleroma.Repo
19 alias Pleroma.Upload
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Computes `{recipients, to, cc}` for an activity's data map.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  # Addresses with no cached user are kept; cached users are kept only when
  # they follow the announcing actor.
  keep? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(keep?)

  {recipients, to, cc}
end

# Create activities additionally address the actor, and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Every other activity type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()

  {recipients, to, cc}
end
69
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# An actor given by AP id passes only when a cached user exists and is not deactivated.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
78
# Returns true when the embedded object's content length is within the
# configured `[:instance, :remote_limit]`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

# Activities without object content always pass.
defp check_remote_limit(_), do: true
85
# Bumps the actor's note count when `object` is public; otherwise returns `{:ok, actor}`.
defp increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
89
@doc """
Decrements the actor's note count when `object` is public; otherwise returns `{:ok, actor}`.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
93
# For a Create whose object has an "inReplyTo", bumps the replies count of the
# replied-to object, but only when the new object is public (otherwise nil).
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
104
# A Create whose object carries both "inReplyTo" and "name" is counted as a
# vote for the option `name` on the replied-to object.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
114
@doc """
Inserts `object` as an `Activity` row without any further processing.

`meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise).
Recipients are derived from the activity data via `get_recipients/1`.

Returns `{:ok, activity, meta}` on success; a failing `Repo.insert/1` result is
returned unchanged.
"""
@spec persist(map(), keyword()) ::
        {:ok, Activity.t(), keyword()} | {:error, Ecto.Changeset.t()}
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
129
@doc """
Validates, filters and stores an activity map.

Steps, in order: return the already-normalized activity if one exists, apply
activity defaults, check the actor is active (unless `bypass_actor_check`),
enforce the remote content limit, run MRF, compute recipients, stop early for
fake activities, run the containment check, and insert the embedded object.
Any other failing step is returned wrapped as `{:error, failing_value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    new_activity = %Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    }

    {:ok, inserted} = Repo.insert(new_activity)

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity was already known.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake activities are built in memory only and never hit the database.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
175
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
184
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Any non-matching intermediate result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
192
# Force-reloads and returns the participations of a successfully created/bumped
# conversation; anything else yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
200
@doc """
Preloads each participation's user and pushes the list to the
"participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
208
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      # Only stream when `user` can still see at least one activity in the context.
      if fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
226
@doc """
Streams Create, Announce and Delete activities to their topics; every other
activity is a no-op.
"""
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
237
@doc """
Runs `do_create/2` inside a database transaction and unwraps its result.
A failed transaction result is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
244
# Builds and inserts a Create activity, then updates counters, notifies,
# streams and federates. Fake activities and benchmark-mode inserts stop early.
# `{:error, _}` results roll back the surrounding transaction.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: published, context: context, object: object}
    |> make_create_data(extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {tag, true, activity} when tag in [:quick_insert, :fake] ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
278
@doc """
Builds a Listen activity from `params`, inserts it, then notifies, streams and
federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    %{to: to, actor: actor, published: published, context: context, object: object}
    |> make_listen_data(extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
@doc "Creates an Accept activity from `params`."
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
303
@doc "Creates a Reject activity from `params`."
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308
# Shared implementation of accept/1 and reject/1: builds the activity data
# (with an optional explicit id), inserts it, then notifies, streams and federates.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id)

  base = %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
  data = Maps.put_if_present(base, "id", activity_id)

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
@doc """
Builds an Update activity from `params` (with an optional explicit
`:activity_id`), inserts it, then notifies, streams and federates it.
"""
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit `false` makes the activity non-local
  local = params[:local] != false

  base = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  data = Maps.put_if_present(base, "id", params[:activity_id])

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
346
@doc """
Runs `do_follow/4` inside a database transaction and unwraps its result.
A failed transaction result is returned unchanged.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
355
# Inserts the Follow activity, then notifies, streams and federates it.
# `{:error, _}` results roll back the surrounding transaction.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
367
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps its result.
A failed transaction result is returned unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
376
# Marks the latest Follow activity as "cancelled" and inserts the matching
# unfollow activity. Returns nil when there is no Follow activity to undo;
# `{:error, _}` results roll back the surrounding transaction.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
390
@doc """
Runs `do_block/4` inside a database transaction and unwraps its result.
A failed transaction result is returned unchanged.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
399
# Inserts a Block activity, then notifies, streams and federates it.
# When `[:activitypub, :unfollow_blocked]` is enabled and the blocker follows
# the blocked user, the follow is undone first.
# `{:error, _}` results roll back the surrounding transaction.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` instead of `and`: the setting may be unset (nil), and `and` raises
  # BadBooleanError for a non-boolean left operand.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
417
@doc """
Creates a Flag (report) activity.

The report is addressed to the reported account ("cc") only when `:forward` is
not explicitly `false`. The federated copy has its status data stripped via
`strip_report_status_data/1`. After success, every superuser with an email
address is sent a report email asynchronously.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(email)
    end)

    {:ok, activity}
  end
end
457
@doc """
Creates a Move activity from `origin` to `target`.

The target must list the origin's AP id in `also_known_as`; otherwise an error
tuple is returned. On success the activity is federated and a "move_following"
background job is enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
483
@doc """
Builds the query for Create activities in `context`, newest first.

Recipients are restricted to the public collection plus, when `opts[:user]` is
set, that user's own AP id and followed addresses.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> where(
    [a],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      a.data,
      "Create",
      a.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
512
@doc """
Returns all Create activities in `context` visible under `opts`, newest first.

`opts` may be a map or a keyword list; it is normalised to a map because the
query helpers pattern-match on map keys and would otherwise silently skip
their filters.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.new(opts))
  |> Repo.all()
end
519
@doc """
Returns the id of the newest Create activity in `context` visible under
`opts`, or `nil` when there is none.

Preloading is skipped by default (`skip_preload: true`) unless `opts`
overrides it. `opts` may be a map or a keyword list.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  # Map.new/1 accepts both maps and keyword lists; Map.merge/2 alone raises
  # BadMapError on the keyword lists the spec allows.
  merged_opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(merged_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
529
@doc """
Fetches a page of activities addressed to the public collection.

Any `:user` in `opts` is dropped before querying. Unlisted activities are
filtered out only when `opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
539
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted: true`
forced into `opts`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
546
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility matches
# `opts[:visibility]`, which may be a single visibility or a list of them.
#
# Invalid values are logged and the query is returned unchanged, mirroring
# exclude_visibility/2. Returning the Logger result here would feed `:ok` into
# the rest of the query pipeline. A nil visibility means "no filter".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
583
# Drops activities whose computed visibility matches `opts[:exclude_visibilities]`
# (a single visibility or a list). Invalid values are logged and ignored.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  case Enum.reject(visibility, &(&1 in @valid_visibilities)) do
    [] ->
      where(
        query,
        [a],
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )

    _invalid ->
      Logger.error("Could not exclude visibility to #{visibility}")
      query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
626
# Applies the `thread_visibility` SQL check for the reading user, unless thread
# containment is skipped by the config map or by the user's own setting.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(
    query,
    [activity],
    fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, activity.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
641
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without any
activity-type restriction. The result of `fetch_activities/2` is reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
656
@doc """
Fetches Create and Announce activities authored by `user` as seen by
`reading_user`.

Block and mute filtering for `reading_user` is applied unless the reader
blocks `user`. The result of `fetch_activities/2` is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
682
@doc """
Fetches Create and Announce activities visible to `reading_user`, using offset
pagination. The result of `fetch_activities/3` is reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
694
# Godmode yields an empty recipient list. Otherwise the list is the public
# collection plus, for a logged-in reader, their own AP id and followed addresses.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user,
    do: [public, reading_user.ap_id | User.following(reading_user)],
    else: [public]
end
704
# Keeps only activities with an id greater than `since_id`; an empty string
# or a missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
712
# Excludes activities whose object carries any of the rejected tags.
# Needs the joined object, so it cannot be combined with `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
725
# Keeps only activities whose object carries all of the given tags.
# Needs the joined object, so it cannot be combined with `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
738
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it cannot be combined
# with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
758
# Keeps activities whose recipients overlap `recipients`. With a user, the
# user's own activities are included as well. An empty list applies no filter.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
772
# With `local_only: true`, keeps only local activities.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
778
# With an `:actor_id`, keeps only activities by that actor.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
784
# Filters by activity type: a binary matches exactly, any other value is
# passed to SQL `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
794
# With a `:state`, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
800
# Keeps only activities whose object's "likes" contain the given AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
809
# With `only_media: true`, keeps only activities whose object's "attachment"
# differs from the empty list. Needs the joined object, so it cannot be
# combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_media(query, %{only_media: true}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
822
# Reply filtering:
#   * `exclude_replies: true`        - drop every reply;
#   * `reply_visibility: "self"`      - keep non-replies and replies addressed to the user;
#   * `reply_visibility: "following"` - keep non-replies, replies addressed to the user or
#     their cached friends (the actor itself removed from recipients), and the user's own
#     activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  visible_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^visible_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
866
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _), do: query
872
# Hides activities by, or addressed ("to") to, users muted by `muting_user`.
# When preloading is enabled, rows with a matching thread mute are hidden too.
# `with_muted: true` disables all of this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Callers may pass a preloaded mute list to avoid another lookup.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
892
# Hides content involving users or domains blocked by `blocking_user`:
#   * activities by blocked actors or addressed to blocked users;
#   * activities whose recipients contain blocked domains;
#   * Announces addressed ("to") to blocked users;
#   * activities and objects from blocked domains, unless the actor is followed.
# The object join is added when the query does not already have it.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # Callers may pass a preloaded block list to avoid another lookup.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
939
# With `restrict_unlisted: true`, drops activities whose "cc" contains the
# public collection.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public
    )
  )
end

defp restrict_unlisted(query, _), do: query
953
# With `pinned: true`, keeps only activities whose id is in `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _), do: query
959
# Hides Announce activities made by users whose reblogs `muting_user` has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Callers may pass a preloaded list to avoid another lookup.
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
976
# Keeps only activities by users whose nickname ends in "@<instance>".
# The matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
990
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true`. Applied only when the object join is present.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1002
# Drops activities by users flagged invisible, unless `invisible_actors: true`.
# The invisible users' AP ids are loaded with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1013
# With a binary `:exclude_id`, drops the activity with that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
1019
# Preloads the activity's object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1026
# Preloads the bookmark for `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
defp maybe_preload_bookmarks(query, opts), do: Activity.with_preloaded_bookmark(query, opts[:user])
1033
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1040
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1047
# Orders by id when `opts[:order]` is `:desc` or `:asc`; otherwise no ordering is added.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1059
# Loads the muting user's mute / reblog-mute (and, when the blocking user is
# the same user, block) relationship AP ids in one call, and returns three opts
# maps for restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2.
# Values already present in `opts` take precedence over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts)
  muted_opts = Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts)
  muted_reblogs_opts = Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)

  {blocked_opts, muted_opts, muted_reblogs_opts}
end
1081
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion selected by `opts`, in a fixed order.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1121
# Runs the activity query for `recipients` plus the list memberships of
# `opts[:user]`, paginates it, reverses the fetched page and finally lets
# `maybe_update_cc/3` adjust "cc" on activities delivered through those lists.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ memberships

  all_recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(memberships, user)
end
1130
@doc """
Fetches the activities favourited by `user`.

The query starts from the user's "Like" activities, joins the liked object and
that object's activity, and orders by the id of the Like in descending order
(nulls last). Each returned activity has its `object` set to the joined object
and `pagination_id` set to the id of the Like, so pagination follows the
favouriting order rather than the activity's own id.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already fixed above, so pagination is told not to add its own.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1148
# Adds the viewing user's AP id to "cc" on every activity whose "bcc" contains
# one of the user's list memberships.
#
# Only active when `list_memberships` is a non-empty list and a `%User{}` is
# given; otherwise the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single string; List.wrap/1 normalises
        # both so the result is always a proper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1164
# Narrows `query` to activities whose recipients array either overlaps
# `recipients`, or overlaps `recipients_with_public` while also containing the
# public address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1173
# Like `fetch_activities/3`, but the recipient filter is the "bounded" one from
# `fetch_activities_bounded_query/3`: the base query is built with an empty
# recipient list and then narrowed by `recipients` / `recipients_with_public`.
# The fetched page is reversed before being returned.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1185
# Stores `file` through `Upload.store/2` and, on success, inserts an `Object`
# whose data is the stored upload data, with "actor" added when `opts[:actor]`
# is present. Any non-`{:ok, _}` result from the store is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    not_stored ->
      not_stored
  end
end
1194
# Extracts a URL string from the actor's "url" property, which may be a plain
# string, a map with a string "href", or a list of either (only the first
# element is considered). Anything else, including an empty list, gives nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1206
# Converts a remote ActivityPub actor object (a string-keyed map) into the
# attribute map used to build or update a remote user.
#
# The input comes from other servers, so malformed optional parts are skipped
# rather than allowed to crash the conversion:
#   * "icon"/"image" that are not maps yield a nil avatar/banner,
#   * "attachment"/"tag" may be missing, null, a single object or a list,
#   * attachments that are not "PropertyValue" maps are ignored,
#   * Emoji tags without an icon URL or a string name are ignored.
#
# Returns a map with atom keys; `:nickname` is nil when the actor has no
# "preferredUsername" (virtual actors).
defp object_to_user_data(data) do
  # Wraps an icon/image URL in the Image structure stored on the user.
  to_image = fn
    %{"url" => url} -> url && %{"type" => "Image", "url" => [%{"href" => url}]}
    _ -> nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # Profile fields: only PropertyValue attachments, reduced to name/value.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]),
        do: Map.take(field, ["name", "value"])

  # Custom emoji: shortcode (without surrounding colons) => image URL.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{},
        do: {String.trim(name, ":"), url}

  # `locked` is read before the object is passed through the Transmogrifier
  # fix-up; everything below reads the fixed-up object.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }
end
1292
# Fetches the user's remote "following" and "followers" collections and
# derives follow statistics from them.
#
# On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`
# (from `collection_private/1`) and `:following_count`, `:follower_count`
# (from each collection's "totalItems", 0 when that is not an integer).
# Any failing step is returned as `{:error, reason}`; values that are not
# already an error tuple are wrapped in one.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      hide_followers: hide_followers,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1312
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1315
# Merges remote follow statistics into `user_data[:info]` when all of these hold:
#   * `[:instance, :external_user_synchronization]` is enabled,
#   * `user_data[:type]` is "Person" or "Service",
#   * both `:following_address` and `:follower_address` are set,
#   * `fetch_follow_information_for_user/1` succeeds.
#
# When one of the first three checks is false, `user_data` is returned as is.
# Any other failure is logged as an error and `user_data` is still returned.
#
# NOTE(review): the map built by `object_to_user_data/1` in this module carries
# `:actor_type` rather than `:type`, so the type check may never pass for that
# input. Confirm which key is intended.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {skipped_check, false}
    when skipped_check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1346
# Decides whether a remote follow collection should be treated as hidden.
#
#   * "first" embedded as a (Ordered)CollectionPage map  -> {:ok, false}
#   * "first" given otherwise: it is fetched; a (Ordered)CollectionPage result
#     gives {:ok, false}, a 401/403 response gives {:ok, true}, and any other
#     outcome is returned as an error tuple
#   * no "first" at all                                   -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1363
# Runs the actor object through `MRF.filter/1` and converts the filtered object
# with `object_to_user_data/1`. Whatever the filter returns other than
# `{:ok, data}` is wrapped, unchanged, as `{:error, result}`.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1371
# Fetches the remote actor at `ap_id`, converts it to user data and lets
# `maybe_update_follow_information/1` enrich it.
#
# Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged: at
# debug level when the reason is "Object has been deleted", at error level for
# everything else.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      message = "Could not decode user at fetch #{ap_id}, #{inspect(reason)}"

      if reason == "Object has been deleted" do
        Logger.debug(message)
      else
        Logger.error(message)
      end

      {:error, reason}
  end
end
1386
# Creates or refreshes the local record of the remote user at `ap_id`.
#
#   * cached user that is not AP-enabled -> delegated to
#     `Transmogrifier.upgrade_user_from_ap_id/1`
#   * cached, AP-enabled user            -> updated from freshly fetched data
#   * no cached user                     -> inserted from freshly fetched data
#
# A failed fetch is returned unchanged.
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    existing ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        changeset = User.remote_user_changeset(existing, data)
        User.update_and_set_cache(changeset)
      end

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        changeset = User.remote_user_changeset(data)
        User.set_cache(Repo.insert(changeset))
      end
  end
end
1407
# Resolves `nickname` through WebFinger and, when the result carries a non-nil
# "ap_id", builds the user via `make_user_from_ap_id/1`. Every other WebFinger
# outcome yields `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1415
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1420
# Post-processing check for a single activity as seen by `user`; currently this
# is only the broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1425
# Query for "Create" activities with "direct" visibility, ordered by ascending
# activity id. The query is only built here, not executed.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1432 end