86428b8616f400e8f08b06431a9ed6974e0d90e9
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
41 {recipients, to, cc}
42 end
43
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
49 {recipients, to, cc}
50 end
51
52 defp check_actor_is_active(nil), do: true
53
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
57 _ -> false
58 end
59 end
60
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
64 end
65
66 defp check_remote_limit(_), do: true
67
68 defp increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
70 end
71
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
74 end
75
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
78 "type" => "Create"
79 }) do
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
82 end
83 end
84
85 defp increase_replies_count_if_reply(_create_data), do: :noop
86
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
89 "type" => "Create",
90 "actor" => actor
91 }) do
92 Object.increase_vote_count(reply_ap_id, name, actor)
93 end
94
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
96
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
101 {:ok, object, meta}
102 end
103 end
104
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
108 {:ok, activity} <-
109 Repo.insert(%Activity{
110 data: object,
111 local: local,
112 recipients: recipients,
113 actor: object["actor"]
114 }) do
115 {:ok, activity, meta}
116 end
117 end
118
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
130 {:ok, activity} =
131 %Activity{
132 data: map,
133 local: local,
134 actor: map["actor"],
135 recipients: recipients
136 }
137 |> Repo.insert()
138 |> maybe_create_activity_expiration()
139
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
142
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
144
145 {:ok, activity}
146 else
147 %Activity{} = activity ->
148 {:ok, activity}
149
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
152 data: map,
153 local: local,
154 actor: map["actor"],
155 recipients: recipients,
156 id: "pleroma:fakeid"
157 }
158
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:ok, activity}
161
162 error ->
163 {:error, error}
164 end
165 end
166
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
169
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
172 stream_out(activity)
173 stream_out_participations(participations)
174 end
175
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
178 {:ok, activity}
179 end
180 end
181
182 defp maybe_create_activity_expiration(result), do: result
183
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
188 {:ok, conversation}
189 end
190 end
191
192 defp get_participations({:ok, conversation}) do
193 conversation
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
196 end
197
198 defp get_participations(_), do: []
199
200 def stream_out_participations(participations) do
201 participations =
202 participations
203 |> Repo.preload(:user)
204
205 Streamer.stream("participation", participations)
206 end
207
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
211
212 last_activity_id =
213 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
214 user: user,
215 blocking_user: user
216 })
217
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
220 end
221 end
222 end
223
224 def stream_out_participations(_, _), do: :noop
225
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
228 activity
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
231 end
232
233 def stream_out(_activity) do
234 :noop
235 end
236
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
240 result
241 end
242 end
243
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
250
251 create_data =
252 make_create_data(
253 %{to: to, actor: actor, published: published, context: context, object: object},
254 additional
255 )
256
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
265 {:ok, activity}
266 else
267 {:quick_insert, true, activity} ->
268 {:ok, activity}
269
270 {:fake, true, activity} ->
271 {:ok, activity}
272
273 {:error, message} ->
274 Repo.rollback(message)
275 end
276 end
277
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
284
285 listen_data =
286 make_listen_data(
287 %{to: to, actor: actor, published: published, context: context, object: object},
288 additional
289 )
290
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 end
296 end
297
298 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
299 def accept(params) do
300 accept_or_reject("Accept", params)
301 end
302
303 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
304 def reject(params) do
305 accept_or_reject("Reject", params)
306 end
307
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
312
313 data =
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
316
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
320 {:ok, activity}
321 end
322 end
323
324 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
325 {:ok, Activity.t()} | {:error, any()}
326 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
327 with {:ok, result} <-
328 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
329 result
330 end
331 end
332
333 defp do_follow(follower, followed, activity_id, local, opts) do
334 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
335 data = make_follow_data(follower, followed, activity_id)
336
337 with {:ok, activity} <- insert(data, local),
338 _ <- skip_notify_and_stream || notify_and_stream(activity),
339 :ok <- maybe_federate(activity) do
340 {:ok, activity}
341 else
342 {:error, error} -> Repo.rollback(error)
343 end
344 end
345
346 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
347 {:ok, Activity.t()} | nil | {:error, any()}
348 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
351 result
352 end
353 end
354
355 defp do_unfollow(follower, followed, activity_id, local) do
356 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359 {:ok, activity} <- insert(unfollow_data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
362 {:ok, activity}
363 else
364 nil -> nil
365 {:error, error} -> Repo.rollback(error)
366 end
367 end
368
369 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
370 def flag(
371 %{
372 actor: actor,
373 context: _context,
374 account: account,
375 statuses: statuses,
376 content: content
377 } = params
378 ) do
379 # only accept false as false value
380 local = !(params[:local] == false)
381 forward = !(params[:forward] == false)
382
383 additional = params[:additional] || %{}
384
385 additional =
386 if forward do
387 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
388 else
389 Map.merge(additional, %{"to" => [], "cc" => []})
390 end
391
392 with flag_data <- make_flag_data(params, additional),
393 {:ok, activity} <- insert(flag_data, local),
394 {:ok, stripped_activity} <- strip_report_status_data(activity),
395 _ <- notify_and_stream(activity),
396 :ok <- maybe_federate(stripped_activity) do
397 User.all_superusers()
398 |> Enum.filter(fn user -> not is_nil(user.email) end)
399 |> Enum.each(fn superuser ->
400 superuser
401 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
402 |> Pleroma.Emails.Mailer.deliver_async()
403 end)
404
405 {:ok, activity}
406 end
407 end
408
409 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
410 def move(%User{} = origin, %User{} = target, local \\ true) do
411 params = %{
412 "type" => "Move",
413 "actor" => origin.ap_id,
414 "object" => origin.ap_id,
415 "target" => target.ap_id
416 }
417
418 with true <- origin.ap_id in target.also_known_as,
419 {:ok, activity} <- insert(params, local),
420 _ <- notify_and_stream(activity) do
421 maybe_federate(activity)
422
423 BackgroundWorker.enqueue("move_following", %{
424 "origin_id" => origin.id,
425 "target_id" => target.id
426 })
427
428 {:ok, activity}
429 else
430 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
431 err -> err
432 end
433 end
434
435 def fetch_activities_for_context_query(context, opts) do
436 public = [Constants.as_public()]
437
438 recipients =
439 if opts[:user],
440 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
441 else: public
442
443 from(activity in Activity)
444 |> maybe_preload_objects(opts)
445 |> maybe_preload_bookmarks(opts)
446 |> maybe_set_thread_muted_field(opts)
447 |> restrict_blocked(opts)
448 |> restrict_recipients(recipients, opts[:user])
449 |> where(
450 [activity],
451 fragment(
452 "?->>'type' = ? and ?->>'context' = ?",
453 activity.data,
454 "Create",
455 activity.data,
456 ^context
457 )
458 )
459 |> exclude_poll_votes(opts)
460 |> exclude_id(opts)
461 |> order_by([activity], desc: activity.id)
462 end
463
464 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
465 def fetch_activities_for_context(context, opts \\ %{}) do
466 context
467 |> fetch_activities_for_context_query(opts)
468 |> Repo.all()
469 end
470
471 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
472 FlakeId.Ecto.CompatType.t() | nil
473 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
474 context
475 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
476 |> restrict_visibility(%{visibility: "direct"})
477 |> limit(1)
478 |> select([a], a.id)
479 |> Repo.one()
480 end
481
482 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
483 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
484 opts = Map.delete(opts, :user)
485
486 [Constants.as_public()]
487 |> fetch_activities_query(opts)
488 |> restrict_unlisted(opts)
489 |> Pagination.fetch_paginated(opts, pagination)
490 end
491
492 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
493 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
494 opts
495 |> Map.put(:restrict_unlisted, true)
496 |> fetch_public_or_unlisted_activities(pagination)
497 end
498
499 @valid_visibilities ~w[direct unlisted public private]
500
501 defp restrict_visibility(query, %{visibility: visibility})
502 when is_list(visibility) do
503 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
504 from(
505 a in query,
506 where:
507 fragment(
508 "activity_visibility(?, ?, ?) = ANY (?)",
509 a.actor,
510 a.recipients,
511 a.data,
512 ^visibility
513 )
514 )
515 else
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518 end
519
520 defp restrict_visibility(query, %{visibility: visibility})
521 when visibility in @valid_visibilities do
522 from(
523 a in query,
524 where:
525 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
526 )
527 end
528
529 defp restrict_visibility(_query, %{visibility: visibility})
530 when visibility not in @valid_visibilities do
531 Logger.error("Could not restrict visibility to #{visibility}")
532 end
533
534 defp restrict_visibility(query, _visibility), do: query
535
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when is_list(visibility) do
538 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
539 from(
540 a in query,
541 where:
542 not fragment(
543 "activity_visibility(?, ?, ?) = ANY (?)",
544 a.actor,
545 a.recipients,
546 a.data,
547 ^visibility
548 )
549 )
550 else
551 Logger.error("Could not exclude visibility to #{visibility}")
552 query
553 end
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility in @valid_visibilities do
558 from(
559 a in query,
560 where:
561 not fragment(
562 "activity_visibility(?, ?, ?) = ?",
563 a.actor,
564 a.recipients,
565 a.data,
566 ^visibility
567 )
568 )
569 end
570
571 defp exclude_visibility(query, %{exclude_visibilities: visibility})
572 when visibility not in [nil | @valid_visibilities] do
573 Logger.error("Could not exclude visibility to #{visibility}")
574 query
575 end
576
577 defp exclude_visibility(query, _visibility), do: query
578
579 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
580 do: query
581
582 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
583 do: query
584
585 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
586 from(
587 a in query,
588 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
589 )
590 end
591
592 defp restrict_thread_visibility(query, _, _), do: query
593
594 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:user, reading_user)
598 |> Map.put(:actor_id, user.ap_id)
599
600 %{
601 godmode: params[:godmode],
602 reading_user: reading_user
603 }
604 |> user_activities_recipients()
605 |> fetch_activities(params)
606 |> Enum.reverse()
607 end
608
609 def fetch_user_activities(user, reading_user, params \\ %{}) do
610 params =
611 params
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
616
617 params =
618 if User.blocks?(reading_user, user) do
619 params
620 else
621 params
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
624 end
625
626 %{
627 godmode: params[:godmode],
628 reading_user: reading_user
629 }
630 |> user_activities_recipients()
631 |> fetch_activities(params)
632 |> Enum.reverse()
633 end
634
635 def fetch_statuses(reading_user, params) do
636 params = Map.put(params, :type, ["Create", "Announce"])
637
638 %{
639 godmode: params[:godmode],
640 reading_user: reading_user
641 }
642 |> user_activities_recipients()
643 |> fetch_activities(params, :offset)
644 |> Enum.reverse()
645 end
646
647 defp user_activities_recipients(%{godmode: true}), do: []
648
649 defp user_activities_recipients(%{reading_user: reading_user}) do
650 if reading_user do
651 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
652 else
653 [Constants.as_public()]
654 end
655 end
656
657 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
658 raise "Can't use the child object without preloading!"
659 end
660
661 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
662 from(
663 [activity, object] in query,
664 where:
665 fragment(
666 "?->>'type' != ? or ?->>'actor' != ?",
667 activity.data,
668 "Announce",
669 object.data,
670 ^actor
671 )
672 )
673 end
674
675 defp restrict_announce_object_actor(query, _), do: query
676
677 defp restrict_since(query, %{since_id: ""}), do: query
678
679 defp restrict_since(query, %{since_id: since_id}) do
680 from(activity in query, where: activity.id > ^since_id)
681 end
682
683 defp restrict_since(query, _), do: query
684
685 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
686 raise "Can't use the child object without preloading!"
687 end
688
689 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
690 from(
691 [_activity, object] in query,
692 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
693 )
694 end
695
696 defp restrict_tag_reject(query, _), do: query
697
698 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
699 raise "Can't use the child object without preloading!"
700 end
701
702 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
703 from(
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
706 )
707 end
708
709 defp restrict_tag_all(query, _), do: query
710
711 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
712 raise "Can't use the child object without preloading!"
713 end
714
715 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
716 from(
717 [_activity, object] in query,
718 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
719 )
720 end
721
722 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
723 from(
724 [_activity, object] in query,
725 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
726 )
727 end
728
729 defp restrict_tag(query, _), do: query
730
731 defp restrict_recipients(query, [], _user), do: query
732
733 defp restrict_recipients(query, recipients, nil) do
734 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
735 end
736
737 defp restrict_recipients(query, recipients, user) do
738 from(
739 activity in query,
740 where: fragment("? && ?", ^recipients, activity.recipients),
741 or_where: activity.actor == ^user.ap_id
742 )
743 end
744
745 defp restrict_local(query, %{local_only: true}) do
746 from(activity in query, where: activity.local == true)
747 end
748
749 defp restrict_local(query, _), do: query
750
751 defp restrict_actor(query, %{actor_id: actor_id}) do
752 from(activity in query, where: activity.actor == ^actor_id)
753 end
754
755 defp restrict_actor(query, _), do: query
756
757 defp restrict_type(query, %{type: type}) when is_binary(type) do
758 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
759 end
760
761 defp restrict_type(query, %{type: type}) do
762 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
763 end
764
765 defp restrict_type(query, _), do: query
766
767 defp restrict_state(query, %{state: state}) do
768 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
769 end
770
771 defp restrict_state(query, _), do: query
772
773 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
774 from(
775 [_activity, object] in query,
776 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
777 )
778 end
779
780 defp restrict_favorited_by(query, _), do: query
781
782 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
783 raise "Can't use the child object without preloading!"
784 end
785
786 defp restrict_media(query, %{only_media: true}) do
787 from(
788 [activity, object] in query,
789 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
790 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
791 )
792 end
793
794 defp restrict_media(query, _), do: query
795
796 defp restrict_replies(query, %{exclude_replies: true}) do
797 from(
798 [_activity, object] in query,
799 where: fragment("?->>'inReplyTo' is null", object.data)
800 )
801 end
802
803 defp restrict_replies(query, %{
804 reply_filtering_user: user,
805 reply_visibility: "self"
806 }) do
807 from(
808 [activity, object] in query,
809 where:
810 fragment(
811 "?->>'inReplyTo' is null OR ? = ANY(?)",
812 object.data,
813 ^user.ap_id,
814 activity.recipients
815 )
816 )
817 end
818
819 defp restrict_replies(query, %{
820 reply_filtering_user: user,
821 reply_visibility: "following"
822 }) do
823 from(
824 [activity, object] in query,
825 where:
826 fragment(
827 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
828 object.data,
829 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
830 activity.recipients,
831 activity.actor,
832 activity.actor,
833 ^user.ap_id
834 )
835 )
836 end
837
838 defp restrict_replies(query, _), do: query
839
840 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
841 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
842 end
843
844 defp restrict_reblogs(query, _), do: query
845
846 defp restrict_muted(query, %{with_muted: true}), do: query
847
848 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
849 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
850
851 query =
852 from([activity] in query,
853 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
854 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
855 )
856
857 unless opts[:skip_preload] do
858 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
859 else
860 query
861 end
862 end
863
864 defp restrict_muted(query, _), do: query
865
866 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
867 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
868 domain_blocks = user.domain_blocks || []
869
870 following_ap_ids = User.get_friends_ap_ids(user)
871
872 query =
873 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
874
875 from(
876 [activity, object: o] in query,
877 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
878 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
879 where:
880 fragment(
881 "recipients_contain_blocked_domains(?, ?) = false",
882 activity.recipients,
883 ^domain_blocks
884 ),
885 where:
886 fragment(
887 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
888 activity.data,
889 activity.data,
890 ^blocked_ap_ids
891 ),
892 where:
893 fragment(
894 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
895 activity.actor,
896 ^domain_blocks,
897 activity.actor,
898 ^following_ap_ids
899 ),
900 where:
901 fragment(
902 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
903 o.data,
904 ^domain_blocks,
905 o.data,
906 ^following_ap_ids
907 )
908 )
909 end
910
911 defp restrict_blocked(query, _), do: query
912
913 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
914 from(
915 activity in query,
916 where:
917 fragment(
918 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
919 activity.data,
920 ^[Constants.as_public()]
921 )
922 )
923 end
924
925 defp restrict_unlisted(query, _), do: query
926
927 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
928 from(activity in query, where: activity.id in ^ids)
929 end
930
931 defp restrict_pinned(query, _), do: query
932
933 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
934 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
935
936 from(
937 activity in query,
938 where:
939 fragment(
940 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
941 activity.data,
942 activity.actor,
943 ^muted_reblogs
944 )
945 )
946 end
947
948 defp restrict_muted_reblogs(query, _), do: query
949
950 defp restrict_instance(query, %{instance: instance}) do
951 users =
952 from(
953 u in User,
954 select: u.ap_id,
955 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
956 )
957 |> Repo.all()
958
959 from(activity in query, where: activity.actor in ^users)
960 end
961
962 defp restrict_instance(query, _), do: query
963
964 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
965
966 defp exclude_poll_votes(query, _) do
967 if has_named_binding?(query, :object) do
968 from([activity, object: o] in query,
969 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
970 )
971 else
972 query
973 end
974 end
975
976 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
977
978 defp exclude_chat_messages(query, _) do
979 if has_named_binding?(query, :object) do
980 from([activity, object: o] in query,
981 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
982 )
983 else
984 query
985 end
986 end
987
988 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
989
990 defp exclude_invisible_actors(query, _opts) do
991 invisible_ap_ids =
992 User.Query.build(%{invisible: true, select: [:ap_id]})
993 |> Repo.all()
994 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
995
996 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
997 end
998
999 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1000 from(activity in query, where: activity.id != ^id)
1001 end
1002
1003 defp exclude_id(query, _), do: query
1004
1005 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1006
1007 defp maybe_preload_objects(query, _) do
1008 query
1009 |> Activity.with_preloaded_object()
1010 end
1011
1012 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1013
1014 defp maybe_preload_bookmarks(query, opts) do
1015 query
1016 |> Activity.with_preloaded_bookmark(opts[:user])
1017 end
1018
1019 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1020 query
1021 |> Activity.with_preloaded_report_notes()
1022 end
1023
1024 defp maybe_preload_report_notes(query, _), do: query
1025
1026 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1027
1028 defp maybe_set_thread_muted_field(query, opts) do
1029 query
1030 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1031 end
1032
1033 defp maybe_order(query, %{order: :desc}) do
1034 query
1035 |> order_by(desc: :id)
1036 end
1037
1038 defp maybe_order(query, %{order: :asc}) do
1039 query
1040 |> order_by(asc: :id)
1041 end
1042
1043 defp maybe_order(query, _), do: query
1044
1045 defp fetch_activities_query_ap_ids_ops(opts) do
1046 source_user = opts[:muting_user]
1047 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1048
1049 ap_id_relationships =
1050 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1051 [:block | ap_id_relationships]
1052 else
1053 ap_id_relationships
1054 end
1055
1056 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1057
1058 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1059 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1060
1061 restrict_muted_reblogs_opts =
1062 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1063
1064 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1065 end
1066
1067 def fetch_activities_query(recipients, opts \\ %{}) do
1068 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1069 fetch_activities_query_ap_ids_ops(opts)
1070
1071 config = %{
1072 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1073 }
1074
1075 Activity
1076 |> maybe_preload_objects(opts)
1077 |> maybe_preload_bookmarks(opts)
1078 |> maybe_preload_report_notes(opts)
1079 |> maybe_set_thread_muted_field(opts)
1080 |> maybe_order(opts)
1081 |> restrict_recipients(recipients, opts[:user])
1082 |> restrict_replies(opts)
1083 |> restrict_tag(opts)
1084 |> restrict_tag_reject(opts)
1085 |> restrict_tag_all(opts)
1086 |> restrict_since(opts)
1087 |> restrict_local(opts)
1088 |> restrict_actor(opts)
1089 |> restrict_type(opts)
1090 |> restrict_state(opts)
1091 |> restrict_favorited_by(opts)
1092 |> restrict_blocked(restrict_blocked_opts)
1093 |> restrict_muted(restrict_muted_opts)
1094 |> restrict_media(opts)
1095 |> restrict_visibility(opts)
1096 |> restrict_thread_visibility(opts, config)
1097 |> restrict_reblogs(opts)
1098 |> restrict_pinned(opts)
1099 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1100 |> restrict_instance(opts)
1101 |> restrict_announce_object_actor(opts)
1102 |> Activity.restrict_deactivated_users()
1103 |> exclude_poll_votes(opts)
1104 |> exclude_chat_messages(opts)
1105 |> exclude_invisible_actors(opts)
1106 |> exclude_visibility(opts)
1107 end
1108
1109 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1110 list_memberships = Pleroma.List.memberships(opts[:user])
1111
1112 fetch_activities_query(recipients ++ list_memberships, opts)
1113 |> Pagination.fetch_paginated(opts, pagination)
1114 |> Enum.reverse()
1115 |> maybe_update_cc(list_memberships, opts[:user])
1116 end
1117
1118 @doc """
1119 Fetch favorites activities of user with order by sort adds to favorites
1120 """
1121 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1122 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1123 user.ap_id
1124 |> Activity.Queries.by_actor()
1125 |> Activity.Queries.by_type("Like")
1126 |> Activity.with_joined_object()
1127 |> Object.with_joined_activity()
1128 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1129 |> order_by([like, _, _], desc_nulls_last: like.id)
1130 |> Pagination.fetch_paginated(
1131 Map.merge(params, %{skip_order: true}),
1132 pagination
1133 )
1134 end
1135
1136 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1137 Enum.map(activities, fn
1138 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1139 if Enum.any?(bcc, &(&1 in list_memberships)) do
1140 update_in(activity.data["cc"], &[user_ap_id | &1])
1141 else
1142 activity
1143 end
1144
1145 activity ->
1146 activity
1147 end)
1148 end
1149
1150 defp maybe_update_cc(activities, _, _), do: activities
1151
1152 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1153 from(activity in query,
1154 where:
1155 fragment("? && ?", activity.recipients, ^recipients) or
1156 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1157 ^Constants.as_public() in activity.recipients)
1158 )
1159 end
1160
1161 def fetch_activities_bounded(
1162 recipients,
1163 recipients_with_public,
1164 opts \\ %{},
1165 pagination \\ :keyset
1166 ) do
1167 fetch_activities_query([], opts)
1168 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1169 |> Pagination.fetch_paginated(opts, pagination)
1170 |> Enum.reverse()
1171 end
1172
1173 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1174 def upload(file, opts \\ []) do
1175 with {:ok, data} <- Upload.store(file, opts) do
1176 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1177
1178 Repo.insert(%Object{data: obj_data})
1179 end
1180 end
1181
1182 @spec get_actor_url(any()) :: binary() | nil
1183 defp get_actor_url(url) when is_binary(url), do: url
1184 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1185
1186 defp get_actor_url(url) when is_list(url) do
1187 url
1188 |> List.first()
1189 |> get_actor_url()
1190 end
1191
1192 defp get_actor_url(_url), do: nil
1193
1194 defp object_to_user_data(data) do
1195 avatar =
1196 data["icon"]["url"] &&
1197 %{
1198 "type" => "Image",
1199 "url" => [%{"href" => data["icon"]["url"]}]
1200 }
1201
1202 banner =
1203 data["image"]["url"] &&
1204 %{
1205 "type" => "Image",
1206 "url" => [%{"href" => data["image"]["url"]}]
1207 }
1208
1209 fields =
1210 data
1211 |> Map.get("attachment", [])
1212 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1213 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1214
1215 emojis =
1216 data
1217 |> Map.get("tag", [])
1218 |> Enum.filter(fn
1219 %{"type" => "Emoji"} -> true
1220 _ -> false
1221 end)
1222 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1223 {String.trim(name, ":"), url}
1224 end)
1225
1226 locked = data["manuallyApprovesFollowers"] || false
1227 accepts_chat_messages = data["acceptsChatMessages"]
1228 data = Transmogrifier.maybe_fix_user_object(data)
1229 discoverable = data["discoverable"] || false
1230 invisible = data["invisible"] || false
1231 actor_type = data["type"] || "Person"
1232
1233 public_key =
1234 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1235 data["publicKey"]["publicKeyPem"]
1236 else
1237 nil
1238 end
1239
1240 shared_inbox =
1241 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1242 data["endpoints"]["sharedInbox"]
1243 else
1244 nil
1245 end
1246
1247 user_data = %{
1248 ap_id: data["id"],
1249 uri: get_actor_url(data["url"]),
1250 ap_enabled: true,
1251 banner: banner,
1252 fields: fields,
1253 emoji: emojis,
1254 locked: locked,
1255 discoverable: discoverable,
1256 invisible: invisible,
1257 avatar: avatar,
1258 name: data["name"],
1259 follower_address: data["followers"],
1260 following_address: data["following"],
1261 bio: data["summary"],
1262 actor_type: actor_type,
1263 also_known_as: Map.get(data, "alsoKnownAs", []),
1264 public_key: public_key,
1265 inbox: data["inbox"],
1266 shared_inbox: shared_inbox,
1267 accepts_chat_messages: accepts_chat_messages
1268 }
1269
1270 # nickname can be nil because of virtual actors
1271 if data["preferredUsername"] do
1272 Map.put(
1273 user_data,
1274 :nickname,
1275 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1276 )
1277 else
1278 Map.put(user_data, :nickname, nil)
1279 end
1280 end
1281
1282 def fetch_follow_information_for_user(user) do
1283 with {:ok, following_data} <-
1284 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1285 {:ok, hide_follows} <- collection_private(following_data),
1286 {:ok, followers_data} <-
1287 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1288 {:ok, hide_followers} <- collection_private(followers_data) do
1289 {:ok,
1290 %{
1291 hide_follows: hide_follows,
1292 follower_count: normalize_counter(followers_data["totalItems"]),
1293 following_count: normalize_counter(following_data["totalItems"]),
1294 hide_followers: hide_followers
1295 }}
1296 else
1297 {:error, _} = e -> e
1298 e -> {:error, e}
1299 end
1300 end
1301
1302 defp normalize_counter(counter) when is_integer(counter), do: counter
1303 defp normalize_counter(_), do: 0
1304
1305 def maybe_update_follow_information(user_data) do
1306 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1307 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1308 {_, true} <-
1309 {:collections_available,
1310 !!(user_data[:following_address] && user_data[:follower_address])},
1311 {:ok, info} <-
1312 fetch_follow_information_for_user(user_data) do
1313 info = Map.merge(user_data[:info] || %{}, info)
1314
1315 user_data
1316 |> Map.put(:info, info)
1317 else
1318 {:user_type_check, false} ->
1319 user_data
1320
1321 {:collections_available, false} ->
1322 user_data
1323
1324 {:enabled, false} ->
1325 user_data
1326
1327 e ->
1328 Logger.error(
1329 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1330 )
1331
1332 user_data
1333 end
1334 end
1335
1336 defp collection_private(%{"first" => %{"type" => type}})
1337 when type in ["CollectionPage", "OrderedCollectionPage"],
1338 do: {:ok, false}
1339
1340 defp collection_private(%{"first" => first}) do
1341 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1342 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1343 {:ok, false}
1344 else
1345 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1346 {:error, _} = e -> e
1347 e -> {:error, e}
1348 end
1349 end
1350
1351 defp collection_private(_data), do: {:ok, true}
1352
1353 def user_data_from_user_object(data) do
1354 with {:ok, data} <- MRF.filter(data) do
1355 {:ok, object_to_user_data(data)}
1356 else
1357 e -> {:error, e}
1358 end
1359 end
1360
1361 def fetch_and_prepare_user_from_ap_id(ap_id) do
1362 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1363 {:ok, data} <- user_data_from_user_object(data) do
1364 {:ok, maybe_update_follow_information(data)}
1365 else
1366 {:error, "Object has been deleted" = e} ->
1367 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1368 {:error, e}
1369
1370 {:error, e} ->
1371 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1372 {:error, e}
1373 end
1374 end
1375
1376 def maybe_handle_clashing_nickname(nickname) do
1377 with %User{} = old_user <- User.get_by_nickname(nickname) do
1378 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1379
1380 old_user
1381 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1382 |> User.update_and_set_cache()
1383 end
1384 end
1385
1386 def make_user_from_ap_id(ap_id) do
1387 user = User.get_cached_by_ap_id(ap_id)
1388
1389 if user && !User.ap_enabled?(user) do
1390 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1391 else
1392 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1393 if user do
1394 user
1395 |> User.remote_user_changeset(data)
1396 |> User.update_and_set_cache()
1397 else
1398 maybe_handle_clashing_nickname(data[:nickname])
1399
1400 data
1401 |> User.remote_user_changeset()
1402 |> Repo.insert()
1403 |> User.set_cache()
1404 end
1405 end
1406 end
1407 end
1408
1409 def make_user_from_nickname(nickname) do
1410 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1411 make_user_from_ap_id(ap_id)
1412 else
1413 _e -> {:error, "No AP id in WebFinger"}
1414 end
1415 end
1416
1417 # filter out broken threads
1418 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1419 entire_thread_visible_for_user?(activity, user)
1420 end
1421
1422 # do post-processing on a specific activity
1423 def contain_activity(%Activity{} = activity, %User{} = user) do
1424 contain_broken_threads(activity, user)
1425 end
1426
1427 def fetch_direct_messages_query do
1428 Activity
1429 |> restrict_type(%{type: "Create"})
1430 |> restrict_visibility(%{visibility: "direct"})
1431 |> order_by([activity], asc: activity.id)
1432 end
1433 end