Merge branch 'openapi/admin/relay' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Maps
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
18 alias Pleroma.Repo
19 alias Pleroma.Upload
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Computes the audience of an activity from its addressing fields.
#
# Returns `{recipients, to, cc}`: `to` and `cc` are the raw `"to"`/`"cc"` values of `data`
# (defaulting to `[]`), and `recipients` is the combined list built from `to`, `cc` and `bcc`.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # No cached user for this address: keep it as-is.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities also address their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` maps a missing (nil) actor to `[]`. The former `[]` default was wrapped
  # a second time, which put an empty list into the recipients as if it were an address.
  actor = List.wrap(data["actor"])

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation of `to`, `cc` and `bcc`.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
69
# Tells whether the actor behind an AP id may author activities.
# A missing actor (nil) passes; an AP id passes only when the cached user
# exists and its `deactivated` flag is false.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
78
# Returns true when the embedded object's content is no longer than the
# `[:instance, :remote_limit]` setting. Activities without an embedded
# `"content"` value always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_activity), do: true
85
# Bumps the actor's note counter when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
89
@doc """
Decreases the actor's note counter when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
93
# For a Create whose embedded object carries `"inReplyTo"`, increments the
# replies counter of the replied-to object, but only when the new object is
# public (returns nil otherwise). Anything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
104
# A Create whose object has both `"inReplyTo"` and `"name"` is handled as a
# vote: the count for option `name` is increased on the replied-to object on
# behalf of `actor`. Any other input is a `:noop`.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
114
@doc """
Inserts `object` as an `Activity` row without any further processing.

`meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise).
Recipients are derived from the addressing fields of `object`.

Returns `{:ok, activity, meta}` on success. When the insert fails, the
non-matching result of `Repo.insert/1` is returned unchanged.
"""
# The previous spec advertised a two-element `{:ok, _}` tuple, which this function never
# returns; the success value is the three-element tuple including `meta`.
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
129
@doc """
Validates, filters and stores an activity map.

Steps, in order: return the already-known activity if `Activity.normalize/1`
finds one; fill in defaults; check the actor is active (unless
`bypass_actor_check` is set); enforce the remote content limit; run the MRF
filter; compute recipients; run the containment check; insert the embedded
object; insert the activity row and enqueue the `"fetch_data_for_activity"`
background job.

When `fake` is true nothing is written: an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.

Any other failing step is returned as `{:error, failing_value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       # Fake inserts leave the happy path here and are finished in `else`.
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored_data, object} <- insert_full_object(filtered) do
    new_activity = %Activity{
      data: stored_data,
      local: local,
      actor: stored_data["actor"],
      recipients: recipients
    }

    {:ok, inserted} = Repo.insert(new_activity)

    # Splice in the child object if we have one.
    inserted = Maps.put_if_present(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => inserted.id})

    {:ok, inserted}
  else
    # `Activity.normalize/1` found an existing activity.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
175
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
pushes both the activity and the conversation's participations to the
streamer.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
184
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# match, that non-matching value is returned as-is.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
       %User{} = acting_user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(acting_user, conversation)

    {:ok, conversation}
  end
end
192
# Loads the participations of a conversation wrapped in `{:ok, _}`, forcing a
# fresh preload; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
200
@doc """
Preloads the `:user` association of `participations` and sends them to the
`"participation"` stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)

  Streamer.stream("participation", with_users)
end
208
@doc """
Streams the participations of the conversation identified by the object's
`"context"`, provided that `user` can still see at least one activity in it.

Returns `:noop` when the first argument is not an `Object` with a context.
When no conversation matches, the lookup result is returned unchanged.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      latest_visible_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          user: user,
          blocking_user: user
        })

      # Nothing is streamed when the lookup yields no activity id.
      if latest_visible_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
226
@doc """
Sends Create, Announce and Delete activities to the streamer topics computed
by `Topics.get_activity_topics/1`. Every other input is a `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
237
@doc """
Builds and inserts a Create activity inside a database transaction.

