Merge branch 'update-validator' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`. `recipients` is the concatenation of the
# "to", "cc" and "bcc" lists (each defaulting to `[]` when the key is absent).
# For "Create" activities the actor is appended and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # `List.wrap/1` maps a missing actor to `[]`, so no empty-list element ends
  # up inside the recipients (wrapping the `[]` default produced `[[]]`).
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
51
# A nil actor passes the check. A binary actor passes only when it resolves to
# a cached user whose `deactivated` flag is false.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
60
# Returns true when the object's content does not exceed the configured
# `[:instance, :remote_limit]`; data without non-nil content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  not (content_length > max_length)
end

defp check_remote_limit(_), do: true
67
# Bumps the actor's note count for public objects; otherwise returns
# `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
71
# Lowers the actor's note count for public objects; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# For a "Create" whose object has an "inReplyTo", increments the replies
# counter of the replied-to object when the new object is public (returns nil
# when it is not). Any other data is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
# A "Create" whose object carries both "inReplyTo" and "name" is counted via
# `Object.increase_vote_count/3`; anything else is a `:noop`.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => target_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(target_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
96
# Types that are stored as objects rather than as activities.
@object_types ["ChatMessage"]

# Persists `object` and hands `meta` back to the caller.
#
# Data whose "type" is in `@object_types` is stored via `Object.create/1`;
# everything else is inserted as an `%Activity{}` addressed to the recipients
# computed by `get_recipients/1`. `meta` must contain `:local` for the
# activity path (`Keyword.fetch!/2` raises otherwise).
#
# On success the result is a 3-tuple `{:ok, stored, meta}`. A failing step is
# returned unchanged by the `with` (presumably `{:error, reason}`).
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
118
# Validates, filters and stores an activity map.
#
# * `local` - stored on the resulting activity.
# * `fake` - when true nothing is persisted; an in-memory activity with the id
#   "pleroma:fakeid" is returned instead.
# * `bypass_actor_check` - skips the "actor is active" check.
#
# Returns `{:ok, activity}`. If `Activity.normalize/1` already yields an
# `%Activity{}` for the map, that activity is returned as-is. Any other
# non-matching step of the `with` is wrapped as `{:error, value}`.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # `nil` here means no activity was found for this map, so continue inserting.
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Fake activities leave the chain here and are built in the `else`.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    # A failed insert (or expiration creation) raises a MatchError here.
    {:ok, activity} =
      %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      }
      |> Repo.insert()
      |> maybe_create_activity_expiration()

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = activity ->
      {:ok, activity}

    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    # Every other mismatch (including tagged tuples) is wrapped unchanged.
    error ->
      {:error, error}
  end
end
166
# Creates notifications for the activity, creates/bumps its conversation and
# streams both the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
175
# When a successfully inserted activity carries "expires_at" in its data, an
# `ActivityExpiration` is created for it; a failure of that creation is
# returned instead of the activity. Any other input passes through unchanged.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
183
# Creates or bumps the conversation for the activity and marks it as read for
# the acting user. Returns `{:ok, conversation}`, or whatever non-matching
# value the first failing step produced.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    not_created ->
      not_created
  end
end
191
# Returns the freshly (force-)preloaded participations of a conversation, or
# an empty list when no `{:ok, conversation}` was given.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
199
# Streams the given participations (with their users preloaded) on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation matching the object's
# context, but only when a latest direct activity id is found for that
# context with `user` as both viewer and blocking user.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      viewer_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

