Merge branch 'develop' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
# Computes the addressing of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw "to"/"cc"
# values of `data` (defaulting to `[]`) and `recipients` is the combined
# to + cc + bcc list.
#
# For "Create" activities the actor is appended to the recipients and the
# list is de-duplicated. A missing "actor" contributes nothing: the previous
# `Map.get(data, "actor", [])` followed by `[actor]` put the empty list itself
# into the recipients when the key was absent.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # `List.wrap/1` maps nil to [] and a single actor id to a one-element list.
  actor = List.wrap(data["actor"])

  recipients =
    [to, cc, bcc, actor]
    |> Enum.concat()
    |> Enum.uniq()

  {recipients, to, cc}
end

# Every other activity type: plain concatenation, duplicates are kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
52
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# Looks the actor up through `User.get_cached_by_ap_id/1`. The check passes
# only when a user is found and its `deactivated` flag is false; any other
# lookup result fails the check.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
61
# True when the embedded object's "content" is no longer than the configured
# `[:instance, :remote_limit]`. Length is measured with `String.length/1`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

# No embedded object, or no content on it: nothing to limit.
defp check_remote_limit(_), do: true
68
# Bumps the actor's note count via `User.increase_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
72
@doc """
Decrements the actor's note count via `User.decrease_note_count/1` when
`object` is public; otherwise returns `{:ok, actor}` unchanged.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76
# For a "Create" whose embedded object has an "inReplyTo", increments the
# replies counter of the replied-to object, but only when the new object is
# public. Returns nil when it is not public.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = note
     }) do
  if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
end

# Anything that is not a reply-creating activity is ignored.
defp increase_replies_count_if_reply(_create_data), do: :noop
87
# A "Create" whose embedded object carries both "inReplyTo" and "name" is
# handed to `Object.increase_vote_count/3` with the replied-to id, the name
# and the actor.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

# Every other shape is ignored.
defp increase_poll_votes_if_vote(_create_data), do: :noop
97
# Types that are stored as plain objects rather than wrapped in an activity.
@object_types ["ChatMessage"]

@doc """
Persists `object` and hands `meta` back to the caller.

Maps whose "type" is listed in `@object_types` are stored through
`Object.create/1`. Everything else is inserted as an `Activity` whose
`local` flag is read from `meta` (`Keyword.fetch!/2` raises when `:local`
is missing) and whose recipients come from `get_recipients/1`.

Returns `{:ok, record, meta}` on success. A non-`{:ok, _}` result from the
underlying create/insert call is returned unchanged (presumably
`{:error, reason}`; the exact shape depends on those calls).
"""
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
119
@doc """
Inserts the activity described by `map`.

The steps run in order and the first failing one aborts the insert:

  * an activity that `Activity.normalize/1` already resolves is returned as-is;
  * defaults are filled in with `lazy_put_activity_defaults/2`;
  * unless `bypass_actor_check` is set, the actor must pass
    `check_actor_is_active/1`;
  * the content must pass `check_remote_limit/1`;
  * the map goes through `MRF.filter/1`;
  * with `fake` set, an unsaved activity with id `"pleroma:fakeid"` is
    returned at this point;
  * `Containment.contain_child/1` must return `:ok`;
  * the embedded object is stored through `insert_full_object/1`.