On commit the value produced by the transaction body is returned; otherwise
the transaction result (e.g. a rollback error) is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
244
# Transaction body of `create/2`. Builds the Create data, inserts it and then
# updates counters, notifies, streams and federates. Fake inserts stop right
# after `insert/3`; in the `:benchmark` env processing stops after the
# reply/vote counters. An `{:error, message}` rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # Only an explicit `false` makes the activity non-local.
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    # Both early exits hand back the activity produced so far.
    {tag, true, activity} when tag in [:quick_insert, :fake] ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
278
@doc """
Builds a Listen activity from `params`, inserts it, then notifies, streams
and federates it.

Required keys: `:to`, `:actor`, `:context`, `:object`. Optional keys read
here: `:additional`, `:local` (only `false` disables it) and `:published`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # Only an explicit `false` makes the activity non-local.
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
@doc """
Inserts and distributes an `"Accept"` activity built from `params`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
303
@doc """
Inserts and distributes a `"Reject"` activity built from `params`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308
# Shared implementation of `accept/1` and `reject/1`. `type` becomes the
# activity's `"type"`; `params` needs `:to`, `:actor` (its `ap_id` is used)
# and `:object`, and may carry `:local` (default true) and `:activity_id`.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)

  data =
    Maps.put_if_present(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      Map.get(params, :activity_id)
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
@doc """
Inserts an `"Update"` activity and notifies, streams and federates it.

Required keys: `:to`, `:cc`, `:actor`, `:object`. Optional keys read here:
`:local` (only `false` disables it) and `:activity_id`.
"""
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # Only an explicit `false` makes the activity non-local.
  local = params[:local] != false

  update_data =
    Maps.put_if_present(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(update_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
346
@doc """
Creates a Follow activity from `follower` to `followed` inside a transaction.

On commit the transaction body's value is returned; any other transaction
result is passed through unchanged.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
355
# Transaction body of `follow/4`: inserts the follow data, then notifies,
# streams and federates. An `{:error, reason}` rolls the transaction back.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
367
@doc """
Cancels the latest follow from `follower` to `followed` and emits the
matching unfollow activity, all inside a transaction.

Returns `nil` when no follow activity exists. Any non-`{:ok, _}` transaction
result is passed through unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
376
# Transaction body of `unfollow/4`. Marks the latest follow activity as
# "cancelled", inserts the unfollow activity and distributes it. Returns nil
# when there is no follow to undo; `{:error, reason}` rolls back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
390
@doc """
Creates a Block activity from `blocker` against `blocked` inside a
transaction.

On commit the transaction body's value is returned; any other transaction
result is passed through unchanged.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
399
# Transaction body of `block/4`.
#
# When `[:activitypub, :unfollow_blocked]` is enabled and the blocker currently follows the
# blocked user, the follow is undone first (its result is ignored). The Block activity is then
# inserted, notified, streamed and federated. An `{:error, reason}` rolls the transaction back.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` instead of `and`: the strict boolean operator raises BadBooleanError when the
  # setting is absent (nil), which would abort every block on such an instance.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
417
@doc """
Creates a Flag (report) activity and e-mails every superuser that has an
e-mail address.

Required keys: `:actor`, `:context`, `:account`, `:statuses`, `:content`.
Optional keys read here: `:local` and `:forward` (only `false` disables
them) and `:additional`.

The activity is addressed with an empty `"to"`; `"cc"` contains the reported
account's AP id only when forwarding is enabled. The federated copy is the
result of `strip_report_status_data/1`, while the returned activity is the
inserted one.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # Only an explicit `false` disables these flags.
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
457
@doc """
Inserts a Move activity from `origin` to `target` and enqueues the
`"move_following"` background job.