def stream_out_participations(_, _), do: :noop
225
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`; everything else is a `:noop`.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
236
# Runs `do_create/2` inside a database transaction and unwraps its result; a
# transaction that does not return `{:ok, _}` is passed through unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
243
# Builds the "Create" data and inserts it, then updates counters, notifies,
# streams and federates. Fake activities and the `:benchmark` environment
# return early; an `{:error, message}` rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # Only an explicit `false` makes the activity non-local.
  local? = params[:local] != false
  published_at = params[:published]
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: published_at, context: context, object: object}
    |> make_create_data(extra)

  with {:ok, activity} <- insert(create_data, local?, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:fake, true, faked} ->
      {:ok, faked}

    {:error, message} ->
      Repo.rollback(message)
  end
end
277
# Builds "Listen" data from the params, inserts it, then notifies, streams and
# federates the resulting activity.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # Only an explicit `false` makes the activity non-local.
  local? = params[:local] != false
  published_at = params[:published]

  listen_data =
    %{to: to, actor: actor, published: published_at, context: context, object: object}
    |> make_listen_data(extra)

  with {:ok, activity} <- insert(listen_data, local?),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
# Creates an "Accept" activity from the params.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
302
# Creates a "Reject" activity from the params.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
307
# Shared implementation of `accept/1` and `reject/1`: inserts an activity of
# the given type (with an optional explicit "id"), then notifies, streams and
# federates it. `:local` defaults to true.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  explicit_id = Map.get(params, :activity_id)

  base = %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
  data = Maps.put_if_present(base, "id", explicit_id)

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
323
# Runs `do_follow/5` inside a database transaction and unwraps its result; a
# transaction that does not return `{:ok, _}` is passed through unchanged.
@spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
  transaction =
    Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
332
# Inserts the follow activity, optionally notifies/streams (skipped when the
# `:skip_notify_and_stream` option is truthy) and federates it. Errors roll
# the surrounding transaction back.
defp do_follow(follower, followed, activity_id, local, opts) do
  silent? = Keyword.get(opts, :skip_notify_and_stream, false)
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- silent? || notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
345
# Runs `do_unfollow/4` inside a database transaction and unwraps its result; a
# transaction that does not return `{:ok, _}` is passed through unchanged.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
354
# Marks the latest follow activity as "cancelled", inserts the matching
# unfollow activity, then notifies, streams and federates it. Returns nil when
# there is no follow activity; errors roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
368
# Runs `do_block/4` inside a database transaction and unwraps its result; a
# transaction that does not return `{:ok, _}` is passed through unchanged.
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
377
# Inserts a block activity, then notifies, streams and federates it. When
# `[:activitypub, :unfollow_blocked]` is enabled and the blocker follows the
# blocked user, the follow is undone first. Errors roll the transaction back.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` instead of the strict `and`: an unset (nil) config value must simply
  # disable the unfollow rather than raise `BadBooleanError`.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
395
# Creates a report ("flag") activity. The stored activity is notified and
# streamed, a stripped copy is federated, and every superuser with an email
# address gets a report mail. With `forward` (anything but an explicit
# `false`) the reported account is put into "cc".
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # Only an explicit `false` is treated as false for both flags.
  local? = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local?),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
435
# Inserts a "Move" activity from `origin` to `target`, federates it and
# enqueues the "move_following" background job. The target must list the
# origin's ap_id in `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
461
# Builds the query for all "Create" activities of a context, newest first,
# restricted to what `opts[:user]` may see (public only when there is no user).
def fetch_activities_for_context_query(context, opts) do
  public_audience = [Constants.as_public()]
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public_audience
    else
      public_audience
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, viewer)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
490
# Executes `fetch_activities_for_context_query/2` and returns all rows.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
497
# Returns the id of the newest direct-visibility "Create" activity in the
# context, or nil when there is none. Preloads are skipped unless `opts`
# overrides `:skip_preload`.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # `Map.new/1` makes the keyword lists allowed by the spec work too;
  # `Map.merge/2` raises `BadMapError` on a keyword list.
  query_opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
