ActivityPub: Remove `update` and switch to pipeline.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`, where `recipients` is built from the "to",
# "cc" and "bcc" fields (each defaulting to `[]`). For "Create" activities the
# actor is appended as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` turns a missing (nil) actor into `[]`, so no bogus `[]` or
  # `nil` element ends up among the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
51
# Tells whether an activity from `actor` may be inserted: `true` when no actor
# is given (nil) or the cached user is not deactivated; `false` when the user
# is deactivated or no `%User{}` is found for the AP id.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
60
# Checks the object's "content" length against the `[:instance, :remote_limit]`
# setting. Activities without a non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_), do: true
67
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the counter.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
71
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# For a "Create" whose object has an "inReplyTo" key, bumps the replies counter
# of the replied-to object, but only when the new object is public (otherwise
# nil is returned). Any other data yields `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = note
     }) do
  if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
# For a "Create" whose object carries both "inReplyTo" and "name", registers
# a vote for option `name` on the replied-to object on behalf of the actor.
# Any other data yields `:noop`.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
96
@object_types ["ChatMessage"]

@doc """
Stores `object` and hands `meta` back to the caller.

Data whose "type" is one of `@object_types` is stored through
`Object.create/1`; everything else is inserted as an `%Activity{}` addressed
to the recipients computed by `get_recipients/1`.

`meta` must contain `:local` for activities (`Keyword.fetch!/2` raises
otherwise).

Returns `{:ok, stored, meta}` on success. A failing step is returned as-is
(presumably `{:error, reason}` - the exact shape depends on `Object.create/1`
and `Repo.insert/1`).
"""
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
118
# Validates, filters and stores an activity map.
#
# Arguments:
#   * `map` - the activity data;
#   * `local` - stored in the `local` field of the activity (default `true`);
#   * `fake` - when `true`, `Repo.insert/1` is not called and an in-memory
#     activity with the id "pleroma:fakeid" is returned (default `false`);
#   * `bypass_actor_check` - skips the check that the actor is active.
#
# Returns `{:ok, activity}` on success, and also when `Activity.normalize/1`
# already yields an `%Activity{}` for `map`. The value of any other failed
# step is returned wrapped as `{:error, value}`.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # A non-nil result of `Activity.normalize/1` short-circuits into `else`.
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # The tagged tuple carries `map` and `recipients` into the `else`
       # branch that builds the fake activity.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    # NOTE(review): hard match - a failed insert or expiration creation raises
    # MatchError here instead of returning `{:error, _}`.
    {:ok, activity} =
      %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      }
      |> Repo.insert()
      |> maybe_create_activity_expiration()

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # `Activity.normalize/1` found an activity for this data; return it as-is.
    %Activity{} = activity ->
      {:ok, activity}

    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    # Everything else (including values that already are `{:error, _}` tuples)
    # is wrapped once more.
    error ->
      {:error, error}
  end