Fails with an error tuple unless `origin.ap_id` is listed in
`target.also_known_as`. A failed insert is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(move_data, local),
         _ <- notify_and_stream(activity) do
      # The federation result is intentionally not checked here.
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
483
@doc """
Builds the query for Create activities that belong to `context`, newest
first.

Visibility is limited to the public collection plus, when `opts[:user]` is
set, that user's own AP id and the result of `User.following/1`. The usual
preload, thread-mute, block, poll-vote and `:exclude_id` options are applied.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      missing when missing in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
512
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
519
@doc """
Returns the id of the newest Create activity in `context` that matches
`opts`, or `nil` when there is none.

`opts` may be a map or a keyword list. Preloading is skipped by default
(`skip_preload: true`) unless `opts` overrides it.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  # `Map.merge/2` raises on keyword lists although the spec accepts them, so normalise first.
  # `Map.new/1` returns maps unchanged.
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
529
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` option is dropped before the query is built. Unlisted activities
are filtered out only when `opts` contains `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
539
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only, pagination)
end
546
547 @valid_visibilities ~w[direct unlisted public private]
548
# Keeps only activities whose computed visibility matches the `:visibility` option, which may
# be a single value or a list of values taken from `@valid_visibilities`.
#
# Invalid values are logged and the query is returned unrestricted, mirroring
# `exclude_visibility/2`. Previously these branches returned the result of `Logger.error/1`
# instead of a query, which broke every later step of the pipeline. A `nil` visibility is
# treated as "no restriction requested" rather than as an error.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
583
# Drops activities whose computed visibility matches the
# `:exclude_visibilities` option (one value or a list of values from
# `@valid_visibilities`). Invalid values are logged and leave the query
# untouched; a missing or nil option is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn value -> value in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
626
# Applies the `thread_visibility` SQL check for the reading user. The check is
# skipped when thread containment is disabled in the config map (third
# argument) or on the user, and when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
641
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type.

`params[:godmode]` lifts the recipient restriction. The list returned by
`fetch_activities/2` is reversed before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
656
@doc """
Fetches the Create and Announce activities of `user` as seen by
`reading_user`.

Block and mute filters of the reader are applied unless the reader blocks
`user`. `params[:godmode]` lifts the recipient restriction. The list returned
by `fetch_activities/2` is reversed before being returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
682
@doc """
Fetches Create and Announce activities visible to `reading_user` using
offset pagination.

`params[:godmode]` lifts the recipient restriction. The list returned by
`fetch_activities/3` is reversed before being returned.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
694
# Recipient addresses used to limit what a reader may see:
#   * godmode      -> no restriction (empty list)
#   * no reader    -> the public collection only
#   * with reader  -> public, the reader's AP id and `User.following/1`
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user})
     when reading_user in [nil, false] do
  [Constants.as_public()]
end