508
# Fetches a page of activities addressed to the public collection. The `:user`
# option is dropped before querying.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
518
# Same as `fetch_public_or_unlisted_activities/2` but with `:restrict_unlisted`
# forced to true.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
525
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility equals (or, for
# a list, is one of) the `:visibility` option.
#
# An invalid visibility is logged and the query is returned unfiltered, the
# same way `exclude_visibility/2` behaves. Previously the `Logger.error/1`
# result was returned, which replaced the query in the caller's pipeline. A
# nil or absent `:visibility` leaves the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
562
# Removes activities whose computed visibility equals (or, for a list, is one
# of) the `:exclude_visibilities` option. Invalid values are logged and the
# query is returned unfiltered.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
605
# Applies the `thread_visibility` SQL check for the viewing user unless thread
# containment is skipped by the config map or by the user's own setting.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
620
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. The fetched list is reversed.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
635
# Fetches the "Create"/"Announce" activities authored by `user` as seen by
# `reading_user`. Block and mute filtering for the reader is only enabled when
# the reader does not block `user`. The fetched list is reversed.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
661
# Fetches "Create"/"Announce" activities visible to `reading_user` using
# offset pagination. The fetched list is reversed.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
673
# Recipient list used when reading a user's activities: empty in godmode,
# otherwise the public collection plus, for a logged-in reader, the reader and
# everything the reader follows.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user do
    public = Constants.as_public()
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [Constants.as_public()]
  end
end
683
# Drops "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
703
# Keeps only activities with an id greater than `:since_id`; an empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
711
# Excludes activities whose object carries any of the `:tag_reject` tags.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
724
# Keeps only activities whose object carries all of the `:tag_all` tags.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
737
# Keeps only activities whose object carries the `:tag` (a single tag) or any
# of the tags (a list). Needs the joined object, so it raises when preloading
# is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
757
# Keeps activities addressed to at least one of `recipients`; when a user is
# given, the user's own activities are additionally allowed through an OR
# condition. An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
771
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
777
# Keeps only activities whose actor equals `:actor_id`, when that option is set.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
783
# Keeps only activities of the given `:type`: a binary is compared for
# equality, any other value is matched with `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
793
# Keeps only activities whose "state" field equals `:state`, when set.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
799
# Keeps only activities whose object lists `:favorited_by` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
808
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Raises whenever `:only_media` is combined with
# `skip_preload: true`, since the joined object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
822
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo".
#   * `reply_visibility: "self"` also keeps replies addressed to the filtering
#     user.
#   * `reply_visibility: "following"` also keeps replies addressed to the user
#     or their cached friends (the actor itself removed from the recipients),
#     and replies written by the user.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
866
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
872
# Hides activities from, or addressed ("to") to, users muted by the
# `:muting_user`. Unless preloading is skipped, rows matched by the
# `:thread_mute` binding are hidden as well. `with_muted: true` disables the
# filter entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the ap_ids already loaded by the caller, if any.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  query =
    from([activity] in query,
      where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
      where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
    )

  # The `:thread_mute` binding is only referenced when preloading is enabled.
  if opts[:skip_preload] do
    query
  else
    from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
892
# Hides activities that involve users or domains blocked by the
# `:blocking_user`: blocked actors, blocked recipients, recipients on blocked
# domains, announces addressed to blocked users, and actors/object authors on
# blocked domains unless the user follows them. The object is joined first if
# the query has no `:object` binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  blocked_domains = user.domain_blocks || []
  friends = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^blocked_domains
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^blocked_domains,
      activity.actor,
      ^friends
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^blocked_domains,
      o.data,
      ^friends
    )
  )
end

defp restrict_blocked(query, _), do: query
939
# With `restrict_unlisted: true`, drops activities that have the public
# collection in their "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
953
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
959
# Drops "Announce" activities by actors whose reblogs the `:muting_user` has
# muted; ap_ids supplied by the caller are used when present.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
976
# Keeps only activities whose actor is a user with a nickname ending in
# "@<instance>". The matching ap_ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  # Escape the LIKE metacharacters so the instance name is matched literally;
  # an unescaped "%" or "_" would act as a wildcard. Backslash is PostgreSQL's
  # default LIKE escape character.
  escaped_instance =
    instance
    |> to_string()
    |> String.replace(["\\", "%", "_"], &("\\" <> &1))

  nickname_pattern = "%@" <> escaped_instance

  users =
    from(
      u in User,
      select: u.ap_id,
      where: fragment("? LIKE ?", u.nickname, ^nickname_pattern)
    )
    |> Repo.all()

  from(activity in query, where: activity.actor in ^users)