On success the activity row is inserted, an expiration is created when the
data has "expires_at", and a "fetch_data_for_activity" background job is
enqueued. Any failing step is returned as `{:error, failing_value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    new_activity = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    # A failed insert or expiration raises a MatchError here.
    {:ok, stored} =
      new_activity
      |> Repo.insert()
      |> maybe_create_activity_expiration()

    # Splice in the child object if we have one.
    stored = Maps.put_if_present(stored, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => stored.id})

    {:ok, stored}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake mode: build the struct in memory only, never touching the database.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
167
@doc """
Runs the post-insert side effects for `activity`: creates notifications,
creates or bumps its conversation (marking it read for the actor), then
streams the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
176
# When a freshly inserted activity carries "expires_at" in its data, an
# `ActivityExpiration` is created for it. A failing creation is returned
# as-is in place of the `{:ok, activity}` tuple.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _} -> {:ok, activity}
    failure -> failure
  end
end

# No expiry requested, or the insert itself failed: pass the result through.
defp maybe_create_activity_expiration(result), do: result
184
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected shape, that non-matching value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    no_conversation ->
      no_conversation
  end
end
192
# Force-reloads the conversation's participations and returns them.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

# No conversation available: nothing to stream.
defp get_participations(_), do: []
200
@doc """
Preloads the `:user` association on `participations` and pushes them to the
"participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
208
@doc """
Streams the participations of the conversation that `object`'s "context"
belongs to, provided a direct activity can still be found in that context
for `user`.

Returns `:noop` when the first argument is not an `Object` with a "context".
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      # No direct activity left in the context: nothing is streamed.
      if last_activity_id, do: stream_out_participations(conversation.participations)

    no_conversation ->
      no_conversation
  end
end

def stream_out_participations(_, _), do: :noop
226
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed
by `Topics.get_activity_topics/1`. Any other input yields `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
237
@doc """
Runs `do_create/2` inside a database transaction and returns its result.
A transaction that does not finish with `{:ok, result}` is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    aborted -> aborted
  end
end
244
# Transaction body of `create/2`.
#
# Builds the Create data, inserts it, then (for non-fake inserts) updates the
# reply and poll counters, bumps the actor's note count, notifies/streams and
# federates. In fake mode, or when the `:env` config is `:benchmark`, the
# chain stops early and the activity is returned. An `{:error, message}`
# from any step rolls the transaction back with that message.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(base, extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:fake, true, faked} ->
      {:ok, faked}

    {:error, message} ->
      Repo.rollback(message)
  end
end
278
@doc """
Builds listen data from `params` with `make_listen_data/2`, inserts it, then
notifies/streams and federates the resulting activity.

`params[:local]` counts as local unless it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
@doc """
Inserts, streams and federates an "Accept" activity built from `params`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
303
@doc """
Inserts, streams and federates a "Reject" activity built from `params`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308
# Shared implementation of `accept/1` and `reject/1`.
#
# Builds an activity of the given `type` addressed to `to`, with the actor's
# `ap_id` as "actor"; an "id" is set only when `:activity_id` is present.
# `:local` defaults to true.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id)

  base = %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
  data = Maps.put_if_present(base, "id", activity_id)

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
@doc """
Runs `do_follow/5` inside a database transaction and returns its result.
A transaction that does not finish with `{:ok, result}` is returned as-is.

The only option read from `opts` is `:skip_notify_and_stream`.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
  transaction = fn -> do_follow(follower, followed, activity_id, local, opts) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    aborted -> aborted
  end
end
333
# Transaction body of `follow/5`: inserts the follow activity, notifies and
# streams it unless `:skip_notify_and_stream` is set in `opts`, then
# federates. An `{:error, error}` rolls the transaction back.
defp do_follow(follower, followed, activity_id, local, opts) do
  skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- skip_notify_and_stream || notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} ->
      Repo.rollback(reason)
  end
end
346
@doc """
Runs `do_unfollow/4` inside a database transaction and returns its result,
which is `nil` when no follow activity exists between the two users.
A transaction that does not finish with `{:ok, result}` is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    aborted -> aborted
  end
end
355
# Transaction body of `unfollow/4`.
#
# Finds the latest follow activity between the two users, sets its state to
# "cancelled", then inserts, streams and federates the matching unfollow
# activity. Returns nil when there is no follow activity; an
# `{:error, error}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
369
@doc """
Files a report (flag) described by `params`.

The flag activity is inserted and streamed in full, while the copy that is
federated first goes through `strip_report_status_data/1`. Afterwards every
superuser that has an email address is sent an admin report email
asynchronously.

`params[:local]` and `params[:forward]` count as true unless they are exactly
`false`. With forwarding on, the reported account's `ap_id` is placed in
"cc"; otherwise "cc" is empty. "to" is always empty.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(email)
    end)

    {:ok, activity}
  end
end
409
@doc """
Announces that `origin` has moved to `target`.