end
166
@doc """
Runs the post-insert side effects for `activity`: creates notifications,
creates or bumps the conversation for the activity's actor, then streams the
activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
175
# When a successfully inserted activity carries "expires_at" in its data, an
# `ActivityExpiration` is created for it; a failure of that step is returned
# instead of the activity. Every other input is passed through untouched.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
183
# Creates or bumps the conversation for `activity` and marks it as read for the
# cached user behind `actor`. Returns `{:ok, conversation}`; when either lookup
# fails, the non-matching value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
191
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`, or `[]` for any other input.
defp get_participations({:ok, conversation}) do
  preloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(preloaded, :participations)
end

defp get_participations(_), do: []
199
@doc """
Preloads the `:user` association of `participations` and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
207
@doc """
Streams the participations of the conversation that belongs to the object's
"context", provided a direct activity for that context can still be found for
`user`.

Returns `:noop` for arguments that are not an `%Object{}` with a "context".
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      latest_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
225
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Any other argument yields `:noop`.
"""
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
236
@doc """
Runs `do_create/2` inside a database transaction.

Returns the value produced inside the transaction on commit, or the
transaction's own non-`{:ok, _}` result (e.g. after a rollback) unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
243
# Builds and inserts a "Create" activity from `params` (`:to`, `:actor`,
# `:context`, `:object` are required; `:additional`, `:local` and `:published`
# are optional), then updates counters, notifies, streams and federates.
#
# Returns `{:ok, activity}`. With `fake` set, or when the `:env` config is
# `:benchmark`, the later steps are skipped and the activity is returned early.
# An `{:error, message}` from any step rolls back the enclosing transaction.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       # Fake activities stop here, before any counter is touched.
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # In benchmark mode the note count, notifications and federation are skipped.
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
277
@doc """
Builds a listen activity from `params` via `make_listen_data/2`, inserts it,
then notifies, streams and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:local` and `:published` are optional. Returns `{:ok, activity}` or the first
non-matching step result.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only an explicit `false` makes the activity non-local
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      extra
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
@doc """
Inserts an "Accept" activity built from `params` (see `accept_or_reject/2`).
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
302
@doc """
Inserts a "Reject" activity built from `params` (see `accept_or_reject/2`).
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
307
# Shared implementation of `accept/1` and `reject/1`: builds an activity of the
# given `type` addressed to `to`, with the actor's AP id and the given object
# (plus an "id" when `:activity_id` is present), inserts it, then notifies,
# streams and federates it. `:local` defaults to `true`.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Maps.put_if_present(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
323
@doc """
Runs `do_follow/5` inside a database transaction.

`opts` may contain `skip_notify_and_stream: true` to suppress notifications and
streaming. Returns the value produced inside the transaction on commit, or the
transaction's own failure result unchanged.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
  transaction =
    Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
332
# Inserts the follow activity produced by `make_follow_data/3`, optionally
# notifies/streams it and federates it. An `{:error, reason}` from any step
# rolls back the enclosing transaction.
defp do_follow(follower, followed, activity_id, local, opts) do
  quiet? = Keyword.get(opts, :skip_notify_and_stream, false)
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       # `||` short-circuits: nothing is notified or streamed when `quiet?` is set.
       _ <- quiet? || notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
345
@doc """
Runs `do_unfollow/4` inside a database transaction.

Returns the value produced inside the transaction on commit (`nil` when no
follow activity exists), or the transaction's own failure result unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
354
# Cancels the latest follow of `followed` by `follower` and inserts the
# matching unfollow activity, which is then notified, streamed and federated.
#
# Returns `{:ok, activity}`, or `nil` when `fetch_latest_follow/2` finds no
# follow activity. An `{:error, error}` from any step rolls back the enclosing
# transaction.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       # The existing follow activity is marked "cancelled" before the undo is built.
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
368
@doc """
Runs `do_block/4` inside a database transaction.

Returns the value produced inside the transaction on commit, or the
transaction's own failure result unchanged.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  transaction =
    Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
377
# Inserts the block activity produced by `make_block_data/3`, then notifies,
# streams and federates it.
#
# When `[:activitypub, :unfollow_blocked]` is enabled and the blocker currently
# follows the blocked user, that follow is undone first (the result of the
# unfollow is not inspected). An `{:error, error}` from the insert or the
# federation step rolls back the enclosing transaction.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` rather than the strict `and`: an unset (nil) setting must behave like
  # `false` instead of raising BadBooleanError.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