end

defp restrict_instance(query, _), do: query
990
# Drops activities whose object is of type "Answer" unless
# `include_poll_votes: true`. Without an `:object` binding the query is
# returned unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1002
# Drops activities whose object is of type "ChatMessage" unless
# `include_chat_messages: true`. Without an `:object` binding the query is
# returned unchanged.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
1014
# Drops activities by invisible users unless `invisible_actors: true`. The
# invisible users' ap_ids are loaded with a separate query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, & &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1025
# Drops the activity with the given binary `:exclude_id`, when set.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1031
# Preloads the activity's object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1038
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1045
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1052
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1059
# Orders by id in the direction given by `:order` (`:asc` or `:desc`); any
# other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1071
# Loads the muting user's outgoing relationship ap_ids in one call and returns
# three option maps (for blocked, muted and reblog-muted filtering), each
# being `opts` plus the corresponding preloaded list. Keys already present in
# `opts` win. Blocks are only preloaded when the blocking user is the muting
# user.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1093
@doc """
Builds the activity query for `recipients` without running it.

Starting from `Activity`, the query receives the optional preloads and ordering, then every
`restrict_*` filter, and finally the `exclude_*` filters. All steps are driven by `opts`; the
blocked / muted / muted-reblogs restrictions receive `opts` enriched by
`fetch_activities_query_ap_ids_ops/1`.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_visibility_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  base_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  restricted_query =
    base_query
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(blocked_opts)
    |> restrict_muted(muted_opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, thread_visibility_config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()

  restricted_query
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1135
@doc """
Fetches one page of activities for `recipients`.

The result of `Pleroma.List.memberships(opts[:user])` is appended to `recipients` before the
query is built. The paginated result is reversed and then passed through `maybe_update_cc/3`
together with those memberships and `opts[:user]`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ list_memberships

  page =
    all_recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1144
@doc """
Fetches the activities favourited by `user`.

The query starts from the user's own "Like" activities and joins the liked object and its
activity. Results are ordered by the id of the Like (descending, nulls last), and that id is
exposed as `pagination_id` on each returned activity. `skip_order: true` is forced into
`params` before pagination.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  pagination_params = Map.put(params, :skip_order, true)

  likes_with_objects =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  likes_with_objects
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1162
# Adds the user's ap_id to the "cc" of every activity whose "bcc" contains one of the given
# list memberships. Activities without a non-empty "bcc" list, or whose "bcc" shares nothing
# with `list_memberships`, are returned untouched.
#
# When there are no list memberships, or no `%User{}` is given, the activities are returned
# as they are.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a bare string; wrapping it keeps the result a proper
        # list instead of an improper `[ap_id | nil]` / `[ap_id | "string"]` cons cell.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1178
# Keeps activities whose recipients overlap `recipients`, or whose recipients overlap
# `recipients_with_public` and also contain the public address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1187
@doc """
Fetches one page of activities using the bounded recipient filter.

The base query is built with an empty recipient list and then narrowed by
`fetch_activities_bounded_query/3` using `recipients` and `recipients_with_public`. The
paginated result is returned in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1199
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the resulting data.