The move is refused with an error unless `origin.ap_id` is listed in
`target.also_known_as`. Otherwise a "Move" activity is inserted, streamed
and federated, and a "move_following" background job is enqueued with both
user ids.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(move_data, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      job_args = %{"origin_id" => origin.id, "target_id" => target.id}
      BackgroundWorker.enqueue("move_following", job_args)

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
435
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Without `opts[:user]` only publicly addressed activities are matched; with a
user, activities addressed to the user or to anyone the user follows are
matched as well. Block, filter, poll-vote and `:exclude_id` restrictions
from `opts` are applied.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  from(activity in base_query,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
465
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
472
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none. Preloading is skipped unless `opts` sets
`:skip_preload` itself.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Keys already present in `opts` win over the default.
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  from(a in direct_only, limit: 1, select: a.id)
  |> Repo.one()
end
483
@doc """
Fetches a page of publicly addressed activities. `:user` is dropped from
`opts` before querying; unlisted activities are filtered out only when
`opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
493
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
500
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` SQL result
# matches `opts[:visibility]`, which may be a single visibility or a list.
#
# An invalid value is logged and the query is returned unrestricted, the
# same way `exclude_visibility/2` behaves. Previously these branches returned
# the result of `Logger.error/1`, so the next step of the caller's pipeline
# received a non-query. A nil visibility is treated as "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
537
# Drops activities whose `activity_visibility(...)` SQL result matches
# `opts[:exclude_visibilities]`, which may be a single visibility or a list.
# Invalid values are logged and the query is left unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

# Nothing to exclude.
defp exclude_visibility(query, _visibility), do: query
580
# Applies the `thread_visibility` SQL check for the requesting user.
# The check is skipped when the config map or the user has
# `skip_thread_containment: true`, or when `opts` has no `%User{}`.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
595
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type. The list returned by `fetch_activities/2` is
reversed before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
610
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

The reader's blocks and mutes are applied unless the reader blocks `user`.
The list returned by `fetch_activities/2` is reversed before being returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
636
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination. The list returned by `fetch_activities/3` is reversed
before being returned.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
648
# In godmode the recipient list is empty, which makes
# `restrict_recipients/3` leave the query unrestricted.
defp user_activities_recipients(%{godmode: true}), do: []

# Otherwise: the public collection, plus the reader's own id and followed
# ids when a reader is given.
defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
658
# Hides "Announce" activities whose announced object was authored by the
# filtering user. Needs the joined object, so it refuses to run together
# with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
678
# Keeps only activities with an id greater than `since_id`. An empty string
# or a missing key means no restriction.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
686
# Drops activities whose object has any of the rejected tags. Needs the
# joined object, so it refuses to run together with `skip_preload: true`.
# An empty or missing `:tag_reject` leaves the query untouched.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
699
# Keeps only activities whose object has every tag in `:tag_all`. Needs the
# joined object, so it refuses to run together with `skip_preload: true`.
# An empty or missing `:tag_all` leaves the query untouched.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
712
# Keeps only activities whose object has the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it refuses to run
# together with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
732
# Keeps activities whose recipients overlap with `recipients`. An empty
# recipient list means no restriction. When a user is given, the overlap
# condition is combined with an OR on activities authored by that user.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
746
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
752
# Keeps only activities whose actor equals `opts[:actor_id]`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
758
# Keeps only activities of the given type. A binary is compared for
# equality; any other value is matched with SQL `ANY`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
768
# Keeps only activities whose data has the given "state".
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
774
# Keeps only activities whose object lists `ap_id` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
783
# With `only_media: true`, keeps only "Create" activities whose object's
# "attachment" is not an empty list. Needs the joined object, so it refuses
# to run together with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
797
# Reply filtering.
#
#   * `exclude_replies: true` keeps only objects without "inReplyTo".
#   * `reply_visibility: "self"` additionally keeps replies that have the
#     filtering user among the activity recipients.
#   * `reply_visibility: "following"` additionally keeps replies addressed
#     (actor aside) to the user or the user's friends, and replies authored
#     by the user.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
841
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
847
# Hides activities authored by, or addressed ("to") to, users muted by
# `opts[:muting_user]`. Unless preloading is skipped, rows that have a
# matching `thread_mute` binding are hidden as well.
#
# `with_muted: true` disables the restriction. A precomputed list of muted
# ids can be passed as `:muted_users_ap_ids`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
867
# Applies the blocks of `opts[:blocking_user]`.
#
# Hidden are: activities by blocked users, activities with blocked users or
# blocked domains among their recipients, announces addressed ("to") to
# blocked users, and activities whose actor or object actor is on a blocked
# domain unless that actor is among the user's friends.
#
# The object is joined when the query has no `:object` binding yet. A
# precomputed list of blocked ids can be passed as `:blocked_users_ap_ids`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
914
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
928
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
934
# Hides "Announce" activities by users whose reblogs `opts[:muting_user]`
# has muted. A precomputed list can be passed as
# `:reblog_muted_users_ap_ids`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
951
# Keeps only activities by users whose nickname ends in `@<instance>`.
# The matching ap_ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  instance_actor_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^"%@#{instance}"))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_actor_ids)
end

defp restrict_instance(query, _), do: query
965
# Hides activities whose object content matches the regex composed from the
# user's filters (`Filter.compose_regex/1`). The user's own activities are
# never hidden; with no regex the query is left unchanged.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

# Falls back to the blocking user when `:user` is not a `%User{}`.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
985
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is set. Only applied when the query has an
# `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
997
# Drops activities whose object is of type "ChatMessage", unless
# `include_chat_messages: true` is set. Only applied when the query has an
# `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
1009
# Drops activities by invisible users, unless `invisible_actors: true` is
# set. The invisible users' ap_ids are loaded with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: id} -> id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1020
# Drops the activity with the given binary id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1026
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1033
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1040
# Preloads report notes only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1047
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1054
# Orders by id when `opts[:order]` is `:desc` or `:asc`; any other value
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1066
# Loads the muting user's outgoing relationship ap_ids with one call to
# `User.outgoing_relationships_ap_ids/2` and returns three copies of `opts`,
# each carrying one preloaded list, for `restrict_blocked/2`,
# `restrict_muted/2` and `restrict_muted_reblogs/2` respectively.
#
# Blocks are only included when the blocking user is the muting user.
# Keys already present in `opts` take precedence over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)
  }
end
1088
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion that `opts` selects.

Block, mute and reblog-mute lists are loaded once up front by
`fetch_activities_query_ap_ids_ops/1` and passed to the matching
restrictions. Thread containment follows the
`[:instance, :skip_thread_containment]` config.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second identical call would only repeat the same
  # WHERE clause and compose the filter regex again.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1132
@doc """
Runs `fetch_activities_query/2` and returns one page of activities.

The list memberships of `opts[:user]` (from `Pleroma.List.memberships/1`) are
appended to `recipients` before querying. The paginated result is reversed and
then passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
1141
@doc """
Fetches the activities a user has favourited, ordered by when they were favourited.

The query starts from the user's `Like` activities, joins the liked object and
the activity belonging to that object, and selects that activity with its
`object` filled in. Ordering is by the id of the `Like` (descending, nulls last),
and the same id is exposed as `pagination_id` so pagination follows the order
in which favourites were added.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites_query =
    likes_with_targets
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering above must be kept, so the paginator is told not to add its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1159
# For a user with list memberships, prepends the user's own ap_id to the "cc" of
# every activity whose "bcc" contains at least one of those memberships.
# Activities without a non-empty "bcc" list, and all activities when there are no
# memberships or no user, are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent or a single value; List.wrap/1 keeps the result a
        # proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1175
# Narrows `query` to activities whose recipients overlap `recipients`, or whose
# recipients overlap `recipients_with_public` while also containing the public
# address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1184
@doc """
Fetches one page of activities using the bounded recipient restriction.

The base query is built by `fetch_activities_query/2` with an empty recipient
list; the recipient restriction is then applied by
`fetch_activities_bounded_query/3` from `recipients` and
`recipients_with_public`. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1196
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is given it is added to the object data under `"actor"`
(via `Maps.put_if_present/3`). Anything other than `{:ok, data}` coming back
from `Upload.store/2` is returned as-is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1205
# Extracts a URL string from an actor's "url" value, which may be a plain string,
# a map with a string "href", or a list of either (only the first element is
# considered). Everything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(value) do
  case value do
    url when is_binary(url) -> url
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1217
# Converts a fetched ActivityPub actor document (string-keyed map) into the
# atom-keyed attribute map that the callers in this module hand to
# `User.remote_user_changeset`.
#
# The document comes from a remote server, so optional parts are read defensively:
# malformed "icon"/"image", "attachment" and "tag" values are skipped instead of
# raising.
defp object_to_user_data(data) do
  # Only a map carrying a usable "url" becomes an image; nil, bare strings, lists
  # and maps without "url" all yield nil.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # List.wrap/1 tolerates a missing/null value as well as a single, unwrapped entry.
  # The generator pattern drops entries that are not "PropertyValue" maps.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Emoji tags without an icon URL or a string name are ignored.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false

  # Everything below reads the document as returned by maybe_fix_user_object/1.
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }
end
1303
@doc """
Fetches the remote `following` and `followers` collections of `user` and
summarises them.

On success returns `{:ok, map}` with `:following_count` / `:follower_count`
(taken from each collection's `"totalItems"`, 0 when that is not an integer)
and `:hide_follows` / `:hide_followers` (as decided by `collection_private/1`).

`{:error, _}` results are passed through; any other unexpected value is wrapped
as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, following_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    summary = %{
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"]),
      hide_follows: following_hidden,
      hide_followers: followers_hidden
    }

    {:ok, summary}
  else
    {:error, _} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1323
# Integer counters pass through unchanged; every other value counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1326
@doc """
Enriches `user_data` with remote follower/following information when allowed.

The lookup happens only if the `[:instance, :external_user_synchronization]`
setting is `true`, `user_data[:type]` is `"Person"` or `"Service"`, and both
collection addresses are present. The fetched info is merged into
`user_data[:info]`.

`user_data` is returned unchanged whenever one of those checks is `false`. Any
other failure is logged as an error and `user_data` is likewise returned
unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): object_to_user_data/1 in this module stores the actor type under
  # :actor_type, not :type, so the type check below looks like it can never pass
  # for maps built there — confirm which key is intended.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)

    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1357