395
# Creates a report (flag) activity from `params` and mails it to the admins.
#
# `params` must contain `:actor`, `:context`, `:account`, `:statuses` and
# `:content`; `:local`, `:forward` and `:additional` are optional. Unless
# `:forward` is explicitly `false`, the reported account's AP id is put into
# "cc".
#
# The activity passed to `maybe_federate/1` is the one returned by
# `strip_report_status_data/1`, while the full activity is returned to the
# caller as `{:ok, activity}`. A failing step is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = !(params[:local] == false)
  forward = !(params[:forward] == false)

  additional = params[:additional] || %{}

  additional =
    if forward do
      Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
    else
      Map.merge(additional, %{"to" => [], "cc" => []})
    end

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # Every superuser that has an email address gets the report by mail.
    User.all_superusers()
    |> Enum.filter(fn user -> not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
435
@doc """
Inserts a "Move" activity from `origin` to `target`.

The move is only accepted when `origin.ap_id` is listed in
`target.also_known_as`; otherwise an `{:error, message}` tuple is returned.
After a successful insert the activity is notified, streamed and federated and
a "move_following" background job is enqueued. A failed insert is returned
unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
461
@doc """
Builds the query for the "Create" activities of `context`, newest id first.

Visible recipients are the public collection plus, when `opts[:user]` is set,
that user's AP id and followings. Blocks, poll votes and `opts[:exclude_id]`
are honoured through the respective helpers.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
490
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
497
@doc """
Returns the highest id among the "Create" activities of `context` whose
visibility is "direct", or `nil` when there is none.

`opts` must be a map: it is merged over `%{skip_preload: true}` with
`Map.merge/2`, which raises for keyword lists.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  context
  # Preloading is skipped by default, but `opts` may override that.
  |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
508
@doc """
Fetches a page of activities addressed to the public collection.

`:user` is dropped from `opts` before the query is built; `restrict_unlisted/2`
is applied with the remaining options.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
518
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted: true`
forced into the options.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
525
526 @valid_visibilities ~w[direct unlisted public private]
527
# Keeps only activities whose `activity_visibility(...)` SQL result equals the
# requested `:visibility` (a single value or a list, all of which must be in
# `@valid_visibilities`).
#
# An invalid value is logged and the query is returned unchanged, mirroring
# `exclude_visibility/2`; previously the result of `Logger.error/1` leaked out
# instead of a query. A nil or absent `:visibility` applies no restriction.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# nil is excluded here so that `visibility: nil` falls through to the
# pass-through clause below instead of being reported as an error.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
562
# Drops activities whose `activity_visibility(...)` SQL result equals one of
# the `:exclude_visibilities` (a single value or a list). Invalid values are
# logged and leave the query unchanged; nil or absent values are ignored.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn value -> value in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
605
# Adds the `thread_visibility(...)` SQL check for the requesting user's AP id.
# Nothing is added when thread containment is skipped in the config map (third
# argument), when the user has `skip_thread_containment` set, or when the
# options carry no user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
620
@doc """
Fetches the activities authored by `user` as seen by `reading_user`.

`params` is extended with `:user` (the reading user) and `:actor_id` (the
author's AP id). The list returned by `fetch_activities/2` is reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
635
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

Block and mute filtering for the reading user is only enabled when the reading
user does not block `user`. The list returned by `fetch_activities/2` is
reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
661
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using
offset pagination. The list returned by `fetch_activities/3` is reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
673
# Computes the recipient list used to restrict user timelines: empty in
# "godmode"; the public collection alone without a reading user; otherwise the
# public collection plus the reading user's AP id and followings.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user})
     when reading_user in [nil, false] do
  [Constants.as_public()]
end

defp user_activities_recipients(%{reading_user: reading_user}) do
  [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
end
683
# With `:announce_filtering_user` set, drops "Announce" activities whose joined
# object has that user's AP id as "actor". Needs the joined object, hence the
# raise when preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
703
# Keeps activities with an id greater than `:since_id`; an empty string or a
# missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
711
# Drops activities whose joined object's "tag" contains any of the non-empty
# `:tag_reject` list. Needs the joined object, hence the raise when preloading
# is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
724
# Keeps activities whose joined object's "tag" contains every entry of the
# non-empty `:tag_all` list. Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
737
# Keeps activities whose joined object's "tag" contains `:tag` (a binary) or
# any entry of `:tag` (a list). Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
757
# Keeps activities whose recipients overlap the given list. With a user, the
# user's own activities are additionally admitted through `or_where`. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
771
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
777
# With `:actor_id` set, keeps only activities of that actor.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
783
# Keeps activities whose "type" equals `:type` (a binary) or is any of `:type`
# (otherwise, compared with `ANY`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
793
# With `:state` set, keeps only activities whose "state" equals it.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
799
# With `:favorited_by` set, keeps only activities whose joined object lists
# that AP id in "likes".
defp restrict_favorited_by(query, %{favorited_by: liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
808
# With `only_media: true`, keeps only activities whose joined object's
# "attachment" is not equal to the empty list. Needs the joined object, hence
# the raise when preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
821
# Applies the reply filter selected by the options:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and activities that list the
#     filtering user's AP id among their recipients;
#   * `reply_visibility: "following"` keeps non-replies, activities whose
#     recipients (minus the actor itself) overlap the user's AP id or friends,
#     and the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  accepted_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^accepted_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
865
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _), do: query
871
# Hides activities authored by, or addressed ("to") to, users muted by
# `:muting_user`. Unless preloading is skipped, rows that have a `thread_mute`
# binding with a user id are dropped as well. `with_muted: true` disables the
# filter entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # A pre-fetched list in the options takes precedence over a fresh lookup.
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
891
# Hides activities related to users or domains blocked by `:blocking_user`.
#
# The blocked AP ids come from `opts[:blocked_users_ap_ids]` when given,
# otherwise from `User.blocked_users_ap_ids/1`. The object is joined on demand
# because the last condition inspects the object's actor.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # The actor must not be blocked.
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
    # No recipient may be blocked.
    where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
    where:
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        activity.recipients,
        ^domain_blocks
      ),
    # Announces addressed ("to") to a blocked user are dropped.
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),
    # Actors from blocked domains are dropped unless the user follows them.
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),
    # The same domain rule, applied to the actor of the joined object.
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
938
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc" (a missing "cc" is coalesced to an empty JSON object).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
952
# With `pinned: true`, keeps only the activities listed in `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}),
  do: where(query, [activity], activity.id in ^pinned_ids)