When `opts[:actor]` is given it is recorded under the `"actor"` key of the object data. Any
result of `Upload.store/2` other than `{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1208
# Extracts a URL string from the actor's "url" member.
#
# Accepts a plain string, a map with a string "href", or a list (whose first element is
# examined recursively). Anything else, including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1220
# Builds the atom-keyed user attribute map from a remote actor object (a string-keyed map).
#
# The input comes from other servers, so optional members are read defensively: a missing or
# malformed "icon"/"image" yields no avatar/banner, and "attachment"/"tag" entries that are
# not well-formed profile fields or custom emoji are skipped instead of raising.
#
# `:nickname` is "preferredUsername@host-of-id", or nil when "preferredUsername" is absent.
defp object_to_user_data(data) do
  avatar = actor_image_to_user_field(data["icon"])
  banner = actor_image_to_user_field(data["image"])

  fields =
    data
    |> Map.get("attachment", [])
    # tolerate an explicit null or a single, unwrapped attachment object
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    data
    |> Map.get("tag", [])
    |> List.wrap()
    # only keep emoji tags that carry everything the mapping below destructures
    |> Enum.filter(fn
      %{"type" => "Emoji", "icon" => %{"url" => _}, "name" => name} -> is_binary(name)
      _ -> false
    end)
    |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
      {String.trim(name, ":"), url}
    end)

  # read before maybe_fix_user_object/1, as the original code did
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }
end

# Wraps the "url" of a remote image object into the Image map stored for avatars and banners;
# returns nil when `image` is not a map carrying a usable "url".
defp actor_image_to_user_field(%{"url" => url}) when url not in [nil, false] do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp actor_image_to_user_field(_image), do: nil
1306
@doc """
Fetches the remote following and followers collections of `user`.

Returns `{:ok, info}` where `info` holds `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`. The counts are taken from each collection's
`"totalItems"` (0 when that is not an integer). An `{:error, _}` from any step is returned
as is; any other unexpected value is wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1326
# Returns `counter` when it is an integer, otherwise 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1329
@doc """
Enriches `user_data` with remote follower/following information when possible.

The remote collections are only fetched when `[:instance, :external_user_synchronization]`
is enabled, the actor type is "Person" or "Service", and both `:following_address` and
`:follower_address` are present. The fetched values are merged into `user_data[:info]`.

`user_data` is always returned: unchanged when a precondition is not met, and unchanged
(with an error logged) when fetching fails.
"""
def maybe_update_follow_information(user_data) do
  # `object_to_user_data/1` stores the actor type under `:actor_type`, so checking only
  # `:type` could never succeed for freshly fetched actors. `:type` is still accepted for
  # callers that pass it.
  actor_type = user_data[:actor_type] || user_data[:type]

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    # NOTE(review): the result is stored under `:info`; confirm the changeset consuming this
    # map still reads that key.
    info = Map.merge(user_data[:info] || %{}, info)

    user_data
    |> Map.put(:info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1360
# Decides whether a remote collection is hidden. Returns `{:ok, boolean}` or `{:error, _}`.
#
#   * "first" embedded as a (Ordered)CollectionPage -> visible (`{:ok, false}`)
#   * "first" given otherwise -> it is fetched; a page means visible, an HTTP 401/403 error
#     means hidden, other failures are returned as errors
#   * no "first" at all -> hidden (`{:ok, true}`)
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1377
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object into user data.

Returns `{:ok, user_data}`; any other filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1385
@doc """
Fetches the remote actor at `ap_id` and turns it into user data.

On success the data is passed through `maybe_update_follow_information/1` and returned as
`{:ok, user_data}`. Failures are returned as `{:error, reason}` after being logged: at debug
level when the object has been deleted, at error level otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1400
@doc """
Creates or refreshes the local record of the remote user identified by `ap_id`.

A cached user that is not AP-enabled is handed to `Transmogrifier.upgrade_user_from_ap_id/1`.
Otherwise the actor is fetched; a cached user is updated with the fetched data, and a new
user is inserted and cached when none exists. A failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)
  needs_upgrade? = cached_user && !User.ap_enabled?(cached_user)

  if needs_upgrade? do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
      if cached_user do
        changeset = User.remote_user_changeset(cached_user, data)
        User.update_and_set_cache(changeset)
      else
        inserted = Repo.insert(User.remote_user_changeset(data))
        User.set_cache(inserted)
      end
    end
  end
end
1421
@doc """
Resolves `nickname` through WebFinger and builds the user from the discovered AP id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _ -> {:error, "No AP id in WebFinger"}
  end
end
1429
# Filters out broken threads: the result is whatever `entire_thread_visible_for_user?/2`
# answers for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1434
@doc """
Post-processing check for a single activity as seen by `user`.

Currently this only applies the broken-thread containment check.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1439
@doc """
Returns the query for "Create" activities with "direct" visibility, ordered by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1446 end