# Decides whether a remote collection hides its members.
#
#   * "first" embedded as a (Ordered)CollectionPage -> {:ok, false}
#   * "first" given otherwise: it is fetched; a (Ordered)CollectionPage means
#     {:ok, false}, a 401/403 response means {:ok, true}, other results are errors
#   * no "first" at all -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1374
@doc """
Runs an actor document through `MRF.filter/1` and converts the accepted result
into user attributes.

Returns `{:ok, user_data}` when the filter answers `{:ok, data}`; any other
answer is wrapped as `{:error, answer}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1382
@doc """
Fetches the actor document at `ap_id` and turns it into user attributes.

On success returns `{:ok, user_data}`, with `user_data` passed through
`maybe_update_follow_information/1`. On `{:error, reason}` the failure is
logged — at debug level when the reason is `"Object has been deleted"`, at
error level otherwise — and returned unchanged.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      message = "Could not decode user at fetch #{ap_id}, #{inspect(reason)}"

      # A deleted actor is an expected outcome, so it is not reported as an error.
      if reason == "Object has been deleted" do
        Logger.debug(message)
      else
        Logger.error(message)
      end

      {:error, reason}
  end
end
1397
@doc """
Frees up `nickname` if it is still held by an existing user.

When `User.get_by_nickname/1` finds a user, that user is renamed to
`"<id>.<old nickname>"` and the result of `User.update_and_set_cache/1` is
returned. When no user is found, the lookup result is returned as-is.

Actors without a `preferredUsername` have no nickname, so a non-binary
`nickname` (e.g. `nil`) is a no-op that returns `nil` without any lookup.
"""
def maybe_handle_clashing_nickname(nickname) when is_binary(nickname) do
  with %User{} = old_user <- User.get_by_nickname(nickname) do
    Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")

    old_user
    |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
    |> User.update_and_set_cache()
  end
end

def maybe_handle_clashing_nickname(_nickname), do: nil
1407
@doc """
Creates or refreshes the user identified by `ap_id`.

  * A cached user that is not AP-enabled is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the actor is fetched with `fetch_and_prepare_user_from_ap_id/1`;
    a cached user is updated from the fetched data, while an unknown user is
    inserted after `maybe_handle_clashing_nickname/1` has been called with the
    fetched nickname.

If the fetch does not return `{:ok, data}`, its result is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)
  needs_upgrade? = cached_user && !User.ap_enabled?(cached_user)

  if needs_upgrade? do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data[:nickname])

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1430
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when the WebFinger lookup does not yield a
non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1438
# Filters out broken threads: the verdict for an activity is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1443
@doc """
Post-processing check for a single activity on behalf of `user`.

Currently this only applies the broken-thread containment and returns its
result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1448
@doc """
Builds the query for direct messages: `Create` activities restricted to
`"direct"` visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1455 end