defp restrict_pinned(query, _), do: query
958
# Drops "Announce" activities from actors whose reblogs `:muting_user` muted.
# A pre-fetched list in `opts[:reblog_muted_users_ap_ids]` takes precedence
# over a fresh lookup.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
975
# With `:instance` set, keeps only activities whose actor is one of the users
# with a nickname matching `%@<instance>`. The matching AP ids are loaded with
# a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  actor_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^actor_ap_ids)
end

defp restrict_instance(query, _), do: query
989
# Drops activities whose joined object is of type "Answer", unless
# `include_poll_votes: true` is given. Queries without an `:object` binding are
# left untouched.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1001
# Drops activities whose joined object is of type "ChatMessage", unless
# `include_chat_messages: true` is given. Queries without an `:object` binding
# are left untouched.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
1013
# Drops activities of users flagged as invisible, unless
# `invisible_actors: true` is given. The invisible AP ids are loaded with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1024
# With a binary `:exclude_id`, drops the activity that has this id.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id),
  do: where(query, [activity], activity.id != ^excluded_id)

defp exclude_id(query, _), do: query
1030
# Preloads the activity's object unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1037
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1044
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1051
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1058
# Orders by id when `:order` is `:desc` or `:asc`; otherwise no ordering is added.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1070
# Loads the mute / reblog-mute (and, when the blocking user is the muting user,
# block) AP ids of `opts[:muting_user]` in one call and returns three option
# maps - for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2` - each being `opts` with the matching preloaded
# list as a default. Keys already present in `opts` win.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)
  }
end
1092
@doc """
Builds the activity query for `recipients` by piping `Activity` through the
preload, ordering, `restrict_*` and `exclude_*` helpers, each driven by `opts`.

The query is returned as built; this function does not run it. `opts` must be
a map.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  # Block/mute AP ids are looked up once and handed to the matching restrict_* step.
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  user = opts[:user]

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1134
@doc """
Runs the activity query for `recipients` plus the list memberships of
`opts[:user]`, paginated according to `opts` and `pagination`.

The paginated result is reversed, then passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ list_memberships

  all_recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1143
