Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into chat-federation...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
42 {recipients, to, cc}
43 end
44
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
50 {recipients, to, cc}
51 end
52
53 defp check_actor_is_active(nil), do: true
54
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
58 _ -> false
59 end
60 end
61
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
65 end
66
67 defp check_remote_limit(_), do: true
68
69 defp increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
71 end
72
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
75 end
76
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
79 "type" => "Create"
80 }) do
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
83 end
84 end
85
86 defp increase_replies_count_if_reply(_create_data), do: :noop
87
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
90 "type" => "Create",
91 "actor" => actor
92 }) do
93 Object.increase_vote_count(reply_ap_id, name, actor)
94 end
95
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
97
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
102 {:ok, object, meta}
103 end
104 end
105
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
109 {:ok, activity} <-
110 Repo.insert(%Activity{
111 data: object,
112 local: local,
113 recipients: recipients,
114 actor: object["actor"]
115 }) do
116 {:ok, activity, meta}
117 end
118 end
119
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
131 {:ok, activity} =
132 %Activity{
133 data: map,
134 local: local,
135 actor: map["actor"],
136 recipients: recipients
137 }
138 |> Repo.insert()
139 |> maybe_create_activity_expiration()
140
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
143
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
145
146 {:ok, activity}
147 else
148 %Activity{} = activity ->
149 {:ok, activity}
150
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
153 data: map,
154 local: local,
155 actor: map["actor"],
156 recipients: recipients,
157 id: "pleroma:fakeid"
158 }
159
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
161 {:ok, activity}
162
163 error ->
164 {:error, error}
165 end
166 end
167
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
170
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
173 stream_out(activity)
174 stream_out_participations(participations)
175 end
176
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
179 {:ok, activity}
180 end
181 end
182
183 defp maybe_create_activity_expiration(result), do: result
184
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
189 {:ok, conversation}
190 end
191 end
192
193 defp get_participations({:ok, conversation}) do
194 conversation
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
197 end
198
199 defp get_participations(_), do: []
200
201 def stream_out_participations(participations) do
202 participations =
203 participations
204 |> Repo.preload(:user)
205
206 Streamer.stream("participation", participations)
207 end
208
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
212
213 last_activity_id =
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
215 user: user,
216 blocking_user: user
217 })
218
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
221 end
222 end
223 end
224
225 def stream_out_participations(_, _), do: :noop
226
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
229 activity
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
232 end
233
234 def stream_out(_activity) do
235 :noop
236 end
237
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
241 result
242 end
243 end
244
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
251
252 create_data =
253 make_create_data(
254 %{to: to, actor: actor, published: published, context: context, object: object},
255 additional
256 )
257
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
266 {:ok, activity}
267 else
268 {:quick_insert, true, activity} ->
269 {:ok, activity}
270
271 {:fake, true, activity} ->
272 {:ok, activity}
273
274 {:error, message} ->
275 Repo.rollback(message)
276 end
277 end
278
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
285
286 listen_data =
287 make_listen_data(
288 %{to: to, actor: actor, published: published, context: context, object: object},
289 additional
290 )
291
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
295 {:ok, activity}
296 end
297 end
298
299 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
300 def accept(params) do
301 accept_or_reject("Accept", params)
302 end
303
304 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def reject(params) do
306 accept_or_reject("Reject", params)
307 end
308
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
313
314 data =
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
317
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
326 {:ok, Activity.t()} | nil | {:error, any()}
327 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 result
331 end
332 end
333
334 defp do_unfollow(follower, followed, activity_id, local) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 _ <- notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
341 {:ok, activity}
342 else
343 nil -> nil
344 {:error, error} -> Repo.rollback(error)
345 end
346 end
347
348 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
349 def flag(
350 %{
351 actor: actor,
352 context: _context,
353 account: account,
354 statuses: statuses,
355 content: content
356 } = params
357 ) do
358 # only accept false as false value
359 local = !(params[:local] == false)
360 forward = !(params[:forward] == false)
361
362 additional = params[:additional] || %{}
363
364 additional =
365 if forward do
366 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
367 else
368 Map.merge(additional, %{"to" => [], "cc" => []})
369 end
370
371 with flag_data <- make_flag_data(params, additional),
372 {:ok, activity} <- insert(flag_data, local),
373 {:ok, stripped_activity} <- strip_report_status_data(activity),
374 _ <- notify_and_stream(activity),
375 :ok <- maybe_federate(stripped_activity) do
376 User.all_superusers()
377 |> Enum.filter(fn user -> not is_nil(user.email) end)
378 |> Enum.each(fn superuser ->
379 superuser
380 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
381 |> Pleroma.Emails.Mailer.deliver_async()
382 end)
383
384 {:ok, activity}
385 end
386 end
387
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
390 params = %{
391 "type" => "Move",
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
395 }
396
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
401
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
405 })
406
407 {:ok, activity}
408 else
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
410 err -> err
411 end
412 end
413
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
416
417 recipients =
418 if opts[:user],
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
420 else: public
421
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
429 |> where(
430 [activity],
431 fragment(
432 "?->>'type' = ? and ?->>'context' = ?",
433 activity.data,
434 "Create",
435 activity.data,
436 ^context
437 )
438 )
439 |> exclude_poll_votes(opts)
440 |> exclude_id(opts)
441 |> order_by([activity], desc: activity.id)
442 end
443
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
446 context
447 |> fetch_activities_for_context_query(opts)
448 |> Repo.all()
449 end
450
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
454 context
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
457 |> limit(1)
458 |> select([a], a.id)
459 |> Repo.one()
460 end
461
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
465
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
470 end
471
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
474 opts
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
477 end
478
479 @valid_visibilities ~w[direct unlisted public private]
480
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
484 from(
485 a in query,
486 where:
487 fragment(
488 "activity_visibility(?, ?, ?) = ANY (?)",
489 a.actor,
490 a.recipients,
491 a.data,
492 ^visibility
493 )
494 )
495 else
496 Logger.error("Could not restrict visibility to #{visibility}")
497 end
498 end
499
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
502 from(
503 a in query,
504 where:
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 )
507 end
508
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
512 end
513
514 defp restrict_visibility(query, _visibility), do: query
515
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
519 from(
520 a in query,
521 where:
522 not fragment(
523 "activity_visibility(?, ?, ?) = ANY (?)",
524 a.actor,
525 a.recipients,
526 a.data,
527 ^visibility
528 )
529 )
530 else
531 Logger.error("Could not exclude visibility to #{visibility}")
532 query
533 end
534 end
535
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
538 from(
539 a in query,
540 where:
541 not fragment(
542 "activity_visibility(?, ?, ?) = ?",
543 a.actor,
544 a.recipients,
545 a.data,
546 ^visibility
547 )
548 )
549 end
550
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
554 query
555 end
556
557 defp exclude_visibility(query, _visibility), do: query
558
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
563 do: query
564
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
566 from(
567 a in query,
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 )
570 end
571
572 defp restrict_thread_visibility(query, _, _), do: query
573
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
575 params =
576 params
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
579
580 %{
581 godmode: params[:godmode],
582 reading_user: reading_user
583 }
584 |> user_activities_recipients()
585 |> fetch_activities(params)
586 |> Enum.reverse()
587 end
588
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
590 params =
591 params
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
596
597 params =
598 if User.blocks?(reading_user, user) do
599 params
600 else
601 params
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
604 end
605
606 %{
607 godmode: params[:godmode],
608 reading_user: reading_user
609 }
610 |> user_activities_recipients()
611 |> fetch_activities(params)
612 |> Enum.reverse()
613 end
614
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
617
618 %{
619 godmode: params[:godmode],
620 reading_user: reading_user
621 }
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
624 |> Enum.reverse()
625 end
626
627 defp user_activities_recipients(%{godmode: true}), do: []
628
629 defp user_activities_recipients(%{reading_user: reading_user}) do
630 if reading_user do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
632 else
633 [Constants.as_public()]
634 end
635 end
636
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
639 end
640
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
642 from(
643 [activity, object] in query,
644 where:
645 fragment(
646 "?->>'type' != ? or ?->>'actor' != ?",
647 activity.data,
648 "Announce",
649 object.data,
650 ^actor
651 )
652 )
653 end
654
655 defp restrict_announce_object_actor(query, _), do: query
656
657 defp restrict_since(query, %{since_id: ""}), do: query
658
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
661 end
662
663 defp restrict_since(query, _), do: query
664
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
667 end
668
669 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
670 from(
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
673 )
674 end
675
676 defp restrict_tag_reject(query, _), do: query
677
678 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
680 end
681
682 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
683 from(
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
686 )
687 end
688
689 defp restrict_tag_all(query, _), do: query
690
691 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
692 raise "Can't use the child object without preloading!"
693 end
694
695 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
696 from(
697 [_activity, object] in query,
698 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
699 )
700 end
701
702 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
703 from(
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
706 )
707 end
708
709 defp restrict_tag(query, _), do: query
710
711 defp restrict_recipients(query, [], _user), do: query
712
713 defp restrict_recipients(query, recipients, nil) do
714 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
715 end
716
717 defp restrict_recipients(query, recipients, user) do
718 from(
719 activity in query,
720 where: fragment("? && ?", ^recipients, activity.recipients),
721 or_where: activity.actor == ^user.ap_id
722 )
723 end
724
725 defp restrict_local(query, %{local_only: true}) do
726 from(activity in query, where: activity.local == true)
727 end
728
729 defp restrict_local(query, _), do: query
730
731 defp restrict_actor(query, %{actor_id: actor_id}) do
732 from(activity in query, where: activity.actor == ^actor_id)
733 end
734
735 defp restrict_actor(query, _), do: query
736
737 defp restrict_type(query, %{type: type}) when is_binary(type) do
738 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
739 end
740
741 defp restrict_type(query, %{type: type}) do
742 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
743 end
744
745 defp restrict_type(query, _), do: query
746
747 defp restrict_state(query, %{state: state}) do
748 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
749 end
750
751 defp restrict_state(query, _), do: query
752
753 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
754 from(
755 [_activity, object] in query,
756 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
757 )
758 end
759
760 defp restrict_favorited_by(query, _), do: query
761
762 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
763 raise "Can't use the child object without preloading!"
764 end
765
766 defp restrict_media(query, %{only_media: true}) do
767 from(
768 [activity, object] in query,
769 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
770 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
771 )
772 end
773
774 defp restrict_media(query, _), do: query
775
776 defp restrict_replies(query, %{exclude_replies: true}) do
777 from(
778 [_activity, object] in query,
779 where: fragment("?->>'inReplyTo' is null", object.data)
780 )
781 end
782
783 defp restrict_replies(query, %{
784 reply_filtering_user: user,
785 reply_visibility: "self"
786 }) do
787 from(
788 [activity, object] in query,
789 where:
790 fragment(
791 "?->>'inReplyTo' is null OR ? = ANY(?)",
792 object.data,
793 ^user.ap_id,
794 activity.recipients
795 )
796 )
797 end
798
799 defp restrict_replies(query, %{
800 reply_filtering_user: user,
801 reply_visibility: "following"
802 }) do
803 from(
804 [activity, object] in query,
805 where:
806 fragment(
807 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
808 object.data,
809 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
810 activity.recipients,
811 activity.actor,
812 activity.actor,
813 ^user.ap_id
814 )
815 )
816 end
817
818 defp restrict_replies(query, _), do: query
819
820 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
821 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
822 end
823
824 defp restrict_reblogs(query, _), do: query
825
826 defp restrict_muted(query, %{with_muted: true}), do: query
827
828 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
829 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
830
831 query =
832 from([activity] in query,
833 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
834 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
835 )
836
837 unless opts[:skip_preload] do
838 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
839 else
840 query
841 end
842 end
843
844 defp restrict_muted(query, _), do: query
845
846 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
847 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
848 domain_blocks = user.domain_blocks || []
849
850 following_ap_ids = User.get_friends_ap_ids(user)
851
852 query =
853 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
854
855 from(
856 [activity, object: o] in query,
857 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
858 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
859 where:
860 fragment(
861 "recipients_contain_blocked_domains(?, ?) = false",
862 activity.recipients,
863 ^domain_blocks
864 ),
865 where:
866 fragment(
867 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
868 activity.data,
869 activity.data,
870 ^blocked_ap_ids
871 ),
872 where:
873 fragment(
874 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
875 activity.actor,
876 ^domain_blocks,
877 activity.actor,
878 ^following_ap_ids
879 ),
880 where:
881 fragment(
882 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
883 o.data,
884 ^domain_blocks,
885 o.data,
886 ^following_ap_ids
887 )
888 )
889 end
890
891 defp restrict_blocked(query, _), do: query
892
893 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
894 from(
895 activity in query,
896 where:
897 fragment(
898 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
899 activity.data,
900 ^[Constants.as_public()]
901 )
902 )
903 end
904
905 defp restrict_unlisted(query, _), do: query
906
907 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
908 from(activity in query, where: activity.id in ^ids)
909 end
910
911 defp restrict_pinned(query, _), do: query
912
913 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
914 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
915
916 from(
917 activity in query,
918 where:
919 fragment(
920 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
921 activity.data,
922 activity.actor,
923 ^muted_reblogs
924 )
925 )
926 end
927
928 defp restrict_muted_reblogs(query, _), do: query
929
930 defp restrict_instance(query, %{instance: instance}) do
931 users =
932 from(
933 u in User,
934 select: u.ap_id,
935 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
936 )
937 |> Repo.all()
938
939 from(activity in query, where: activity.actor in ^users)
940 end
941
942 defp restrict_instance(query, _), do: query
943
944 defp restrict_filtered(query, %{user: %User{} = user}) do
945 case Filter.compose_regex(user) do
946 nil ->
947 query
948
949 regex ->
950 from([activity, object] in query,
951 where:
952 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
953 activity.actor == ^user.ap_id
954 )
955 end
956 end
957
958 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
959 restrict_filtered(query, %{user: user})
960 end
961
962 defp restrict_filtered(query, _), do: query
963
964 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
965
966 defp exclude_poll_votes(query, _) do
967 if has_named_binding?(query, :object) do
968 from([activity, object: o] in query,
969 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
970 )
971 else
972 query
973 end
974 end
975
976 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
977
978 defp exclude_chat_messages(query, _) do
979 if has_named_binding?(query, :object) do
980 from([activity, object: o] in query,
981 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
982 )
983 else
984 query
985 end
986 end
987
988 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
989
990 defp exclude_invisible_actors(query, _opts) do
991 invisible_ap_ids =
992 User.Query.build(%{invisible: true, select: [:ap_id]})
993 |> Repo.all()
994 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
995
996 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
997 end
998
999 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1000 from(activity in query, where: activity.id != ^id)
1001 end
1002
1003 defp exclude_id(query, _), do: query
1004
1005 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1006
1007 defp maybe_preload_objects(query, _) do
1008 query
1009 |> Activity.with_preloaded_object()
1010 end
1011
1012 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1013
1014 defp maybe_preload_bookmarks(query, opts) do
1015 query
1016 |> Activity.with_preloaded_bookmark(opts[:user])
1017 end
1018
1019 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1020 query
1021 |> Activity.with_preloaded_report_notes()
1022 end
1023
1024 defp maybe_preload_report_notes(query, _), do: query
1025
1026 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1027
1028 defp maybe_set_thread_muted_field(query, opts) do
1029 query
1030 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1031 end
1032
1033 defp maybe_order(query, %{order: :desc}) do
1034 query
1035 |> order_by(desc: :id)
1036 end
1037
1038 defp maybe_order(query, %{order: :asc}) do
1039 query
1040 |> order_by(asc: :id)
1041 end
1042
1043 defp maybe_order(query, _), do: query
1044
1045 defp fetch_activities_query_ap_ids_ops(opts) do
1046 source_user = opts[:muting_user]
1047 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1048
1049 ap_id_relationships =
1050 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1051 [:block | ap_id_relationships]
1052 else
1053 ap_id_relationships
1054 end
1055
1056 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1057
1058 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1059 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1060
1061 restrict_muted_reblogs_opts =
1062 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1063
1064 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1065 end
1066
1067 def fetch_activities_query(recipients, opts \\ %{}) do
1068 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1069 fetch_activities_query_ap_ids_ops(opts)
1070
1071 config = %{
1072 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1073 }
1074
1075 Activity
1076 |> maybe_preload_objects(opts)
1077 |> maybe_preload_bookmarks(opts)
1078 |> maybe_preload_report_notes(opts)
1079 |> maybe_set_thread_muted_field(opts)
1080 |> maybe_order(opts)
1081 |> restrict_recipients(recipients, opts[:user])
1082 |> restrict_replies(opts)
1083 |> restrict_tag(opts)
1084 |> restrict_tag_reject(opts)
1085 |> restrict_tag_all(opts)
1086 |> restrict_since(opts)
1087 |> restrict_local(opts)
1088 |> restrict_actor(opts)
1089 |> restrict_type(opts)
1090 |> restrict_state(opts)
1091 |> restrict_favorited_by(opts)
1092 |> restrict_blocked(restrict_blocked_opts)
1093 |> restrict_muted(restrict_muted_opts)
1094 |> restrict_filtered(opts)
1095 |> restrict_media(opts)
1096 |> restrict_visibility(opts)
1097 |> restrict_thread_visibility(opts, config)
1098 |> restrict_reblogs(opts)
1099 |> restrict_pinned(opts)
1100 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1101 |> restrict_instance(opts)
1102 |> restrict_announce_object_actor(opts)
1103 |> restrict_filtered(opts)
1104 |> Activity.restrict_deactivated_users()
1105 |> exclude_poll_votes(opts)
1106 |> exclude_chat_messages(opts)
1107 |> exclude_invisible_actors(opts)
1108 |> exclude_visibility(opts)
1109 end
1110
1111 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1112 list_memberships = Pleroma.List.memberships(opts[:user])
1113
1114 fetch_activities_query(recipients ++ list_memberships, opts)
1115 |> Pagination.fetch_paginated(opts, pagination)
1116 |> Enum.reverse()
1117 |> maybe_update_cc(list_memberships, opts[:user])
1118 end
1119
1120 @doc """
1121 Fetch favorites activities of user with order by sort adds to favorites
1122 """
1123 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1124 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1125 user.ap_id
1126 |> Activity.Queries.by_actor()
1127 |> Activity.Queries.by_type("Like")
1128 |> Activity.with_joined_object()
1129 |> Object.with_joined_activity()
1130 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1131 |> order_by([like, _, _], desc_nulls_last: like.id)
1132 |> Pagination.fetch_paginated(
1133 Map.merge(params, %{skip_order: true}),
1134 pagination
1135 )
1136 end
1137
1138 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1139 Enum.map(activities, fn
1140 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1141 if Enum.any?(bcc, &(&1 in list_memberships)) do
1142 update_in(activity.data["cc"], &[user_ap_id | &1])
1143 else
1144 activity
1145 end
1146
1147 activity ->
1148 activity
1149 end)
1150 end
1151
1152 defp maybe_update_cc(activities, _, _), do: activities
1153
1154 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1155 from(activity in query,
1156 where:
1157 fragment("? && ?", activity.recipients, ^recipients) or
1158 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1159 ^Constants.as_public() in activity.recipients)
1160 )
1161 end
1162
1163 def fetch_activities_bounded(
1164 recipients,
1165 recipients_with_public,
1166 opts \\ %{},
1167 pagination \\ :keyset
1168 ) do
1169 fetch_activities_query([], opts)
1170 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1171 |> Pagination.fetch_paginated(opts, pagination)
1172 |> Enum.reverse()
1173 end
1174
1175 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1176 def upload(file, opts \\ []) do
1177 with {:ok, data} <- Upload.store(file, opts) do
1178 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1179
1180 Repo.insert(%Object{data: obj_data})
1181 end
1182 end
1183
1184 @spec get_actor_url(any()) :: binary() | nil
1185 defp get_actor_url(url) when is_binary(url), do: url
1186 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1187
1188 defp get_actor_url(url) when is_list(url) do
1189 url
1190 |> List.first()
1191 |> get_actor_url()
1192 end
1193
1194 defp get_actor_url(_url), do: nil
1195
1196 defp object_to_user_data(data) do
1197 avatar =
1198 data["icon"]["url"] &&
1199 %{
1200 "type" => "Image",
1201 "url" => [%{"href" => data["icon"]["url"]}]
1202 }
1203
1204 banner =
1205 data["image"]["url"] &&
1206 %{
1207 "type" => "Image",
1208 "url" => [%{"href" => data["image"]["url"]}]
1209 }
1210
1211 fields =
1212 data
1213 |> Map.get("attachment", [])
1214 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1215 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1216
1217 emojis =
1218 data
1219 |> Map.get("tag", [])
1220 |> Enum.filter(fn
1221 %{"type" => "Emoji"} -> true
1222 _ -> false
1223 end)
1224 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1225 {String.trim(name, ":"), url}
1226 end)
1227
1228 locked = data["manuallyApprovesFollowers"] || false
1229 capabilities = data["capabilities"] || %{}
1230 accepts_chat_messages = capabilities["acceptsChatMessages"]
1231 data = Transmogrifier.maybe_fix_user_object(data)
1232 discoverable = data["discoverable"] || false
1233 invisible = data["invisible"] || false
1234 actor_type = data["type"] || "Person"
1235
1236 public_key =
1237 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1238 data["publicKey"]["publicKeyPem"]
1239 else
1240 nil
1241 end
1242
1243 shared_inbox =
1244 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1245 data["endpoints"]["sharedInbox"]
1246 else
1247 nil
1248 end
1249
1250 user_data = %{
1251 ap_id: data["id"],
1252 uri: get_actor_url(data["url"]),
1253 ap_enabled: true,
1254 banner: banner,
1255 fields: fields,
1256 emoji: emojis,
1257 locked: locked,
1258 discoverable: discoverable,
1259 invisible: invisible,
1260 avatar: avatar,
1261 name: data["name"],
1262 follower_address: data["followers"],
1263 following_address: data["following"],
1264 bio: data["summary"],
1265 actor_type: actor_type,
1266 also_known_as: Map.get(data, "alsoKnownAs", []),
1267 public_key: public_key,
1268 inbox: data["inbox"],
1269 shared_inbox: shared_inbox,
1270 accepts_chat_messages: accepts_chat_messages
1271 }
1272
1273 # nickname can be nil because of virtual actors
1274 if data["preferredUsername"] do
1275 Map.put(
1276 user_data,
1277 :nickname,
1278 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1279 )
1280 else
1281 Map.put(user_data, :nickname, nil)
1282 end
1283 end
1284
1285 def fetch_follow_information_for_user(user) do
1286 with {:ok, following_data} <-
1287 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1288 {:ok, hide_follows} <- collection_private(following_data),
1289 {:ok, followers_data} <-
1290 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1291 {:ok, hide_followers} <- collection_private(followers_data) do
1292 {:ok,
1293 %{
1294 hide_follows: hide_follows,
1295 follower_count: normalize_counter(followers_data["totalItems"]),
1296 following_count: normalize_counter(following_data["totalItems"]),
1297 hide_followers: hide_followers
1298 }}
1299 else
1300 {:error, _} = e -> e
1301 e -> {:error, e}
1302 end
1303 end
1304
1305 defp normalize_counter(counter) when is_integer(counter), do: counter
1306 defp normalize_counter(_), do: 0
1307
1308 def maybe_update_follow_information(user_data) do
1309 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1310 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1311 {_, true} <-
1312 {:collections_available,
1313 !!(user_data[:following_address] && user_data[:follower_address])},
1314 {:ok, info} <-
1315 fetch_follow_information_for_user(user_data) do
1316 info = Map.merge(user_data[:info] || %{}, info)
1317
1318 user_data
1319 |> Map.put(:info, info)
1320 else
1321 {:user_type_check, false} ->
1322 user_data
1323
1324 {:collections_available, false} ->
1325 user_data
1326
1327 {:enabled, false} ->
1328 user_data
1329
1330 e ->
1331 Logger.error(
1332 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1333 )
1334
1335 user_data
1336 end
1337 end
1338
1339 defp collection_private(%{"first" => %{"type" => type}})
1340 when type in ["CollectionPage", "OrderedCollectionPage"],
1341 do: {:ok, false}
1342
1343 defp collection_private(%{"first" => first}) do
1344 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1345 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1346 {:ok, false}
1347 else
1348 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1349 {:error, _} = e -> e
1350 e -> {:error, e}
1351 end
1352 end
1353
1354 defp collection_private(_data), do: {:ok, true}
1355
1356 def user_data_from_user_object(data) do
1357 with {:ok, data} <- MRF.filter(data) do
1358 {:ok, object_to_user_data(data)}
1359 else
1360 e -> {:error, e}
1361 end
1362 end
1363
1364 def fetch_and_prepare_user_from_ap_id(ap_id) do
1365 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1366 {:ok, data} <- user_data_from_user_object(data) do
1367 {:ok, maybe_update_follow_information(data)}
1368 else
1369 {:error, "Object has been deleted" = e} ->
1370 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1371 {:error, e}
1372
1373 {:error, e} ->
1374 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1375 {:error, e}
1376 end
1377 end
1378
1379 def maybe_handle_clashing_nickname(nickname) do
1380 with %User{} = old_user <- User.get_by_nickname(nickname) do
1381 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1382
1383 old_user
1384 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1385 |> User.update_and_set_cache()
1386 end
1387 end
1388
1389 def make_user_from_ap_id(ap_id) do
1390 user = User.get_cached_by_ap_id(ap_id)
1391
1392 if user && !User.ap_enabled?(user) do
1393 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1394 else
1395 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1396 if user do
1397 user
1398 |> User.remote_user_changeset(data)
1399 |> User.update_and_set_cache()
1400 else
1401 maybe_handle_clashing_nickname(data[:nickname])
1402
1403 data
1404 |> User.remote_user_changeset()
1405 |> Repo.insert()
1406 |> User.set_cache()
1407 end
1408 end
1409 end
1410 end
1411
1412 def make_user_from_nickname(nickname) do
1413 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1414 make_user_from_ap_id(ap_id)
1415 else
1416 _e -> {:error, "No AP id in WebFinger"}
1417 end
1418 end
1419
1420 # filter out broken threads
1421 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1422 entire_thread_visible_for_user?(activity, user)
1423 end
1424
1425 # do post-processing on a specific activity
1426 def contain_activity(%Activity{} = activity, %User{} = user) do
1427 contain_broken_threads(activity, user)
1428 end
1429
1430 def fetch_direct_messages_query do
1431 Activity
1432 |> restrict_type(%{type: "Create"})
1433 |> restrict_visibility(%{visibility: "direct"})
1434 |> order_by([activity], asc: activity.id)
1435 end
1436 end