aeec4beae5d80043f718eeb633e09bf13aa14c3f
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Maps
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
18 alias Pleroma.Repo
19 alias Pleroma.Upload
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
41
42 recipients =
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
45 nil -> true
46 user -> User.following?(user, actor)
47 end
48 end)
49
50 {recipients, to, cc}
51 end
52
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
59 {recipients, to, cc}
60 end
61
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
67 {recipients, to, cc}
68 end
69
70 defp check_actor_is_active(nil), do: true
71
72 defp check_actor_is_active(actor) when is_binary(actor) do
73 case User.get_cached_by_ap_id(actor) do
74 %User{deactivated: deactivated} -> not deactivated
75 _ -> false
76 end
77 end
78
79 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
80 limit = Config.get([:instance, :remote_limit])
81 String.length(content) <= limit
82 end
83
84 defp check_remote_limit(_), do: true
85
86 defp increase_note_count_if_public(actor, object) do
87 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
88 end
89
90 def decrease_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
92 end
93
94 defp increase_replies_count_if_reply(%{
95 "object" => %{"inReplyTo" => reply_ap_id} = object,
96 "type" => "Create"
97 }) do
98 if is_public?(object) do
99 Object.increase_replies_count(reply_ap_id)
100 end
101 end
102
103 defp increase_replies_count_if_reply(_create_data), do: :noop
104
105 defp increase_poll_votes_if_vote(%{
106 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
107 "type" => "Create",
108 "actor" => actor
109 }) do
110 Object.increase_vote_count(reply_ap_id, name, actor)
111 end
112
113 defp increase_poll_votes_if_vote(_create_data), do: :noop
114
115 @object_types ["ChatMessage"]
116 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
117 def persist(%{"type" => type} = object, meta) when type in @object_types do
118 with {:ok, object} <- Object.create(object) do
119 {:ok, object, meta}
120 end
121 end
122
123 def persist(object, meta) do
124 with local <- Keyword.fetch!(meta, :local),
125 {recipients, _, _} <- get_recipients(object),
126 {:ok, activity} <-
127 Repo.insert(%Activity{
128 data: object,
129 local: local,
130 recipients: recipients,
131 actor: object["actor"]
132 }) do
133 {:ok, activity, meta}
134 end
135 end
136
137 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
138 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
139 with nil <- Activity.normalize(map),
140 map <- lazy_put_activity_defaults(map, fake),
141 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
142 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
143 {:ok, map} <- MRF.filter(map),
144 {recipients, _, _} = get_recipients(map),
145 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
146 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
147 {:ok, map, object} <- insert_full_object(map) do
148 {:ok, activity} =
149 Repo.insert(%Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients
154 })
155
156 # Splice in the child object if we have one.
157 activity = Maps.put_if_present(activity, :object, object)
158
159 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
160
161 {:ok, activity}
162 else
163 %Activity{} = activity ->
164 {:ok, activity}
165
166 {:fake, true, map, recipients} ->
167 activity = %Activity{
168 data: map,
169 local: local,
170 actor: map["actor"],
171 recipients: recipients,
172 id: "pleroma:fakeid"
173 }
174
175 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
176 {:ok, activity}
177
178 error ->
179 {:error, error}
180 end
181 end
182
183 def notify_and_stream(activity) do
184 Notification.create_notifications(activity)
185
186 conversation = create_or_bump_conversation(activity, activity.actor)
187 participations = get_participations(conversation)
188 stream_out(activity)
189 stream_out_participations(participations)
190 end
191
192 defp create_or_bump_conversation(activity, actor) do
193 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
194 %User{} = user <- User.get_cached_by_ap_id(actor) do
195 Participation.mark_as_read(user, conversation)
196 {:ok, conversation}
197 end
198 end
199
200 defp get_participations({:ok, conversation}) do
201 conversation
202 |> Repo.preload(:participations, force: true)
203 |> Map.get(:participations)
204 end
205
206 defp get_participations(_), do: []
207
208 def stream_out_participations(participations) do
209 participations =
210 participations
211 |> Repo.preload(:user)
212
213 Streamer.stream("participation", participations)
214 end
215
216 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
217 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
218 conversation = Repo.preload(conversation, :participations)
219
220 last_activity_id =
221 fetch_latest_activity_id_for_context(conversation.ap_id, %{
222 user: user,
223 blocking_user: user
224 })
225
226 if last_activity_id do
227 stream_out_participations(conversation.participations)
228 end
229 end
230 end
231
232 def stream_out_participations(_, _), do: :noop
233
234 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
235 when data_type in ["Create", "Announce", "Delete"] do
236 activity
237 |> Topics.get_activity_topics()
238 |> Streamer.stream(activity)
239 end
240
241 def stream_out(_activity) do
242 :noop
243 end
244
245 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
246 def create(params, fake \\ false) do
247 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
248 result
249 end
250 end
251
252 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257 quick_insert? = Config.get([:env]) == :benchmark
258
259 create_data =
260 make_create_data(
261 %{to: to, actor: actor, published: published, context: context, object: object},
262 additional
263 )
264
265 with {:ok, activity} <- insert(create_data, local, fake),
266 {:fake, false, activity} <- {:fake, fake, activity},
267 _ <- increase_replies_count_if_reply(create_data),
268 _ <- increase_poll_votes_if_vote(create_data),
269 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
270 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
271 _ <- notify_and_stream(activity),
272 :ok <- maybe_federate(activity) do
273 {:ok, activity}
274 else
275 {:quick_insert, true, activity} ->
276 {:ok, activity}
277
278 {:fake, true, activity} ->
279 {:ok, activity}
280
281 {:error, message} ->
282 Repo.rollback(message)
283 end
284 end
285
286 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
287 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
288 additional = params[:additional] || %{}
289 # only accept false as false value
290 local = !(params[:local] == false)
291 published = params[:published]
292
293 listen_data =
294 make_listen_data(
295 %{to: to, actor: actor, published: published, context: context, object: object},
296 additional
297 )
298
299 with {:ok, activity} <- insert(listen_data, local),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_federate(activity) do
302 {:ok, activity}
303 end
304 end
305
306 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
307 def accept(params) do
308 accept_or_reject("Accept", params)
309 end
310
311 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
312 def reject(params) do
313 accept_or_reject("Reject", params)
314 end
315
316 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
317 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
318 local = Map.get(params, :local, true)
319 activity_id = Map.get(params, :activity_id, nil)
320
321 data =
322 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
323 |> Maps.put_if_present("id", activity_id)
324
325 with {:ok, activity} <- insert(data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
328 {:ok, activity}
329 end
330 end
331
332 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
333 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
334 local = !(params[:local] == false)
335 activity_id = params[:activity_id]
336
337 data =
338 %{
339 "to" => to,
340 "cc" => cc,
341 "type" => "Update",
342 "actor" => actor,
343 "object" => object
344 }
345 |> Maps.put_if_present("id", activity_id)
346
347 with {:ok, activity} <- insert(data, local),
348 _ <- notify_and_stream(activity),
349 :ok <- maybe_federate(activity) do
350 {:ok, activity}
351 end
352 end
353
354 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
355 {:ok, Activity.t()} | {:error, any()}
356 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
357 with {:ok, result} <-
358 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
359 result
360 end
361 end
362
363 defp do_follow(follower, followed, activity_id, local, opts) do
364 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
365 data = make_follow_data(follower, followed, activity_id)
366
367 with {:ok, activity} <- insert(data, local),
368 _ <- skip_notify_and_stream || notify_and_stream(activity),
369 :ok <- maybe_federate(activity) do
370 {:ok, activity}
371 else
372 {:error, error} -> Repo.rollback(error)
373 end
374 end
375
376 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
377 {:ok, Activity.t()} | nil | {:error, any()}
378 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
379 with {:ok, result} <-
380 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
381 result
382 end
383 end
384
385 defp do_unfollow(follower, followed, activity_id, local) do
386 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
387 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
388 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
389 {:ok, activity} <- insert(unfollow_data, local),
390 _ <- notify_and_stream(activity),
391 :ok <- maybe_federate(activity) do
392 {:ok, activity}
393 else
394 nil -> nil
395 {:error, error} -> Repo.rollback(error)
396 end
397 end
398
399 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
400 {:ok, Activity.t()} | {:error, any()}
401 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
402 with {:ok, result} <-
403 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
404 result
405 end
406 end
407
408 defp do_block(blocker, blocked, activity_id, local) do
409 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
410
411 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
412 unfollow(blocker, blocked, nil, local)
413 end
414
415 block_data = make_block_data(blocker, blocked, activity_id)
416
417 with {:ok, activity} <- insert(block_data, local),
418 _ <- notify_and_stream(activity),
419 :ok <- maybe_federate(activity) do
420 {:ok, activity}
421 else
422 {:error, error} -> Repo.rollback(error)
423 end
424 end
425
426 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
427 def flag(
428 %{
429 actor: actor,
430 context: _context,
431 account: account,
432 statuses: statuses,
433 content: content
434 } = params
435 ) do
436 # only accept false as false value
437 local = !(params[:local] == false)
438 forward = !(params[:forward] == false)
439
440 additional = params[:additional] || %{}
441
442 additional =
443 if forward do
444 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
445 else
446 Map.merge(additional, %{"to" => [], "cc" => []})
447 end
448
449 with flag_data <- make_flag_data(params, additional),
450 {:ok, activity} <- insert(flag_data, local),
451 {:ok, stripped_activity} <- strip_report_status_data(activity),
452 _ <- notify_and_stream(activity),
453 :ok <- maybe_federate(stripped_activity) do
454 User.all_superusers()
455 |> Enum.filter(fn user -> not is_nil(user.email) end)
456 |> Enum.each(fn superuser ->
457 superuser
458 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
459 |> Pleroma.Emails.Mailer.deliver_async()
460 end)
461
462 {:ok, activity}
463 end
464 end
465
466 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
467 def move(%User{} = origin, %User{} = target, local \\ true) do
468 params = %{
469 "type" => "Move",
470 "actor" => origin.ap_id,
471 "object" => origin.ap_id,
472 "target" => target.ap_id
473 }
474
475 with true <- origin.ap_id in target.also_known_as,
476 {:ok, activity} <- insert(params, local),
477 _ <- notify_and_stream(activity) do
478 maybe_federate(activity)
479
480 BackgroundWorker.enqueue("move_following", %{
481 "origin_id" => origin.id,
482 "target_id" => target.id
483 })
484
485 {:ok, activity}
486 else
487 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
488 err -> err
489 end
490 end
491
492 def fetch_activities_for_context_query(context, opts) do
493 public = [Constants.as_public()]
494
495 recipients =
496 if opts[:user],
497 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
498 else: public
499
500 from(activity in Activity)
501 |> maybe_preload_objects(opts)
502 |> maybe_preload_bookmarks(opts)
503 |> maybe_set_thread_muted_field(opts)
504 |> restrict_blocked(opts)
505 |> restrict_recipients(recipients, opts[:user])
506 |> where(
507 [activity],
508 fragment(
509 "?->>'type' = ? and ?->>'context' = ?",
510 activity.data,
511 "Create",
512 activity.data,
513 ^context
514 )
515 )
516 |> exclude_poll_votes(opts)
517 |> exclude_id(opts)
518 |> order_by([activity], desc: activity.id)
519 end
520
521 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
522 def fetch_activities_for_context(context, opts \\ %{}) do
523 context
524 |> fetch_activities_for_context_query(opts)
525 |> Repo.all()
526 end
527
528 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
529 FlakeId.Ecto.CompatType.t() | nil
530 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
531 context
532 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
533 |> limit(1)
534 |> select([a], a.id)
535 |> Repo.one()
536 end
537
538 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
539 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
540 opts = Map.delete(opts, :user)
541
542 [Constants.as_public()]
543 |> fetch_activities_query(opts)
544 |> restrict_unlisted(opts)
545 |> Pagination.fetch_paginated(opts, pagination)
546 end
547
548 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
549 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
550 opts
551 |> Map.put(:restrict_unlisted, true)
552 |> fetch_public_or_unlisted_activities(pagination)
553 end
554
555 @valid_visibilities ~w[direct unlisted public private]
556
557 defp restrict_visibility(query, %{visibility: visibility})
558 when is_list(visibility) do
559 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
560 from(
561 a in query,
562 where:
563 fragment(
564 "activity_visibility(?, ?, ?) = ANY (?)",
565 a.actor,
566 a.recipients,
567 a.data,
568 ^visibility
569 )
570 )
571 else
572 Logger.error("Could not restrict visibility to #{visibility}")
573 end
574 end
575
576 defp restrict_visibility(query, %{visibility: visibility})
577 when visibility in @valid_visibilities do
578 from(
579 a in query,
580 where:
581 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
582 )
583 end
584
585 defp restrict_visibility(_query, %{visibility: visibility})
586 when visibility not in @valid_visibilities do
587 Logger.error("Could not restrict visibility to #{visibility}")
588 end
589
590 defp restrict_visibility(query, _visibility), do: query
591
592 defp exclude_visibility(query, %{exclude_visibilities: visibility})
593 when is_list(visibility) do
594 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
595 from(
596 a in query,
597 where:
598 not fragment(
599 "activity_visibility(?, ?, ?) = ANY (?)",
600 a.actor,
601 a.recipients,
602 a.data,
603 ^visibility
604 )
605 )
606 else
607 Logger.error("Could not exclude visibility to #{visibility}")
608 query
609 end
610 end
611
612 defp exclude_visibility(query, %{exclude_visibilities: visibility})
613 when visibility in @valid_visibilities do
614 from(
615 a in query,
616 where:
617 not fragment(
618 "activity_visibility(?, ?, ?) = ?",
619 a.actor,
620 a.recipients,
621 a.data,
622 ^visibility
623 )
624 )
625 end
626
627 defp exclude_visibility(query, %{exclude_visibilities: visibility})
628 when visibility not in [nil | @valid_visibilities] do
629 Logger.error("Could not exclude visibility to #{visibility}")
630 query
631 end
632
633 defp exclude_visibility(query, _visibility), do: query
634
635 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
636 do: query
637
638 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
639 do: query
640
641 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
642 from(
643 a in query,
644 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
645 )
646 end
647
648 defp restrict_thread_visibility(query, _, _), do: query
649
650 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
651 params =
652 params
653 |> Map.put(:user, reading_user)
654 |> Map.put(:actor_id, user.ap_id)
655
656 %{
657 godmode: params[:godmode],
658 reading_user: reading_user
659 }
660 |> user_activities_recipients()
661 |> fetch_activities(params)
662 |> Enum.reverse()
663 end
664
665 def fetch_user_activities(user, reading_user, params \\ %{}) do
666 params =
667 params
668 |> Map.put(:type, ["Create", "Announce"])
669 |> Map.put(:user, reading_user)
670 |> Map.put(:actor_id, user.ap_id)
671 |> Map.put(:pinned_activity_ids, user.pinned_activities)
672
673 params =
674 if User.blocks?(reading_user, user) do
675 params
676 else
677 params
678 |> Map.put(:blocking_user, reading_user)
679 |> Map.put(:muting_user, reading_user)
680 end
681
682 %{
683 godmode: params[:godmode],
684 reading_user: reading_user
685 }
686 |> user_activities_recipients()
687 |> fetch_activities(params)
688 |> Enum.reverse()
689 end
690
691 def fetch_statuses(reading_user, params) do
692 params = Map.put(params, :type, ["Create", "Announce"])
693
694 %{
695 godmode: params[:godmode],
696 reading_user: reading_user
697 }
698 |> user_activities_recipients()
699 |> fetch_activities(params, :offset)
700 |> Enum.reverse()
701 end
702
703 defp user_activities_recipients(%{godmode: true}), do: []
704
705 defp user_activities_recipients(%{reading_user: reading_user}) do
706 if reading_user do
707 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
708 else
709 [Constants.as_public()]
710 end
711 end
712
713 defp restrict_since(query, %{since_id: ""}), do: query
714
715 defp restrict_since(query, %{since_id: since_id}) do
716 from(activity in query, where: activity.id > ^since_id)
717 end
718
719 defp restrict_since(query, _), do: query
720
721 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
722 raise "Can't use the child object without preloading!"
723 end
724
725 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
726 from(
727 [_activity, object] in query,
728 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
729 )
730 end
731
732 defp restrict_tag_reject(query, _), do: query
733
734 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
735 raise "Can't use the child object without preloading!"
736 end
737
738 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
739 from(
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
742 )
743 end
744
745 defp restrict_tag_all(query, _), do: query
746
747 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
748 raise "Can't use the child object without preloading!"
749 end
750
751 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
752 from(
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
755 )
756 end
757
758 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
759 from(
760 [_activity, object] in query,
761 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
762 )
763 end
764
765 defp restrict_tag(query, _), do: query
766
767 defp restrict_recipients(query, [], _user), do: query
768
769 defp restrict_recipients(query, recipients, nil) do
770 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
771 end
772
773 defp restrict_recipients(query, recipients, user) do
774 from(
775 activity in query,
776 where: fragment("? && ?", ^recipients, activity.recipients),
777 or_where: activity.actor == ^user.ap_id
778 )
779 end
780
781 defp restrict_local(query, %{local_only: true}) do
782 from(activity in query, where: activity.local == true)
783 end
784
785 defp restrict_local(query, _), do: query
786
787 defp restrict_actor(query, %{actor_id: actor_id}) do
788 from(activity in query, where: activity.actor == ^actor_id)
789 end
790
791 defp restrict_actor(query, _), do: query
792
793 defp restrict_type(query, %{type: type}) when is_binary(type) do
794 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
795 end
796
797 defp restrict_type(query, %{type: type}) do
798 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
799 end
800
801 defp restrict_type(query, _), do: query
802
803 defp restrict_state(query, %{state: state}) do
804 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
805 end
806
807 defp restrict_state(query, _), do: query
808
809 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
810 from(
811 [_activity, object] in query,
812 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
813 )
814 end
815
816 defp restrict_favorited_by(query, _), do: query
817
818 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
819 raise "Can't use the child object without preloading!"
820 end
821
822 defp restrict_media(query, %{only_media: true}) do
823 from(
824 [_activity, object] in query,
825 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
826 )
827 end
828
829 defp restrict_media(query, _), do: query
830
831 defp restrict_replies(query, %{exclude_replies: true}) do
832 from(
833 [_activity, object] in query,
834 where: fragment("?->>'inReplyTo' is null", object.data)
835 )
836 end
837
838 defp restrict_replies(query, %{
839 reply_filtering_user: user,
840 reply_visibility: "self"
841 }) do
842 from(
843 [activity, object] in query,
844 where:
845 fragment(
846 "?->>'inReplyTo' is null OR ? = ANY(?)",
847 object.data,
848 ^user.ap_id,
849 activity.recipients
850 )
851 )
852 end
853
854 defp restrict_replies(query, %{
855 reply_filtering_user: user,
856 reply_visibility: "following"
857 }) do
858 from(
859 [activity, object] in query,
860 where:
861 fragment(
862 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
863 object.data,
864 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
865 activity.recipients,
866 activity.actor,
867 activity.actor,
868 ^user.ap_id
869 )
870 )
871 end
872
873 defp restrict_replies(query, _), do: query
874
875 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
876 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
877 end
878
879 defp restrict_reblogs(query, _), do: query
880
881 defp restrict_muted(query, %{with_muted: true}), do: query
882
883 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
884 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
885
886 query =
887 from([activity] in query,
888 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
889 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
890 )
891
892 unless opts[:skip_preload] do
893 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
894 else
895 query
896 end
897 end
898
899 defp restrict_muted(query, _), do: query
900
901 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
902 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
903 domain_blocks = user.domain_blocks || []
904
905 following_ap_ids = User.get_friends_ap_ids(user)
906
907 query =
908 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
909
910 from(
911 [activity, object: o] in query,
912 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
913 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
914 where:
915 fragment(
916 "recipients_contain_blocked_domains(?, ?) = false",
917 activity.recipients,
918 ^domain_blocks
919 ),
920 where:
921 fragment(
922 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
923 activity.data,
924 activity.data,
925 ^blocked_ap_ids
926 ),
927 where:
928 fragment(
929 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
930 activity.actor,
931 ^domain_blocks,
932 activity.actor,
933 ^following_ap_ids
934 ),
935 where:
936 fragment(
937 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
938 o.data,
939 ^domain_blocks,
940 o.data,
941 ^following_ap_ids
942 )
943 )
944 end
945
946 defp restrict_blocked(query, _), do: query
947
948 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
949 from(
950 activity in query,
951 where:
952 fragment(
953 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
954 activity.data,
955 ^[Constants.as_public()]
956 )
957 )
958 end
959
960 defp restrict_unlisted(query, _), do: query
961
962 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
963 from(activity in query, where: activity.id in ^ids)
964 end
965
966 defp restrict_pinned(query, _), do: query
967
968 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
969 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
970
971 from(
972 activity in query,
973 where:
974 fragment(
975 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
976 activity.data,
977 activity.actor,
978 ^muted_reblogs
979 )
980 )
981 end
982
983 defp restrict_muted_reblogs(query, _), do: query
984
985 defp restrict_instance(query, %{instance: instance}) do
986 users =
987 from(
988 u in User,
989 select: u.ap_id,
990 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
991 )
992 |> Repo.all()
993
994 from(activity in query, where: activity.actor in ^users)
995 end
996
997 defp restrict_instance(query, _), do: query
998
999 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1000
1001 defp exclude_poll_votes(query, _) do
1002 if has_named_binding?(query, :object) do
1003 from([activity, object: o] in query,
1004 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1005 )
1006 else
1007 query
1008 end
1009 end
1010
1011 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1012
1013 defp exclude_chat_messages(query, _) do
1014 if has_named_binding?(query, :object) do
1015 from([activity, object: o] in query,
1016 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1017 )
1018 else
1019 query
1020 end
1021 end
1022
1023 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1024
1025 defp exclude_invisible_actors(query, _opts) do
1026 invisible_ap_ids =
1027 User.Query.build(%{invisible: true, select: [:ap_id]})
1028 |> Repo.all()
1029 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1030
1031 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1032 end
1033
1034 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1035 from(activity in query, where: activity.id != ^id)
1036 end
1037
1038 defp exclude_id(query, _), do: query
1039
1040 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1041
1042 defp maybe_preload_objects(query, _) do
1043 query
1044 |> Activity.with_preloaded_object()
1045 end
1046
1047 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1048
1049 defp maybe_preload_bookmarks(query, opts) do
1050 query
1051 |> Activity.with_preloaded_bookmark(opts[:user])
1052 end
1053
1054 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1055 query
1056 |> Activity.with_preloaded_report_notes()
1057 end
1058
1059 defp maybe_preload_report_notes(query, _), do: query
1060
1061 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1062
1063 defp maybe_set_thread_muted_field(query, opts) do
1064 query
1065 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1066 end
1067
1068 defp maybe_order(query, %{order: :desc}) do
1069 query
1070 |> order_by(desc: :id)
1071 end
1072
1073 defp maybe_order(query, %{order: :asc}) do
1074 query
1075 |> order_by(asc: :id)
1076 end
1077
1078 defp maybe_order(query, _), do: query
1079
1080 defp fetch_activities_query_ap_ids_ops(opts) do
1081 source_user = opts[:muting_user]
1082 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1083
1084 ap_id_relationships =
1085 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1086 [:block | ap_id_relationships]
1087 else
1088 ap_id_relationships
1089 end
1090
1091 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1092
1093 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1094 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1095
1096 restrict_muted_reblogs_opts =
1097 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1098
1099 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1100 end
1101
1102 def fetch_activities_query(recipients, opts \\ %{}) do
1103 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1104 fetch_activities_query_ap_ids_ops(opts)
1105
1106 config = %{
1107 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1108 }
1109
1110 Activity
1111 |> maybe_preload_objects(opts)
1112 |> maybe_preload_bookmarks(opts)
1113 |> maybe_preload_report_notes(opts)
1114 |> maybe_set_thread_muted_field(opts)
1115 |> maybe_order(opts)
1116 |> restrict_recipients(recipients, opts[:user])
1117 |> restrict_replies(opts)
1118 |> restrict_tag(opts)
1119 |> restrict_tag_reject(opts)
1120 |> restrict_tag_all(opts)
1121 |> restrict_since(opts)
1122 |> restrict_local(opts)
1123 |> restrict_actor(opts)
1124 |> restrict_type(opts)
1125 |> restrict_state(opts)
1126 |> restrict_favorited_by(opts)
1127 |> restrict_blocked(restrict_blocked_opts)
1128 |> restrict_muted(restrict_muted_opts)
1129 |> restrict_media(opts)
1130 |> restrict_visibility(opts)
1131 |> restrict_thread_visibility(opts, config)
1132 |> restrict_reblogs(opts)
1133 |> restrict_pinned(opts)
1134 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1135 |> restrict_instance(opts)
1136 |> Activity.restrict_deactivated_users()
1137 |> exclude_poll_votes(opts)
1138 |> exclude_chat_messages(opts)
1139 |> exclude_invisible_actors(opts)
1140 |> exclude_visibility(opts)
1141 end
1142
1143 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1144 list_memberships = Pleroma.List.memberships(opts[:user])
1145
1146 fetch_activities_query(recipients ++ list_memberships, opts)
1147 |> Pagination.fetch_paginated(opts, pagination)
1148 |> Enum.reverse()
1149 |> maybe_update_cc(list_memberships, opts[:user])
1150 end
1151
1152 @doc """
1153 Fetch favorites activities of user with order by sort adds to favorites
1154 """
1155 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1156 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1157 user.ap_id
1158 |> Activity.Queries.by_actor()
1159 |> Activity.Queries.by_type("Like")
1160 |> Activity.with_joined_object()
1161 |> Object.with_joined_activity()
1162 |> select([_like, object, activity], %{activity | object: object})
1163 |> order_by([like, _, _], desc_nulls_last: like.id)
1164 |> Pagination.fetch_paginated(
1165 Map.merge(params, %{skip_order: true}),
1166 pagination,
1167 :object_activity
1168 )
1169 end
1170
1171 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1172 Enum.map(activities, fn
1173 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1174 if Enum.any?(bcc, &(&1 in list_memberships)) do
1175 update_in(activity.data["cc"], &[user_ap_id | &1])
1176 else
1177 activity
1178 end
1179
1180 activity ->
1181 activity
1182 end)
1183 end
1184
1185 defp maybe_update_cc(activities, _, _), do: activities
1186
1187 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1188 from(activity in query,
1189 where:
1190 fragment("? && ?", activity.recipients, ^recipients) or
1191 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1192 ^Constants.as_public() in activity.recipients)
1193 )
1194 end
1195
1196 def fetch_activities_bounded(
1197 recipients,
1198 recipients_with_public,
1199 opts \\ %{},
1200 pagination \\ :keyset
1201 ) do
1202 fetch_activities_query([], opts)
1203 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1204 |> Pagination.fetch_paginated(opts, pagination)
1205 |> Enum.reverse()
1206 end
1207
1208 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1209 def upload(file, opts \\ []) do
1210 with {:ok, data} <- Upload.store(file, opts) do
1211 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1212
1213 Repo.insert(%Object{data: obj_data})
1214 end
1215 end
1216
1217 @spec get_actor_url(any()) :: binary() | nil
1218 defp get_actor_url(url) when is_binary(url), do: url
1219 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1220
1221 defp get_actor_url(url) when is_list(url) do
1222 url
1223 |> List.first()
1224 |> get_actor_url()
1225 end
1226
1227 defp get_actor_url(_url), do: nil
1228
1229 defp object_to_user_data(data) do
1230 avatar =
1231 data["icon"]["url"] &&
1232 %{
1233 "type" => "Image",
1234 "url" => [%{"href" => data["icon"]["url"]}]
1235 }
1236
1237 banner =
1238 data["image"]["url"] &&
1239 %{
1240 "type" => "Image",
1241 "url" => [%{"href" => data["image"]["url"]}]
1242 }
1243
1244 fields =
1245 data
1246 |> Map.get("attachment", [])
1247 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1248 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1249
1250 emojis =
1251 data
1252 |> Map.get("tag", [])
1253 |> Enum.filter(fn
1254 %{"type" => "Emoji"} -> true
1255 _ -> false
1256 end)
1257 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1258 {String.trim(name, ":"), url}
1259 end)
1260
1261 locked = data["manuallyApprovesFollowers"] || false
1262 data = Transmogrifier.maybe_fix_user_object(data)
1263 discoverable = data["discoverable"] || false
1264 invisible = data["invisible"] || false
1265 actor_type = data["type"] || "Person"
1266
1267 public_key =
1268 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1269 data["publicKey"]["publicKeyPem"]
1270 else
1271 nil
1272 end
1273
1274 shared_inbox =
1275 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1276 data["endpoints"]["sharedInbox"]
1277 else
1278 nil
1279 end
1280
1281 user_data = %{
1282 ap_id: data["id"],
1283 uri: get_actor_url(data["url"]),
1284 ap_enabled: true,
1285 banner: banner,
1286 fields: fields,
1287 emoji: emojis,
1288 locked: locked,
1289 discoverable: discoverable,
1290 invisible: invisible,
1291 avatar: avatar,
1292 name: data["name"],
1293 follower_address: data["followers"],
1294 following_address: data["following"],
1295 bio: data["summary"],
1296 actor_type: actor_type,
1297 also_known_as: Map.get(data, "alsoKnownAs", []),
1298 public_key: public_key,
1299 inbox: data["inbox"],
1300 shared_inbox: shared_inbox
1301 }
1302
1303 # nickname can be nil because of virtual actors
1304 if data["preferredUsername"] do
1305 Map.put(
1306 user_data,
1307 :nickname,
1308 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1309 )
1310 else
1311 Map.put(user_data, :nickname, nil)
1312 end
1313 end
1314
1315 def fetch_follow_information_for_user(user) do
1316 with {:ok, following_data} <-
1317 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1318 {:ok, hide_follows} <- collection_private(following_data),
1319 {:ok, followers_data} <-
1320 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1321 {:ok, hide_followers} <- collection_private(followers_data) do
1322 {:ok,
1323 %{
1324 hide_follows: hide_follows,
1325 follower_count: normalize_counter(followers_data["totalItems"]),
1326 following_count: normalize_counter(following_data["totalItems"]),
1327 hide_followers: hide_followers
1328 }}
1329 else
1330 {:error, _} = e -> e
1331 e -> {:error, e}
1332 end
1333 end
1334
1335 defp normalize_counter(counter) when is_integer(counter), do: counter
1336 defp normalize_counter(_), do: 0
1337
1338 def maybe_update_follow_information(user_data) do
1339 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1340 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1341 {_, true} <-
1342 {:collections_available,
1343 !!(user_data[:following_address] && user_data[:follower_address])},
1344 {:ok, info} <-
1345 fetch_follow_information_for_user(user_data) do
1346 info = Map.merge(user_data[:info] || %{}, info)
1347
1348 user_data
1349 |> Map.put(:info, info)
1350 else
1351 {:user_type_check, false} ->
1352 user_data
1353
1354 {:collections_available, false} ->
1355 user_data
1356
1357 {:enabled, false} ->
1358 user_data
1359
1360 e ->
1361 Logger.error(
1362 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1363 )
1364
1365 user_data
1366 end
1367 end
1368
1369 defp collection_private(%{"first" => %{"type" => type}})
1370 when type in ["CollectionPage", "OrderedCollectionPage"],
1371 do: {:ok, false}
1372
1373 defp collection_private(%{"first" => first}) do
1374 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1375 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1376 {:ok, false}
1377 else
1378 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1379 {:error, _} = e -> e
1380 e -> {:error, e}
1381 end
1382 end
1383
1384 defp collection_private(_data), do: {:ok, true}
1385
1386 def user_data_from_user_object(data) do
1387 with {:ok, data} <- MRF.filter(data) do
1388 {:ok, object_to_user_data(data)}
1389 else
1390 e -> {:error, e}
1391 end
1392 end
1393
1394 def fetch_and_prepare_user_from_ap_id(ap_id) do
1395 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1396 {:ok, data} <- user_data_from_user_object(data) do
1397 {:ok, maybe_update_follow_information(data)}
1398 else
1399 {:error, "Object has been deleted" = e} ->
1400 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1401 {:error, e}
1402
1403 {:error, e} ->
1404 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1405 {:error, e}
1406 end
1407 end
1408
1409 def make_user_from_ap_id(ap_id) do
1410 user = User.get_cached_by_ap_id(ap_id)
1411
1412 if user && !User.ap_enabled?(user) do
1413 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1414 else
1415 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1416 if user do
1417 user
1418 |> User.remote_user_changeset(data)
1419 |> User.update_and_set_cache()
1420 else
1421 data
1422 |> User.remote_user_changeset()
1423 |> Repo.insert()
1424 |> User.set_cache()
1425 end
1426 end
1427 end
1428 end
1429
1430 def make_user_from_nickname(nickname) do
1431 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1432 make_user_from_ap_id(ap_id)
1433 else
1434 _e -> {:error, "No AP id in WebFinger"}
1435 end
1436 end
1437
1438 # filter out broken threads
1439 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1440 entire_thread_visible_for_user?(activity, user)
1441 end
1442
1443 # do post-processing on a specific activity
1444 def contain_activity(%Activity{} = activity, %User{} = user) do
1445 contain_broken_threads(activity, user)
1446 end
1447
1448 def fetch_direct_messages_query do
1449 Activity
1450 |> restrict_type(%{type: "Create"})
1451 |> restrict_visibility(%{visibility: "direct"})
1452 |> order_by([activity], asc: activity.id)
1453 end
1454 end