@doc """
Fetches the activities `user` has favourited.

Results are ordered by the id of the `Like` activity (descending, nulls last),
and that id is exposed as `pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already set above, so pagination is passed `skip_order: true`.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1161
# For every activity whose "bcc" contains one of the user's list memberships,
# prepends the user's AP id to the activity's "cc".
#
# Only active when `list_memberships` is non-empty and a `%User{}` is given;
# otherwise the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing, nil or a single bare value; List.wrap/1 keeps the
        # result a proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1177
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1186
@doc """
Fetches activities using the bounded recipient filter.

The base query is built with an empty recipient list; recipient matching is
done by `fetch_activities_bounded_query/3` using `recipients` and
`recipients_with_public`. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1198
@doc """
Stores `file` with `Upload.store/2` and, on success, inserts an `Object` built
from the stored data (run through `Maps.put_if_present/3` with `"actor"` and
`opts[:actor]`).

Any non-`{:ok, _}` result of `Upload.store/2` is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1207
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a binary "href", or a list of such values (only the first
# element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1219
# Converts an ActivityPub actor object (string-keyed map) into the atom-keyed
# attribute map used to build or update a remote user.
#
# Malformed entries in "attachment" and "tag" are skipped instead of raising, so
# one broken profile field or emoji tag does not abort the whole actor import.
# Both keys also accept nil or a single object in place of a list.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # Profile fields: only PropertyValue attachments, reduced to name/value.
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Custom emoji: shortcode (surrounding colons stripped) => image URL.
  # Tags that lack an icon URL or a binary name do not match and are dropped.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag", [])),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # `locked` is read from the raw object, before maybe_fix_user_object/1 runs.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end
1305
@doc """
Fetches the remote following and follower collections of `user` and derives
counters and privacy flags from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`, `:follower_count`
and `:following_count`. An `{:error, _}` from any step is returned as is; any
other unexpected value is wrapped in `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1325
# Returns `counter` unchanged when it is an integer; every other value becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1328
@doc """
Merges freshly fetched follow information into `user_data[:info]` when remote
synchronization applies; otherwise returns `user_data` unchanged.

The fetch only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is `"Person"` or `"Service"`, and both the following
and follower addresses are set. A failed fetch is logged and `user_data` is
returned as is.
"""
# NOTE(review): the type check reads `user_data[:type]`, while
# object_to_user_data/1 stores the actor type under `:actor_type` — confirm
# which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to exactly `false`: nothing to do, nothing to log.
    {skipped, false} when skipped in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1359
# Decides whether a remote follower/following collection is private.
# Returns `{:ok, false}` when a first page is embedded or can be fetched,
# `{:ok, true}` when there is no "first" key or fetching it answers 401/403,
# and an `{:error, _}` tuple for any other outcome.

# The first page is embedded in the collection itself.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

# "first" is anything else: try to fetch it.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1376
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user attributes.

Returns `{:ok, user_data}`. Any non-`{:ok, _}` result of the filter is wrapped
as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1384
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes, passed
through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged at error
level, except the reason `"Object has been deleted"`, which is logged at debug
level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      log_line = "Could not decode user at fetch #{ap_id}, #{inspect(reason)}"

      case reason do
        "Object has been deleted" -> Logger.debug(log_line)
        _ -> Logger.error(log_line)
      end

      {:error, reason}
  end
end
1399
@doc """
Creates or refreshes the user for the actor at `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched, and
the cached user is updated or, if none is cached, a new one is inserted. A
failed fetch result is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if cached_user do
            changeset = User.remote_user_changeset(cached_user, data)
            User.update_and_set_cache(changeset)
          else
            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
1420
@doc """
Resolves `nickname` with `WebFinger.finger/1` and builds the user from the AP
id found there.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1428
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1433
@doc """
Post-processing check for a single activity as seen by `user`.

Currently this is only the broken-thread check.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1438
@doc """
Builds the query for direct messages: `Create` activities with `"direct"`
visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1445 end