Moving some background jobs into simple tasks
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
41 {recipients, to, cc}
42 end
43
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
49 {recipients, to, cc}
50 end
51
52 defp check_actor_is_active(nil), do: true
53
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
57 _ -> false
58 end
59 end
60
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
64 end
65
66 defp check_remote_limit(_), do: true
67
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
70 end
71
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
74 end
75
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
78 "type" => "Create"
79 }) do
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
82 end
83 end
84
85 defp increase_replies_count_if_reply(_create_data), do: :noop
86
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
91 {:ok, object, meta}
92 end
93 end
94
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
98 {:ok, activity} <-
99 Repo.insert(%Activity{
100 data: object,
101 local: local,
102 recipients: recipients,
103 actor: object["actor"]
104 }),
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
108 end
109 end
110
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
125
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
128 end)
129
130 {:ok, activity}
131 else
132 %Activity{} = activity ->
133 {:ok, activity}
134
135 {:actor_check, _} ->
136 {:error, false}
137
138 {:containment, _} = error ->
139 error
140
141 {:error, _} = error ->
142 error
143
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
146 data: map,
147 local: local,
148 actor: map["actor"],
149 recipients: recipients,
150 id: "pleroma:fakeid"
151 }
152
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:ok, activity}
155
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
158
159 {:reject, _} = e ->
160 {:error, e}
161 end
162 end
163
164 defp insert_activity_with_expiration(data, local, recipients) do
165 struct = %Activity{
166 data: data,
167 local: local,
168 actor: data["actor"],
169 recipients: recipients
170 }
171
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
174 end
175 end
176
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
179
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
182 stream_out(activity)
183 stream_out_participations(participations)
184 end
185
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 ) do
189 with {:ok, _job} <-
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
193 }) do
194 {:ok, activity}
195 end
196 end
197
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
199
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
204 {:ok, conversation}
205 end
206 end
207
208 defp get_participations({:ok, conversation}) do
209 conversation
210 |> Repo.preload(:participations, force: true)
211 |> Map.get(:participations)
212 end
213
214 defp get_participations(_), do: []
215
216 def stream_out_participations(participations) do
217 participations =
218 participations
219 |> Repo.preload(:user)
220
221 Streamer.stream("participation", participations)
222 end
223
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
227
228 last_activity_id =
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
230 user: user,
231 blocking_user: user
232 })
233
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
236 end
237 end
238 end
239
240 def stream_out_participations(_, _), do: :noop
241
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
244 activity
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
247 end
248
249 def stream_out(_activity) do
250 :noop
251 end
252
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
256 result
257 end
258 end
259
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
266
267 create_data =
268 make_create_data(
269 %{to: to, actor: actor, published: published, context: context, object: object},
270 additional
271 )
272
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
280 {:ok, activity}
281 else
282 {:quick_insert, true, activity} ->
283 {:ok, activity}
284
285 {:fake, true, activity} ->
286 {:ok, activity}
287
288 {:error, message} ->
289 Repo.rollback(message)
290 end
291 end
292
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # only accept false as false value
297 local = !(params[:local] == false)
298 published = params[:published]
299
300 listen_data =
301 make_listen_data(
302 %{to: to, actor: actor, published: published, context: context, object: object},
303 additional
304 )
305
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
309 {:ok, activity}
310 end
311 end
312
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
318 result
319 end
320 end
321
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
329 {:ok, activity}
330 else
331 nil -> nil
332 {:error, error} -> Repo.rollback(error)
333 end
334 end
335
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
337 def flag(
338 %{
339 actor: actor,
340 context: _context,
341 account: account,
342 statuses: statuses,
343 content: content
344 } = params
345 ) do
346 # only accept false as false value
347 local = !(params[:local] == false)
348 forward = !(params[:forward] == false)
349
350 additional = params[:additional] || %{}
351
352 additional =
353 if forward do
354 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
355 else
356 Map.merge(additional, %{"to" => [], "cc" => []})
357 end
358
359 with flag_data <- make_flag_data(params, additional),
360 {:ok, activity} <- insert(flag_data, local),
361 {:ok, stripped_activity} <- strip_report_status_data(activity),
362 _ <- notify_and_stream(activity),
363 :ok <- maybe_federate(stripped_activity) do
364 User.all_superusers()
365 |> Enum.filter(fn user -> not is_nil(user.email) end)
366 |> Enum.each(fn superuser ->
367 superuser
368 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
369 |> Pleroma.Emails.Mailer.deliver_async()
370 end)
371
372 {:ok, activity}
373 end
374 end
375
376 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
377 def move(%User{} = origin, %User{} = target, local \\ true) do
378 params = %{
379 "type" => "Move",
380 "actor" => origin.ap_id,
381 "object" => origin.ap_id,
382 "target" => target.ap_id
383 }
384
385 with true <- origin.ap_id in target.also_known_as,
386 {:ok, activity} <- insert(params, local),
387 _ <- notify_and_stream(activity) do
388 maybe_federate(activity)
389
390 BackgroundWorker.enqueue("move_following", %{
391 "origin_id" => origin.id,
392 "target_id" => target.id
393 })
394
395 {:ok, activity}
396 else
397 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
398 err -> err
399 end
400 end
401
402 def fetch_activities_for_context_query(context, opts) do
403 public = [Constants.as_public()]
404
405 recipients =
406 if opts[:user],
407 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
408 else: public
409
410 from(activity in Activity)
411 |> maybe_preload_objects(opts)
412 |> maybe_preload_bookmarks(opts)
413 |> maybe_set_thread_muted_field(opts)
414 |> restrict_blocked(opts)
415 |> restrict_recipients(recipients, opts[:user])
416 |> restrict_filtered(opts)
417 |> where(
418 [activity],
419 fragment(
420 "?->>'type' = ? and ?->>'context' = ?",
421 activity.data,
422 "Create",
423 activity.data,
424 ^context
425 )
426 )
427 |> exclude_poll_votes(opts)
428 |> exclude_id(opts)
429 |> order_by([activity], desc: activity.id)
430 end
431
432 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
433 def fetch_activities_for_context(context, opts \\ %{}) do
434 context
435 |> fetch_activities_for_context_query(opts)
436 |> Repo.all()
437 end
438
439 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
440 FlakeId.Ecto.CompatType.t() | nil
441 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
442 context
443 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
444 |> restrict_visibility(%{visibility: "direct"})
445 |> limit(1)
446 |> select([a], a.id)
447 |> Repo.one()
448 end
449
450 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
451 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
452 opts = Map.delete(opts, :user)
453
454 [Constants.as_public()]
455 |> fetch_activities_query(opts)
456 |> restrict_unlisted(opts)
457 |> Pagination.fetch_paginated(opts, pagination)
458 end
459
460 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
461 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
462 opts
463 |> Map.put(:restrict_unlisted, true)
464 |> fetch_public_or_unlisted_activities(pagination)
465 end
466
467 @valid_visibilities ~w[direct unlisted public private]
468
469 defp restrict_visibility(query, %{visibility: visibility})
470 when is_list(visibility) do
471 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
472 from(
473 a in query,
474 where:
475 fragment(
476 "activity_visibility(?, ?, ?) = ANY (?)",
477 a.actor,
478 a.recipients,
479 a.data,
480 ^visibility
481 )
482 )
483 else
484 Logger.error("Could not restrict visibility to #{visibility}")
485 end
486 end
487
488 defp restrict_visibility(query, %{visibility: visibility})
489 when visibility in @valid_visibilities do
490 from(
491 a in query,
492 where:
493 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
494 )
495 end
496
497 defp restrict_visibility(_query, %{visibility: visibility})
498 when visibility not in @valid_visibilities do
499 Logger.error("Could not restrict visibility to #{visibility}")
500 end
501
502 defp restrict_visibility(query, _visibility), do: query
503
504 defp exclude_visibility(query, %{exclude_visibilities: visibility})
505 when is_list(visibility) do
506 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
507 from(
508 a in query,
509 where:
510 not fragment(
511 "activity_visibility(?, ?, ?) = ANY (?)",
512 a.actor,
513 a.recipients,
514 a.data,
515 ^visibility
516 )
517 )
518 else
519 Logger.error("Could not exclude visibility to #{visibility}")
520 query
521 end
522 end
523
524 defp exclude_visibility(query, %{exclude_visibilities: visibility})
525 when visibility in @valid_visibilities do
526 from(
527 a in query,
528 where:
529 not fragment(
530 "activity_visibility(?, ?, ?) = ?",
531 a.actor,
532 a.recipients,
533 a.data,
534 ^visibility
535 )
536 )
537 end
538
539 defp exclude_visibility(query, %{exclude_visibilities: visibility})
540 when visibility not in [nil | @valid_visibilities] do
541 Logger.error("Could not exclude visibility to #{visibility}")
542 query
543 end
544
545 defp exclude_visibility(query, _visibility), do: query
546
547 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
548 do: query
549
550 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
551 do: query
552
553 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
554 from(
555 a in query,
556 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
557 )
558 end
559
560 defp restrict_thread_visibility(query, _, _), do: query
561
562 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
563 params =
564 params
565 |> Map.put(:user, reading_user)
566 |> Map.put(:actor_id, user.ap_id)
567
568 %{
569 godmode: params[:godmode],
570 reading_user: reading_user
571 }
572 |> user_activities_recipients()
573 |> fetch_activities(params)
574 |> Enum.reverse()
575 end
576
577 def fetch_user_activities(user, reading_user, params \\ %{}) do
578 params =
579 params
580 |> Map.put(:type, ["Create", "Announce"])
581 |> Map.put(:user, reading_user)
582 |> Map.put(:actor_id, user.ap_id)
583 |> Map.put(:pinned_activity_ids, user.pinned_activities)
584
585 params =
586 if User.blocks?(reading_user, user) do
587 params
588 else
589 params
590 |> Map.put(:blocking_user, reading_user)
591 |> Map.put(:muting_user, reading_user)
592 end
593
594 %{
595 godmode: params[:godmode],
596 reading_user: reading_user
597 }
598 |> user_activities_recipients()
599 |> fetch_activities(params)
600 |> Enum.reverse()
601 end
602
603 def fetch_statuses(reading_user, params) do
604 params = Map.put(params, :type, ["Create", "Announce"])
605
606 %{
607 godmode: params[:godmode],
608 reading_user: reading_user
609 }
610 |> user_activities_recipients()
611 |> fetch_activities(params, :offset)
612 |> Enum.reverse()
613 end
614
615 defp user_activities_recipients(%{godmode: true}), do: []
616
617 defp user_activities_recipients(%{reading_user: reading_user}) do
618 if reading_user do
619 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
620 else
621 [Constants.as_public()]
622 end
623 end
624
625 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
626 raise "Can't use the child object without preloading!"
627 end
628
629 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
630 from(
631 [activity, object] in query,
632 where:
633 fragment(
634 "?->>'type' != ? or ?->>'actor' != ?",
635 activity.data,
636 "Announce",
637 object.data,
638 ^actor
639 )
640 )
641 end
642
643 defp restrict_announce_object_actor(query, _), do: query
644
645 defp restrict_since(query, %{since_id: ""}), do: query
646
647 defp restrict_since(query, %{since_id: since_id}) do
648 from(activity in query, where: activity.id > ^since_id)
649 end
650
651 defp restrict_since(query, _), do: query
652
653 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
654 raise "Can't use the child object without preloading!"
655 end
656
657 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
658 from(
659 [_activity, object] in query,
660 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
661 )
662 end
663
664 defp restrict_tag_reject(query, _), do: query
665
666 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
667 raise "Can't use the child object without preloading!"
668 end
669
670 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
671 from(
672 [_activity, object] in query,
673 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
674 )
675 end
676
677 defp restrict_tag_all(query, _), do: query
678
679 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
680 raise "Can't use the child object without preloading!"
681 end
682
683 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
684 from(
685 [_activity, object] in query,
686 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
687 )
688 end
689
690 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
691 from(
692 [_activity, object] in query,
693 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
694 )
695 end
696
697 defp restrict_tag(query, _), do: query
698
699 defp restrict_recipients(query, [], _user), do: query
700
701 defp restrict_recipients(query, recipients, nil) do
702 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
703 end
704
705 defp restrict_recipients(query, recipients, user) do
706 from(
707 activity in query,
708 where: fragment("? && ?", ^recipients, activity.recipients),
709 or_where: activity.actor == ^user.ap_id
710 )
711 end
712
713 defp restrict_local(query, %{local_only: true}) do
714 from(activity in query, where: activity.local == true)
715 end
716
717 defp restrict_local(query, _), do: query
718
719 defp restrict_actor(query, %{actor_id: actor_id}) do
720 from(activity in query, where: activity.actor == ^actor_id)
721 end
722
723 defp restrict_actor(query, _), do: query
724
725 defp restrict_type(query, %{type: type}) when is_binary(type) do
726 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
727 end
728
729 defp restrict_type(query, %{type: type}) do
730 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
731 end
732
733 defp restrict_type(query, _), do: query
734
735 defp restrict_state(query, %{state: state}) do
736 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
737 end
738
739 defp restrict_state(query, _), do: query
740
741 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
742 from(
743 [_activity, object] in query,
744 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
745 )
746 end
747
748 defp restrict_favorited_by(query, _), do: query
749
750 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
751 raise "Can't use the child object without preloading!"
752 end
753
754 defp restrict_media(query, %{only_media: true}) do
755 from(
756 [activity, object] in query,
757 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
758 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
759 )
760 end
761
762 defp restrict_media(query, _), do: query
763
764 defp restrict_replies(query, %{exclude_replies: true}) do
765 from(
766 [_activity, object] in query,
767 where: fragment("?->>'inReplyTo' is null", object.data)
768 )
769 end
770
771 defp restrict_replies(query, %{
772 reply_filtering_user: %User{} = user,
773 reply_visibility: "self"
774 }) do
775 from(
776 [activity, object] in query,
777 where:
778 fragment(
779 "?->>'inReplyTo' is null OR ? = ANY(?)",
780 object.data,
781 ^user.ap_id,
782 activity.recipients
783 )
784 )
785 end
786
787 defp restrict_replies(query, %{
788 reply_filtering_user: %User{} = user,
789 reply_visibility: "following"
790 }) do
791 from(
792 [activity, object] in query,
793 where:
794 fragment(
795 """
796 ?->>'type' != 'Create' -- This isn't a Create
797 OR ?->>'inReplyTo' is null -- this isn't a reply
798 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
799 -- unless they are the author (because authors
800 -- are also part of the recipients). This leads
801 -- to a bug that self-replies by friends won't
802 -- show up.
803 OR ? = ? -- The actor is us
804 """,
805 activity.data,
806 object.data,
807 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
808 activity.recipients,
809 activity.actor,
810 activity.actor,
811 ^user.ap_id
812 )
813 )
814 end
815
816 defp restrict_replies(query, _), do: query
817
818 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
819 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
820 end
821
822 defp restrict_reblogs(query, _), do: query
823
824 defp restrict_muted(query, %{with_muted: true}), do: query
825
826 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
827 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
828
829 query =
830 from([activity] in query,
831 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
832 where:
833 fragment(
834 "not (?->'to' \\?| ?) or ? = ?",
835 activity.data,
836 ^mutes,
837 activity.actor,
838 ^user.ap_id
839 )
840 )
841
842 unless opts[:skip_preload] do
843 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
844 else
845 query
846 end
847 end
848
849 defp restrict_muted(query, _), do: query
850
851 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
852 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
853 domain_blocks = user.domain_blocks || []
854
855 following_ap_ids = User.get_friends_ap_ids(user)
856
857 query =
858 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
859
860 from(
861 [activity, object: o] in query,
862 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
863 where:
864 fragment(
865 "((not (? && ?)) or ? = ?)",
866 activity.recipients,
867 ^blocked_ap_ids,
868 activity.actor,
869 ^user.ap_id
870 ),
871 where:
872 fragment(
873 "recipients_contain_blocked_domains(?, ?) = false",
874 activity.recipients,
875 ^domain_blocks
876 ),
877 where:
878 fragment(
879 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
880 activity.data,
881 activity.data,
882 ^blocked_ap_ids
883 ),
884 where:
885 fragment(
886 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
887 activity.actor,
888 ^domain_blocks,
889 activity.actor,
890 ^following_ap_ids
891 ),
892 where:
893 fragment(
894 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
895 o.data,
896 ^domain_blocks,
897 o.data,
898 ^following_ap_ids
899 )
900 )
901 end
902
903 defp restrict_blocked(query, _), do: query
904
905 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
906 from(
907 activity in query,
908 where:
909 fragment(
910 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
911 activity.data,
912 ^[Constants.as_public()]
913 )
914 )
915 end
916
917 defp restrict_unlisted(query, _), do: query
918
919 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
920 from(activity in query, where: activity.id in ^ids)
921 end
922
923 defp restrict_pinned(query, _), do: query
924
925 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
926 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
927
928 from(
929 activity in query,
930 where:
931 fragment(
932 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
933 activity.data,
934 activity.actor,
935 ^muted_reblogs
936 )
937 )
938 end
939
940 defp restrict_muted_reblogs(query, _), do: query
941
942 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
943 from(
944 activity in query,
945 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
946 )
947 end
948
949 defp restrict_instance(query, _), do: query
950
951 defp restrict_filtered(query, %{user: %User{} = user}) do
952 case Filter.compose_regex(user) do
953 nil ->
954 query
955
956 regex ->
957 from([activity, object] in query,
958 where:
959 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
960 activity.actor == ^user.ap_id
961 )
962 end
963 end
964
965 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
966 restrict_filtered(query, %{user: user})
967 end
968
969 defp restrict_filtered(query, _), do: query
970
971 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
972
973 defp exclude_poll_votes(query, _) do
974 if has_named_binding?(query, :object) do
975 from([activity, object: o] in query,
976 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
977 )
978 else
979 query
980 end
981 end
982
983 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
984
985 defp exclude_chat_messages(query, _) do
986 if has_named_binding?(query, :object) do
987 from([activity, object: o] in query,
988 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
989 )
990 else
991 query
992 end
993 end
994
995 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
996
997 defp exclude_invisible_actors(query, _opts) do
998 invisible_ap_ids =
999 User.Query.build(%{invisible: true, select: [:ap_id]})
1000 |> Repo.all()
1001 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1002
1003 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1004 end
1005
1006 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1007 from(activity in query, where: activity.id != ^id)
1008 end
1009
1010 defp exclude_id(query, _), do: query
1011
1012 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1013
1014 defp maybe_preload_objects(query, _) do
1015 query
1016 |> Activity.with_preloaded_object()
1017 end
1018
1019 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1020
1021 defp maybe_preload_bookmarks(query, opts) do
1022 query
1023 |> Activity.with_preloaded_bookmark(opts[:user])
1024 end
1025
1026 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1027 query
1028 |> Activity.with_preloaded_report_notes()
1029 end
1030
1031 defp maybe_preload_report_notes(query, _), do: query
1032
1033 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1034
1035 defp maybe_set_thread_muted_field(query, opts) do
1036 query
1037 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1038 end
1039
1040 defp maybe_order(query, %{order: :desc}) do
1041 query
1042 |> order_by(desc: :id)
1043 end
1044
1045 defp maybe_order(query, %{order: :asc}) do
1046 query
1047 |> order_by(asc: :id)
1048 end
1049
1050 defp maybe_order(query, _), do: query
1051
1052 defp fetch_activities_query_ap_ids_ops(opts) do
1053 source_user = opts[:muting_user]
1054 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1055
1056 ap_id_relationships =
1057 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1058 [:block | ap_id_relationships]
1059 else
1060 ap_id_relationships
1061 end
1062
1063 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1064
1065 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1066 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1067
1068 restrict_muted_reblogs_opts =
1069 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1070
1071 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1072 end
1073
1074 def fetch_activities_query(recipients, opts \\ %{}) do
1075 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1076 fetch_activities_query_ap_ids_ops(opts)
1077
1078 config = %{
1079 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1080 }
1081
1082 Activity
1083 |> maybe_preload_objects(opts)
1084 |> maybe_preload_bookmarks(opts)
1085 |> maybe_preload_report_notes(opts)
1086 |> maybe_set_thread_muted_field(opts)
1087 |> maybe_order(opts)
1088 |> restrict_recipients(recipients, opts[:user])
1089 |> restrict_replies(opts)
1090 |> restrict_tag(opts)
1091 |> restrict_tag_reject(opts)
1092 |> restrict_tag_all(opts)
1093 |> restrict_since(opts)
1094 |> restrict_local(opts)
1095 |> restrict_actor(opts)
1096 |> restrict_type(opts)
1097 |> restrict_state(opts)
1098 |> restrict_favorited_by(opts)
1099 |> restrict_blocked(restrict_blocked_opts)
1100 |> restrict_muted(restrict_muted_opts)
1101 |> restrict_filtered(opts)
1102 |> restrict_media(opts)
1103 |> restrict_visibility(opts)
1104 |> restrict_thread_visibility(opts, config)
1105 |> restrict_reblogs(opts)
1106 |> restrict_pinned(opts)
1107 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1108 |> restrict_instance(opts)
1109 |> restrict_announce_object_actor(opts)
1110 |> restrict_filtered(opts)
1111 |> Activity.restrict_deactivated_users()
1112 |> exclude_poll_votes(opts)
1113 |> exclude_chat_messages(opts)
1114 |> exclude_invisible_actors(opts)
1115 |> exclude_visibility(opts)
1116 end
1117
1118 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1119 list_memberships = Pleroma.List.memberships(opts[:user])
1120
1121 fetch_activities_query(recipients ++ list_memberships, opts)
1122 |> Pagination.fetch_paginated(opts, pagination)
1123 |> Enum.reverse()
1124 |> maybe_update_cc(list_memberships, opts[:user])
1125 end
1126
1127 @doc """
1128 Fetch favorites activities of user with order by sort adds to favorites
1129 """
1130 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1131 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1132 user.ap_id
1133 |> Activity.Queries.by_actor()
1134 |> Activity.Queries.by_type("Like")
1135 |> Activity.with_joined_object()
1136 |> Object.with_joined_activity()
1137 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1138 |> order_by([like, _, _], desc_nulls_last: like.id)
1139 |> Pagination.fetch_paginated(
1140 Map.merge(params, %{skip_order: true}),
1141 pagination
1142 )
1143 end
1144
1145 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1146 Enum.map(activities, fn
1147 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1148 if Enum.any?(bcc, &(&1 in list_memberships)) do
1149 update_in(activity.data["cc"], &[user_ap_id | &1])
1150 else
1151 activity
1152 end
1153
1154 activity ->
1155 activity
1156 end)
1157 end
1158
1159 defp maybe_update_cc(activities, _, _), do: activities
1160
1161 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1162 from(activity in query,
1163 where:
1164 fragment("? && ?", activity.recipients, ^recipients) or
1165 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1166 ^Constants.as_public() in activity.recipients)
1167 )
1168 end
1169
1170 def fetch_activities_bounded(
1171 recipients,
1172 recipients_with_public,
1173 opts \\ %{},
1174 pagination \\ :keyset
1175 ) do
1176 fetch_activities_query([], opts)
1177 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1178 |> Pagination.fetch_paginated(opts, pagination)
1179 |> Enum.reverse()
1180 end
1181
1182 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1183 def upload(file, opts \\ []) do
1184 with {:ok, data} <- Upload.store(file, opts) do
1185 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1186
1187 Repo.insert(%Object{data: obj_data})
1188 end
1189 end
1190
1191 @spec get_actor_url(any()) :: binary() | nil
1192 defp get_actor_url(url) when is_binary(url), do: url
1193 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1194
1195 defp get_actor_url(url) when is_list(url) do
1196 url
1197 |> List.first()
1198 |> get_actor_url()
1199 end
1200
1201 defp get_actor_url(_url), do: nil
1202
1203 defp object_to_user_data(data) do
1204 avatar =
1205 data["icon"]["url"] &&
1206 %{
1207 "type" => "Image",
1208 "url" => [%{"href" => data["icon"]["url"]}]
1209 }
1210
1211 banner =
1212 data["image"]["url"] &&
1213 %{
1214 "type" => "Image",
1215 "url" => [%{"href" => data["image"]["url"]}]
1216 }
1217
1218 fields =
1219 data
1220 |> Map.get("attachment", [])
1221 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1222 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1223
1224 emojis =
1225 data
1226 |> Map.get("tag", [])
1227 |> Enum.filter(fn
1228 %{"type" => "Emoji"} -> true
1229 _ -> false
1230 end)
1231 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1232 {String.trim(name, ":"), url}
1233 end)
1234
1235 is_locked = data["manuallyApprovesFollowers"] || false
1236 capabilities = data["capabilities"] || %{}
1237 accepts_chat_messages = capabilities["acceptsChatMessages"]
1238 data = Transmogrifier.maybe_fix_user_object(data)
1239 is_discoverable = data["discoverable"] || false
1240 invisible = data["invisible"] || false
1241 actor_type = data["type"] || "Person"
1242
1243 public_key =
1244 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1245 data["publicKey"]["publicKeyPem"]
1246 else
1247 nil
1248 end
1249
1250 shared_inbox =
1251 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1252 data["endpoints"]["sharedInbox"]
1253 else
1254 nil
1255 end
1256
1257 user_data = %{
1258 ap_id: data["id"],
1259 uri: get_actor_url(data["url"]),
1260 ap_enabled: true,
1261 banner: banner,
1262 fields: fields,
1263 emoji: emojis,
1264 is_locked: is_locked,
1265 is_discoverable: is_discoverable,
1266 invisible: invisible,
1267 avatar: avatar,
1268 name: data["name"],
1269 follower_address: data["followers"],
1270 following_address: data["following"],
1271 bio: data["summary"] || "",
1272 actor_type: actor_type,
1273 also_known_as: Map.get(data, "alsoKnownAs", []),
1274 public_key: public_key,
1275 inbox: data["inbox"],
1276 shared_inbox: shared_inbox,
1277 accepts_chat_messages: accepts_chat_messages
1278 }
1279
1280 # nickname can be nil because of virtual actors
1281 if data["preferredUsername"] do
1282 Map.put(
1283 user_data,
1284 :nickname,
1285 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1286 )
1287 else
1288 Map.put(user_data, :nickname, nil)
1289 end
1290 end
1291
1292 def fetch_follow_information_for_user(user) do
1293 with {:ok, following_data} <-
1294 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1295 force_http: true
1296 ),
1297 {:ok, hide_follows} <- collection_private(following_data),
1298 {:ok, followers_data} <-
1299 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1300 {:ok, hide_followers} <- collection_private(followers_data) do
1301 {:ok,
1302 %{
1303 hide_follows: hide_follows,
1304 follower_count: normalize_counter(followers_data["totalItems"]),
1305 following_count: normalize_counter(following_data["totalItems"]),
1306 hide_followers: hide_followers
1307 }}
1308 else
1309 {:error, _} = e -> e
1310 e -> {:error, e}
1311 end
1312 end
1313
1314 defp normalize_counter(counter) when is_integer(counter), do: counter
1315 defp normalize_counter(_), do: 0
1316
1317 def maybe_update_follow_information(user_data) do
1318 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1319 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1320 {_, true} <-
1321 {:collections_available,
1322 !!(user_data[:following_address] && user_data[:follower_address])},
1323 {:ok, info} <-
1324 fetch_follow_information_for_user(user_data) do
1325 info = Map.merge(user_data[:info] || %{}, info)
1326
1327 user_data
1328 |> Map.put(:info, info)
1329 else
1330 {:user_type_check, false} ->
1331 user_data
1332
1333 {:collections_available, false} ->
1334 user_data
1335
1336 {:enabled, false} ->
1337 user_data
1338
1339 e ->
1340 Logger.error(
1341 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1342 )
1343
1344 user_data
1345 end
1346 end
1347
1348 defp collection_private(%{"first" => %{"type" => type}})
1349 when type in ["CollectionPage", "OrderedCollectionPage"],
1350 do: {:ok, false}
1351
1352 defp collection_private(%{"first" => first}) do
1353 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1354 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1355 {:ok, false}
1356 else
1357 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1358 {:error, _} = e -> e
1359 e -> {:error, e}
1360 end
1361 end
1362
1363 defp collection_private(_data), do: {:ok, true}
1364
1365 def user_data_from_user_object(data) do
1366 with {:ok, data} <- MRF.filter(data) do
1367 {:ok, object_to_user_data(data)}
1368 else
1369 e -> {:error, e}
1370 end
1371 end
1372
1373 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1374 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1375 {:ok, data} <- user_data_from_user_object(data) do
1376 {:ok, maybe_update_follow_information(data)}
1377 else
1378 # If this has been deleted, only log a debug and not an error
1379 {:error, "Object has been deleted" = e} ->
1380 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1381 {:error, e}
1382
1383 {:error, {:reject, reason} = e} ->
1384 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1385 {:error, e}
1386
1387 {:error, e} ->
1388 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1389 {:error, e}
1390 end
1391 end
1392
1393 def maybe_handle_clashing_nickname(data) do
1394 with nickname when is_binary(nickname) <- data[:nickname],
1395 %User{} = old_user <- User.get_by_nickname(nickname),
1396 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1397 Logger.info(
1398 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1399 data[:ap_id]
1400 }, renaming."
1401 )
1402
1403 old_user
1404 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1405 |> User.update_and_set_cache()
1406 else
1407 {:ap_id_comparison, true} ->
1408 Logger.info(
1409 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1410 )
1411
1412 _ ->
1413 nil
1414 end
1415 end
1416
1417 def make_user_from_ap_id(ap_id, opts \\ []) do
1418 user = User.get_cached_by_ap_id(ap_id)
1419
1420 if user && !User.ap_enabled?(user) do
1421 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1422 else
1423 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1424 if user do
1425 user
1426 |> User.remote_user_changeset(data)
1427 |> User.update_and_set_cache()
1428 else
1429 maybe_handle_clashing_nickname(data)
1430
1431 data
1432 |> User.remote_user_changeset()
1433 |> Repo.insert()
1434 |> User.set_cache()
1435 end
1436 end
1437 end
1438 end
1439
1440 def make_user_from_nickname(nickname) do
1441 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1442 make_user_from_ap_id(ap_id)
1443 else
1444 _e -> {:error, "No AP id in WebFinger"}
1445 end
1446 end
1447
1448 # filter out broken threads
1449 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1450 entire_thread_visible_for_user?(activity, user)
1451 end
1452
1453 # do post-processing on a specific activity
1454 def contain_activity(%Activity{} = activity, %User{} = user) do
1455 contain_broken_threads(activity, user)
1456 end
1457
1458 def fetch_direct_messages_query do
1459 Activity
1460 |> restrict_type(%{type: "Create"})
1461 |> restrict_visibility(%{visibility: "direct"})
1462 |> order_by([activity], asc: activity.id)
1463 end
1464 end