defp user_activities_recipients(%{reading_user: reading_user}) do
  [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
end
704
# Keeps activities with an id greater than `:since_id`. An empty-string or
# missing `:since_id` leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
712
# Excludes activities whose object carries any tag from the non-empty
# `:tag_reject` list. Needs the object join, so it raises when combined with
# `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
725
# Keeps activities whose object carries every tag from the non-empty
# `:tag_all` list. Needs the object join, so it raises when combined with
# `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
738
# Keeps activities whose object is tagged with `:tag`: any of the tags when a
# list is given, exactly that tag when a string is given. Needs the object
# join, so it raises when combined with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, _opts), do: query
758
# Limits the query to activities whose recipients overlap `recipients`.
# An empty recipient list means "no restriction". When a user is given,
# activities authored by that user are additionally admitted via `or_where`.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
772
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
778
# Keeps only activities authored by the AP id given in `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
784
# Filters by activity type. A string `:type` must match exactly; any other
# `:type` value is passed to `ANY(?)` (presumably a list of type names —
# verify against callers).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _opts), do: query
794
# Keeps only activities whose `"state"` field equals `:state`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
800
# Keeps activities whose object lists the given AP id in its `"likes"`.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
809
# With `only_media: true`, keeps activities whose object has a non-empty
# `"attachment"` value. Needs the object join, so any `:only_media` option
# raises when combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
822
# Reply filtering:
#   * `exclude_replies: true`         -> only objects without `"inReplyTo"`
#   * `reply_visibility: "self"`      -> non-replies, or replies addressed to
#                                        the filtering user
#   * `reply_visibility: "following"` -> non-replies, replies addressed to the
#                                        user or their cached friends (the
#                                        author is removed from the recipients
#                                        first), or replies written by the user
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
866
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
872
# Hides activities authored by, or addressed (`"to"`) to, users muted by
# `:muting_user`. The mute list comes from `:muted_users_ap_ids` when present,
# otherwise it is loaded via `User.muted_users_ap_ids/1`. Unless preloading is
# skipped, rows with a matching `thread_mute` binding are hidden as well.
# `with_muted: true` disables all of this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
892
# Applies the block rules of `:blocking_user`:
#   * no activities authored by or addressed to blocked users
#   * no activities whose recipients are on blocked domains
#   * no Announces with a blocked user in `"to"`
#   * no activities or objects authored from a blocked domain, unless the
#     author is one of the user's friends
# The blocked list comes from `:blocked_users_ap_ids` when present, otherwise
# from `User.blocked_users_ap_ids/1`. The object is joined on demand.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
939
# With `restrict_unlisted: true`, drops activities that have the public
# collection in `"cc"`.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
953
# With `pinned: true`, keeps only activities whose id is listed in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
959
# Hides Announce activities from users whose reblogs `:muting_user` has muted.
# The list comes from `:reblog_muted_users_ap_ids` when present, otherwise it
# is loaded via `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
976
# Keeps only activities authored by users whose nickname ends in
# `@<instance>`. The matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^"%@#{instance}"))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _opts), do: query
990
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is set. Only possible when the query has the
# `:object` named binding; otherwise the query is returned unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1002
# Drops activities authored by users returned by the `invisible: true` user
# query, unless `invisible_actors: true` is set. The AP ids are loaded with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, & &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1013
# Removes the activity with the given string id from the results.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1019
# Applies `Activity.with_preloaded_object/1` unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1026
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless
# `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1033
# Applies `Activity.with_preloaded_report_notes/1` only when
# `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1040
# Applies `Activity.with_set_thread_muted_field/2` for the muting user
# (falling back to `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  relevant_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, relevant_user)
end
1047
# Orders by id according to `:order` (`:desc` or `:asc`); any other value
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, [activity], desc: activity.id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, [activity], asc: activity.id)
defp maybe_order(query, _opts), do: query
1059
# Loads the muting user's outgoing relationship AP ids in a single call and
# returns three option maps — for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2` — each being `opts` plus the matching preloaded
# list. Values already present in `opts` are kept. Blocks are only preloaded
# when the blocking user is the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(muting_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
1081
@doc """
Builds the main activity query for `recipients`, applying every preload,
ordering, restriction and exclusion selected through `opts`.

Block, mute and reblog-mute filters receive option maps enriched with
preloaded relationship AP ids. Thread containment follows the
`[:instance, :skip_thread_containment]` setting.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # The order of the steps below is significant for the generated query.
  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1121
@doc """
Fetches one page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to the recipients before
querying. The fetched page is reversed, and the result is post-processed by
`maybe_update_cc/3` with those memberships and the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  memberships = Pleroma.List.memberships(user)
  query = fetch_activities_query(recipients ++ memberships, opts)

  query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(memberships, user)
end
1130
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the `Like` activity, descending, so the
pagination is told to skip its own ordering. Each returned activity has its
`object` field set to the joined object.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  paging_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(likes_query, paging_params, pagination, :object_activity)
end
1149
# Prepends the user's ap_id to the "cc" of every activity whose non-empty "bcc"
# contains at least one of `list_memberships`. Activities without such a "bcc"
# are returned untouched.
#
# Nothing is changed when `list_memberships` is empty or the third argument is
# not a `%User{}`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single string rather than a list;
        # List.wrap/1 normalises both so the result is always a proper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1165
# Narrows the query to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` while also containing the
# public address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1174
@doc """
Fetches one page of activities bounded by two recipient lists.

The base query is built with an empty recipient list and then narrowed by
`fetch_activities_bounded_query/3`. The fetched page is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)

  Enum.reverse(page)
end
1186
# Stores `file` via `Upload.store/2` and inserts an `Object` holding the
# returned data, with "actor" added through `Maps.put_if_present/3` from
# `opts[:actor]`. Any non-`{:ok, _}` result of the store step is returned as is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1195
# Extracts a URL string from an actor's "url" value.
#
# Accepts a plain string, a map with a string "href", or a list, in which case
# only the first element is examined (recursively). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1207
# Translates an actor object (a string-keyed map) into the atom-keyed attribute
# map returned by `user_data_from_user_object/1`.
#
# The "attachment" and "tag" values come from remote servers, so they are read
# defensively: either may be missing, nil, a single object or a list, and
# entries that do not have the expected shape are skipped instead of raising.
defp object_to_user_data(data) do
  icon_url = data["icon"]["url"]

  avatar =
    icon_url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => icon_url}]
      }

  image_url = data["image"]["url"]

  banner =
    image_url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => image_url}]
      }

  attachments = data |> Map.get("attachment") |> List.wrap()

  # Profile fields are the "PropertyValue" attachments; the generator pattern
  # silently drops every entry that is not such a map.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- attachments do
      Map.take(attachment, ["name", "value"])
    end

  tags = data |> Map.get("tag") |> List.wrap()

  # Custom emoji: shortcode (surrounding colons trimmed) => image URL. Emoji
  # tags without an icon URL or without a string name are dropped.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <- tags,
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before the object is passed through the Transmogrifier fix-up below.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }
end
1293
@doc """
Fetches the following and follower collections of `user` and derives the
follow counters and hide flags from them.

Returns `{:ok, info}` with the keys `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`. A failing step is returned as
`{:error, reason}`; a non-error mismatch is wrapped into `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1313
# Returns the counter when it is an integer, and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1316
@doc """
Adds remotely fetched follow information to `user_data` under `:info`.

The fetch only happens when `[:instance, :external_user_synchronization]` is
enabled, the actor type is "Person" or "Service", and both the following and
follower addresses are present. In every other case, and when the fetch fails
(which is logged), `user_data` is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # `object_to_user_data/1` stores the actor type under :actor_type, so reading
  # only :type made this check fail for every map it produced. :type is still
  # honoured first for callers that pass it.
  actor_type = user_data[:type] || user_data[:actor_type]

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    info = Map.merge(user_data[:info] || %{}, info)

    Map.put(user_data, :info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1347
# Decides whether a follow collection is private, returning `{:ok, boolean}`.
#
#   * An embedded "first" page of a collection-page type means not private.
#   * Any other "first" value is fetched: a collection page means not private,
#     a 401/403 response means private, other failures are returned as errors.
#   * Data without a "first" key is treated as private.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first_page}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_collection) do
  {:ok, true}
end
1364
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user data.

Returns `{:ok, user_data}`; any other filter result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1372
@doc """
Fetches the actor object at `ap_id` and turns it into user data, including the
optional follow information.

Failures are logged and returned as `{:error, reason}`. The
"Object has been deleted" reason is logged at debug level, every other reason
at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1387
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the user data is fetched;
a cached user is updated with it, and a missing user is inserted and cached.
A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, user_data} ->
          if cached_user do
            changeset = User.remote_user_changeset(cached_user, user_data)
            User.update_and_set_cache(changeset)
          else
            inserted = user_data |> User.remote_user_changeset() |> Repo.insert()
            User.set_cache(inserted)
          end

        failure ->
          failure
      end
  end
end
1408
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user found
at the resulting ap_id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1416
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1421
# Post-processing hook for a single activity; currently it only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1426
@doc """
Builds the query for direct messages: activities restricted to type "Create"
and visibility "direct", ordered by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1433 end