Merge branch 'develop' into global-status-expiration
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
42
43 recipients =
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
46 nil -> true
47 user -> User.following?(user, actor)
48 end
49 end)
50
51 {recipients, to, cc}
52 end
53
54 defp get_recipients(%{"type" => "Create"} = data) do
55 to = Map.get(data, "to", [])
56 cc = Map.get(data, "cc", [])
57 bcc = Map.get(data, "bcc", [])
58 actor = Map.get(data, "actor", [])
59 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
60 {recipients, to, cc}
61 end
62
63 defp get_recipients(data) do
64 to = Map.get(data, "to", [])
65 cc = Map.get(data, "cc", [])
66 bcc = Map.get(data, "bcc", [])
67 recipients = Enum.concat([to, cc, bcc])
68 {recipients, to, cc}
69 end
70
71 defp check_actor_is_active(nil), do: true
72
73 defp check_actor_is_active(actor) when is_binary(actor) do
74 case User.get_cached_by_ap_id(actor) do
75 %User{deactivated: deactivated} -> not deactivated
76 _ -> false
77 end
78 end
79
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
83 end
84
85 defp check_remote_limit(_), do: true
86
87 defp increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
89 end
90
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
93 end
94
95 defp increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
97 "type" => "Create"
98 }) do
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
101 end
102 end
103
104 defp increase_replies_count_if_reply(_create_data), do: :noop
105
106 defp increase_poll_votes_if_vote(%{
107 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
108 "type" => "Create",
109 "actor" => actor
110 }) do
111 Object.increase_vote_count(reply_ap_id, name, actor)
112 end
113
114 defp increase_poll_votes_if_vote(_create_data), do: :noop
115
116 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
117 def persist(object, meta) do
118 with local <- Keyword.fetch!(meta, :local),
119 {recipients, _, _} <- get_recipients(object),
120 {:ok, activity} <-
121 Repo.insert(%Activity{
122 data: object,
123 local: local,
124 recipients: recipients,
125 actor: object["actor"]
126 }) do
127 {:ok, activity, meta}
128 end
129 end
130
131 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
132 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
133 with nil <- Activity.normalize(map),
134 map <- lazy_put_activity_defaults(map, fake),
135 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
136 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
137 {:ok, map} <- MRF.filter(map),
138 {recipients, _, _} = get_recipients(map),
139 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
140 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
141 {:ok, map, object} <- insert_full_object(map) do
142 {:ok, activity} =
143 %Activity{
144 data: map,
145 local: local,
146 actor: map["actor"],
147 recipients: recipients
148 }
149 |> Repo.insert()
150 |> maybe_create_activity_expiration()
151
152 # Splice in the child object if we have one.
153 activity = Maps.put_if_present(activity, :object, object)
154
155 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
156
157 {:ok, activity}
158 else
159 %Activity{} = activity ->
160 {:ok, activity}
161
162 {:fake, true, map, recipients} ->
163 activity = %Activity{
164 data: map,
165 local: local,
166 actor: map["actor"],
167 recipients: recipients,
168 id: "pleroma:fakeid"
169 }
170
171 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
172 {:ok, activity}
173
174 error ->
175 {:error, error}
176 end
177 end
178
179 def notify_and_stream(activity) do
180 Notification.create_notifications(activity)
181
182 conversation = create_or_bump_conversation(activity, activity.actor)
183 participations = get_participations(conversation)
184 stream_out(activity)
185 stream_out_participations(participations)
186 end
187
188 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
189 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
190 {:ok, activity}
191 end
192 end
193
194 defp maybe_create_activity_expiration(result), do: result
195
196 defp create_or_bump_conversation(activity, actor) do
197 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
198 %User{} = user <- User.get_cached_by_ap_id(actor) do
199 Participation.mark_as_read(user, conversation)
200 {:ok, conversation}
201 end
202 end
203
204 defp get_participations({:ok, conversation}) do
205 conversation
206 |> Repo.preload(:participations, force: true)
207 |> Map.get(:participations)
208 end
209
210 defp get_participations(_), do: []
211
212 def stream_out_participations(participations) do
213 participations =
214 participations
215 |> Repo.preload(:user)
216
217 Streamer.stream("participation", participations)
218 end
219
220 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
221 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
222 conversation = Repo.preload(conversation, :participations)
223
224 last_activity_id =
225 fetch_latest_activity_id_for_context(conversation.ap_id, %{
226 user: user,
227 blocking_user: user
228 })
229
230 if last_activity_id do
231 stream_out_participations(conversation.participations)
232 end
233 end
234 end
235
236 def stream_out_participations(_, _), do: :noop
237
238 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
239 when data_type in ["Create", "Announce", "Delete"] do
240 activity
241 |> Topics.get_activity_topics()
242 |> Streamer.stream(activity)
243 end
244
245 def stream_out(_activity) do
246 :noop
247 end
248
249 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
250 def create(params, fake \\ false) do
251 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
252 result
253 end
254 end
255
256 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
257 additional = params[:additional] || %{}
258 # only accept false as false value
259 local = !(params[:local] == false)
260 published = params[:published]
261 quick_insert? = Config.get([:env]) == :benchmark
262
263 create_data =
264 make_create_data(
265 %{to: to, actor: actor, published: published, context: context, object: object},
266 additional
267 )
268
269 with {:ok, activity} <- insert(create_data, local, fake),
270 {:fake, false, activity} <- {:fake, fake, activity},
271 _ <- increase_replies_count_if_reply(create_data),
272 _ <- increase_poll_votes_if_vote(create_data),
273 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
274 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
275 _ <- notify_and_stream(activity),
276 :ok <- maybe_federate(activity) do
277 {:ok, activity}
278 else
279 {:quick_insert, true, activity} ->
280 {:ok, activity}
281
282 {:fake, true, activity} ->
283 {:ok, activity}
284
285 {:error, message} ->
286 Repo.rollback(message)
287 end
288 end
289
290 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
291 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
292 additional = params[:additional] || %{}
293 # only accept false as false value
294 local = !(params[:local] == false)
295 published = params[:published]
296
297 listen_data =
298 make_listen_data(
299 %{to: to, actor: actor, published: published, context: context, object: object},
300 additional
301 )
302
303 with {:ok, activity} <- insert(listen_data, local),
304 _ <- notify_and_stream(activity),
305 :ok <- maybe_federate(activity) do
306 {:ok, activity}
307 end
308 end
309
310 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
311 def accept(params) do
312 accept_or_reject("Accept", params)
313 end
314
315 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
316 def reject(params) do
317 accept_or_reject("Reject", params)
318 end
319
320 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
321 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
322 local = Map.get(params, :local, true)
323 activity_id = Map.get(params, :activity_id, nil)
324
325 data =
326 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
327 |> Maps.put_if_present("id", activity_id)
328
329 with {:ok, activity} <- insert(data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
332 {:ok, activity}
333 end
334 end
335
336 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
337 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
338 local = !(params[:local] == false)
339 activity_id = params[:activity_id]
340
341 data =
342 %{
343 "to" => to,
344 "cc" => cc,
345 "type" => "Update",
346 "actor" => actor,
347 "object" => object
348 }
349 |> Maps.put_if_present("id", activity_id)
350
351 with {:ok, activity} <- insert(data, local),
352 _ <- notify_and_stream(activity),
353 :ok <- maybe_federate(activity) do
354 {:ok, activity}
355 end
356 end
357
358 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
359 {:ok, Activity.t()} | {:error, any()}
360 def follow(follower, followed, activity_id \\ nil, local \\ true) do
361 with {:ok, result} <-
362 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
363 result
364 end
365 end
366
367 defp do_follow(follower, followed, activity_id, local) do
368 data = make_follow_data(follower, followed, activity_id)
369
370 with {:ok, activity} <- insert(data, local),
371 _ <- notify_and_stream(activity),
372 :ok <- maybe_federate(activity) do
373 {:ok, activity}
374 else
375 {:error, error} -> Repo.rollback(error)
376 end
377 end
378
379 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
380 {:ok, Activity.t()} | nil | {:error, any()}
381 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
382 with {:ok, result} <-
383 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
384 result
385 end
386 end
387
388 defp do_unfollow(follower, followed, activity_id, local) do
389 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
390 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
391 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
392 {:ok, activity} <- insert(unfollow_data, local),
393 _ <- notify_and_stream(activity),
394 :ok <- maybe_federate(activity) do
395 {:ok, activity}
396 else
397 nil -> nil
398 {:error, error} -> Repo.rollback(error)
399 end
400 end
401
402 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
403 {:ok, Activity.t()} | {:error, any()}
404 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
405 with {:ok, result} <-
406 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
407 result
408 end
409 end
410
411 defp do_block(blocker, blocked, activity_id, local) do
412 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
413
414 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
415 unfollow(blocker, blocked, nil, local)
416 end
417
418 block_data = make_block_data(blocker, blocked, activity_id)
419
420 with {:ok, activity} <- insert(block_data, local),
421 _ <- notify_and_stream(activity),
422 :ok <- maybe_federate(activity) do
423 {:ok, activity}
424 else
425 {:error, error} -> Repo.rollback(error)
426 end
427 end
428
429 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
430 def flag(
431 %{
432 actor: actor,
433 context: _context,
434 account: account,
435 statuses: statuses,
436 content: content
437 } = params
438 ) do
439 # only accept false as false value
440 local = !(params[:local] == false)
441 forward = !(params[:forward] == false)
442
443 additional = params[:additional] || %{}
444
445 additional =
446 if forward do
447 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
448 else
449 Map.merge(additional, %{"to" => [], "cc" => []})
450 end
451
452 with flag_data <- make_flag_data(params, additional),
453 {:ok, activity} <- insert(flag_data, local),
454 {:ok, stripped_activity} <- strip_report_status_data(activity),
455 _ <- notify_and_stream(activity),
456 :ok <- maybe_federate(stripped_activity) do
457 User.all_superusers()
458 |> Enum.filter(fn user -> not is_nil(user.email) end)
459 |> Enum.each(fn superuser ->
460 superuser
461 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
462 |> Pleroma.Emails.Mailer.deliver_async()
463 end)
464
465 {:ok, activity}
466 end
467 end
468
469 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
470 def move(%User{} = origin, %User{} = target, local \\ true) do
471 params = %{
472 "type" => "Move",
473 "actor" => origin.ap_id,
474 "object" => origin.ap_id,
475 "target" => target.ap_id
476 }
477
478 with true <- origin.ap_id in target.also_known_as,
479 {:ok, activity} <- insert(params, local),
480 _ <- notify_and_stream(activity) do
481 maybe_federate(activity)
482
483 BackgroundWorker.enqueue("move_following", %{
484 "origin_id" => origin.id,
485 "target_id" => target.id
486 })
487
488 {:ok, activity}
489 else
490 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
491 err -> err
492 end
493 end
494
495 def fetch_activities_for_context_query(context, opts) do
496 public = [Constants.as_public()]
497
498 recipients =
499 if opts[:user],
500 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
501 else: public
502
503 from(activity in Activity)
504 |> maybe_preload_objects(opts)
505 |> maybe_preload_bookmarks(opts)
506 |> maybe_set_thread_muted_field(opts)
507 |> restrict_blocked(opts)
508 |> restrict_recipients(recipients, opts[:user])
509 |> where(
510 [activity],
511 fragment(
512 "?->>'type' = ? and ?->>'context' = ?",
513 activity.data,
514 "Create",
515 activity.data,
516 ^context
517 )
518 )
519 |> exclude_poll_votes(opts)
520 |> exclude_id(opts)
521 |> order_by([activity], desc: activity.id)
522 end
523
524 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
525 def fetch_activities_for_context(context, opts \\ %{}) do
526 context
527 |> fetch_activities_for_context_query(opts)
528 |> Repo.all()
529 end
530
531 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
532 FlakeId.Ecto.CompatType.t() | nil
533 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
534 context
535 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
536 |> limit(1)
537 |> select([a], a.id)
538 |> Repo.one()
539 end
540
541 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
543 opts = Map.delete(opts, :user)
544
545 [Constants.as_public()]
546 |> fetch_activities_query(opts)
547 |> restrict_unlisted(opts)
548 |> Pagination.fetch_paginated(opts, pagination)
549 end
550
551 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
552 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
553 opts
554 |> Map.put(:restrict_unlisted, true)
555 |> fetch_public_or_unlisted_activities(pagination)
556 end
557
558 @valid_visibilities ~w[direct unlisted public private]
559
560 defp restrict_visibility(query, %{visibility: visibility})
561 when is_list(visibility) do
562 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
563 from(
564 a in query,
565 where:
566 fragment(
567 "activity_visibility(?, ?, ?) = ANY (?)",
568 a.actor,
569 a.recipients,
570 a.data,
571 ^visibility
572 )
573 )
574 else
575 Logger.error("Could not restrict visibility to #{visibility}")
576 end
577 end
578
579 defp restrict_visibility(query, %{visibility: visibility})
580 when visibility in @valid_visibilities do
581 from(
582 a in query,
583 where:
584 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
585 )
586 end
587
588 defp restrict_visibility(_query, %{visibility: visibility})
589 when visibility not in @valid_visibilities do
590 Logger.error("Could not restrict visibility to #{visibility}")
591 end
592
593 defp restrict_visibility(query, _visibility), do: query
594
595 defp exclude_visibility(query, %{exclude_visibilities: visibility})
596 when is_list(visibility) do
597 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
598 from(
599 a in query,
600 where:
601 not fragment(
602 "activity_visibility(?, ?, ?) = ANY (?)",
603 a.actor,
604 a.recipients,
605 a.data,
606 ^visibility
607 )
608 )
609 else
610 Logger.error("Could not exclude visibility to #{visibility}")
611 query
612 end
613 end
614
615 defp exclude_visibility(query, %{exclude_visibilities: visibility})
616 when visibility in @valid_visibilities do
617 from(
618 a in query,
619 where:
620 not fragment(
621 "activity_visibility(?, ?, ?) = ?",
622 a.actor,
623 a.recipients,
624 a.data,
625 ^visibility
626 )
627 )
628 end
629
630 defp exclude_visibility(query, %{exclude_visibilities: visibility})
631 when visibility not in [nil | @valid_visibilities] do
632 Logger.error("Could not exclude visibility to #{visibility}")
633 query
634 end
635
636 defp exclude_visibility(query, _visibility), do: query
637
638 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
639 do: query
640
641 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
642 do: query
643
644 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
645 from(
646 a in query,
647 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
648 )
649 end
650
651 defp restrict_thread_visibility(query, _, _), do: query
652
653 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
654 params =
655 params
656 |> Map.put(:user, reading_user)
657 |> Map.put(:actor_id, user.ap_id)
658
659 %{
660 godmode: params[:godmode],
661 reading_user: reading_user
662 }
663 |> user_activities_recipients()
664 |> fetch_activities(params)
665 |> Enum.reverse()
666 end
667
668 def fetch_user_activities(user, reading_user, params \\ %{}) do
669 params =
670 params
671 |> Map.put(:type, ["Create", "Announce"])
672 |> Map.put(:user, reading_user)
673 |> Map.put(:actor_id, user.ap_id)
674 |> Map.put(:pinned_activity_ids, user.pinned_activities)
675
676 params =
677 if User.blocks?(reading_user, user) do
678 params
679 else
680 params
681 |> Map.put(:blocking_user, reading_user)
682 |> Map.put(:muting_user, reading_user)
683 end
684
685 %{
686 godmode: params[:godmode],
687 reading_user: reading_user
688 }
689 |> user_activities_recipients()
690 |> fetch_activities(params)
691 |> Enum.reverse()
692 end
693
694 def fetch_statuses(reading_user, params) do
695 params = Map.put(params, :type, ["Create", "Announce"])
696
697 %{
698 godmode: params[:godmode],
699 reading_user: reading_user
700 }
701 |> user_activities_recipients()
702 |> fetch_activities(params, :offset)
703 |> Enum.reverse()
704 end
705
706 defp user_activities_recipients(%{godmode: true}), do: []
707
708 defp user_activities_recipients(%{reading_user: reading_user}) do
709 if reading_user do
710 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
711 else
712 [Constants.as_public()]
713 end
714 end
715
716 defp restrict_since(query, %{since_id: ""}), do: query
717
718 defp restrict_since(query, %{since_id: since_id}) do
719 from(activity in query, where: activity.id > ^since_id)
720 end
721
722 defp restrict_since(query, _), do: query
723
724 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
725 raise "Can't use the child object without preloading!"
726 end
727
728 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
729 from(
730 [_activity, object] in query,
731 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
732 )
733 end
734
735 defp restrict_tag_reject(query, _), do: query
736
737 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
738 raise "Can't use the child object without preloading!"
739 end
740
741 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
742 from(
743 [_activity, object] in query,
744 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
745 )
746 end
747
748 defp restrict_tag_all(query, _), do: query
749
750 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
751 raise "Can't use the child object without preloading!"
752 end
753
754 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
755 from(
756 [_activity, object] in query,
757 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
758 )
759 end
760
761 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
762 from(
763 [_activity, object] in query,
764 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
765 )
766 end
767
768 defp restrict_tag(query, _), do: query
769
770 defp restrict_recipients(query, [], _user), do: query
771
772 defp restrict_recipients(query, recipients, nil) do
773 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
774 end
775
776 defp restrict_recipients(query, recipients, user) do
777 from(
778 activity in query,
779 where: fragment("? && ?", ^recipients, activity.recipients),
780 or_where: activity.actor == ^user.ap_id
781 )
782 end
783
784 defp restrict_local(query, %{local_only: true}) do
785 from(activity in query, where: activity.local == true)
786 end
787
788 defp restrict_local(query, _), do: query
789
790 defp restrict_actor(query, %{actor_id: actor_id}) do
791 from(activity in query, where: activity.actor == ^actor_id)
792 end
793
794 defp restrict_actor(query, _), do: query
795
796 defp restrict_type(query, %{type: type}) when is_binary(type) do
797 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
798 end
799
800 defp restrict_type(query, %{type: type}) do
801 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
802 end
803
804 defp restrict_type(query, _), do: query
805
806 defp restrict_state(query, %{state: state}) do
807 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
808 end
809
810 defp restrict_state(query, _), do: query
811
812 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
813 from(
814 [_activity, object] in query,
815 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
816 )
817 end
818
819 defp restrict_favorited_by(query, _), do: query
820
821 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
822 raise "Can't use the child object without preloading!"
823 end
824
825 defp restrict_media(query, %{only_media: true}) do
826 from(
827 [_activity, object] in query,
828 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
829 )
830 end
831
832 defp restrict_media(query, _), do: query
833
834 defp restrict_replies(query, %{exclude_replies: true}) do
835 from(
836 [_activity, object] in query,
837 where: fragment("?->>'inReplyTo' is null", object.data)
838 )
839 end
840
841 defp restrict_replies(query, %{
842 reply_filtering_user: user,
843 reply_visibility: "self"
844 }) do
845 from(
846 [activity, object] in query,
847 where:
848 fragment(
849 "?->>'inReplyTo' is null OR ? = ANY(?)",
850 object.data,
851 ^user.ap_id,
852 activity.recipients
853 )
854 )
855 end
856
857 defp restrict_replies(query, %{
858 reply_filtering_user: user,
859 reply_visibility: "following"
860 }) do
861 from(
862 [activity, object] in query,
863 where:
864 fragment(
865 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
866 object.data,
867 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
868 activity.recipients,
869 activity.actor,
870 activity.actor,
871 ^user.ap_id
872 )
873 )
874 end
875
876 defp restrict_replies(query, _), do: query
877
878 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
879 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
880 end
881
882 defp restrict_reblogs(query, _), do: query
883
884 defp restrict_muted(query, %{with_muted: true}), do: query
885
886 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
887 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
888
889 query =
890 from([activity] in query,
891 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
892 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
893 )
894
895 unless opts[:skip_preload] do
896 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
897 else
898 query
899 end
900 end
901
902 defp restrict_muted(query, _), do: query
903
904 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
905 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
906 domain_blocks = user.domain_blocks || []
907
908 following_ap_ids = User.get_friends_ap_ids(user)
909
910 query =
911 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
912
913 from(
914 [activity, object: o] in query,
915 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
916 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
917 where:
918 fragment(
919 "recipients_contain_blocked_domains(?, ?) = false",
920 activity.recipients,
921 ^domain_blocks
922 ),
923 where:
924 fragment(
925 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
926 activity.data,
927 activity.data,
928 ^blocked_ap_ids
929 ),
930 where:
931 fragment(
932 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
933 activity.actor,
934 ^domain_blocks,
935 activity.actor,
936 ^following_ap_ids
937 ),
938 where:
939 fragment(
940 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
941 o.data,
942 ^domain_blocks,
943 o.data,
944 ^following_ap_ids
945 )
946 )
947 end
948
949 defp restrict_blocked(query, _), do: query
950
951 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
952 from(
953 activity in query,
954 where:
955 fragment(
956 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
957 activity.data,
958 ^[Constants.as_public()]
959 )
960 )
961 end
962
963 defp restrict_unlisted(query, _), do: query
964
965 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
966 from(activity in query, where: activity.id in ^ids)
967 end
968
969 defp restrict_pinned(query, _), do: query
970
971 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
972 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
973
974 from(
975 activity in query,
976 where:
977 fragment(
978 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
979 activity.data,
980 activity.actor,
981 ^muted_reblogs
982 )
983 )
984 end
985
986 defp restrict_muted_reblogs(query, _), do: query
987
988 defp restrict_instance(query, %{instance: instance}) do
989 users =
990 from(
991 u in User,
992 select: u.ap_id,
993 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
994 )
995 |> Repo.all()
996
997 from(activity in query, where: activity.actor in ^users)
998 end
999
1000 defp restrict_instance(query, _), do: query
1001
1002 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1003
1004 defp exclude_poll_votes(query, _) do
1005 if has_named_binding?(query, :object) do
1006 from([activity, object: o] in query,
1007 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1008 )
1009 else
1010 query
1011 end
1012 end
1013
1014 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1015
1016 defp exclude_invisible_actors(query, _opts) do
1017 invisible_ap_ids =
1018 User.Query.build(%{invisible: true, select: [:ap_id]})
1019 |> Repo.all()
1020 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1021
1022 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1023 end
1024
1025 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1026 from(activity in query, where: activity.id != ^id)
1027 end
1028
1029 defp exclude_id(query, _), do: query
1030
1031 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1032
1033 defp maybe_preload_objects(query, _) do
1034 query
1035 |> Activity.with_preloaded_object()
1036 end
1037
1038 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1039
1040 defp maybe_preload_bookmarks(query, opts) do
1041 query
1042 |> Activity.with_preloaded_bookmark(opts[:user])
1043 end
1044
1045 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1046 query
1047 |> Activity.with_preloaded_report_notes()
1048 end
1049
1050 defp maybe_preload_report_notes(query, _), do: query
1051
1052 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1053
1054 defp maybe_set_thread_muted_field(query, opts) do
1055 query
1056 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1057 end
1058
1059 defp maybe_order(query, %{order: :desc}) do
1060 query
1061 |> order_by(desc: :id)
1062 end
1063
1064 defp maybe_order(query, %{order: :asc}) do
1065 query
1066 |> order_by(asc: :id)
1067 end
1068
1069 defp maybe_order(query, _), do: query
1070
1071 defp fetch_activities_query_ap_ids_ops(opts) do
1072 source_user = opts[:muting_user]
1073 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1074
1075 ap_id_relationships =
1076 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1077 [:block | ap_id_relationships]
1078 else
1079 ap_id_relationships
1080 end
1081
1082 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1083
1084 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1085 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1086
1087 restrict_muted_reblogs_opts =
1088 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1089
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1091 end
1092
1093 def fetch_activities_query(recipients, opts \\ %{}) do
1094 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1095 fetch_activities_query_ap_ids_ops(opts)
1096
1097 config = %{
1098 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1099 }
1100
1101 Activity
1102 |> maybe_preload_objects(opts)
1103 |> maybe_preload_bookmarks(opts)
1104 |> maybe_preload_report_notes(opts)
1105 |> maybe_set_thread_muted_field(opts)
1106 |> maybe_order(opts)
1107 |> restrict_recipients(recipients, opts[:user])
1108 |> restrict_replies(opts)
1109 |> restrict_tag(opts)
1110 |> restrict_tag_reject(opts)
1111 |> restrict_tag_all(opts)
1112 |> restrict_since(opts)
1113 |> restrict_local(opts)
1114 |> restrict_actor(opts)
1115 |> restrict_type(opts)
1116 |> restrict_state(opts)
1117 |> restrict_favorited_by(opts)
1118 |> restrict_blocked(restrict_blocked_opts)
1119 |> restrict_muted(restrict_muted_opts)
1120 |> restrict_media(opts)
1121 |> restrict_visibility(opts)
1122 |> restrict_thread_visibility(opts, config)
1123 |> restrict_reblogs(opts)
1124 |> restrict_pinned(opts)
1125 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1126 |> restrict_instance(opts)
1127 |> Activity.restrict_deactivated_users()
1128 |> exclude_poll_votes(opts)
1129 |> exclude_invisible_actors(opts)
1130 |> exclude_visibility(opts)
1131 end
1132
1133 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1134 list_memberships = Pleroma.List.memberships(opts[:user])
1135
1136 fetch_activities_query(recipients ++ list_memberships, opts)
1137 |> Pagination.fetch_paginated(opts, pagination)
1138 |> Enum.reverse()
1139 |> maybe_update_cc(list_memberships, opts[:user])
1140 end
1141
1142 @doc """
1143 Fetch favorites activities of user with order by sort adds to favorites
1144 """
1145 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1146 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1147 user.ap_id
1148 |> Activity.Queries.by_actor()
1149 |> Activity.Queries.by_type("Like")
1150 |> Activity.with_joined_object()
1151 |> Object.with_joined_activity()
1152 |> select([_like, object, activity], %{activity | object: object})
1153 |> order_by([like, _, _], desc_nulls_last: like.id)
1154 |> Pagination.fetch_paginated(
1155 Map.merge(params, %{skip_order: true}),
1156 pagination,
1157 :object_activity
1158 )
1159 end
1160
1161 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1162 Enum.map(activities, fn
1163 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1164 if Enum.any?(bcc, &(&1 in list_memberships)) do
1165 update_in(activity.data["cc"], &[user_ap_id | &1])
1166 else
1167 activity
1168 end
1169
1170 activity ->
1171 activity
1172 end)
1173 end
1174
1175 defp maybe_update_cc(activities, _, _), do: activities
1176
1177 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1178 from(activity in query,
1179 where:
1180 fragment("? && ?", activity.recipients, ^recipients) or
1181 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1182 ^Constants.as_public() in activity.recipients)
1183 )
1184 end
1185
1186 def fetch_activities_bounded(
1187 recipients,
1188 recipients_with_public,
1189 opts \\ %{},
1190 pagination \\ :keyset
1191 ) do
1192 fetch_activities_query([], opts)
1193 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1194 |> Pagination.fetch_paginated(opts, pagination)
1195 |> Enum.reverse()
1196 end
1197
1198 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1199 def upload(file, opts \\ []) do
1200 with {:ok, data} <- Upload.store(file, opts) do
1201 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1202
1203 Repo.insert(%Object{data: obj_data})
1204 end
1205 end
1206
1207 @spec get_actor_url(any()) :: binary() | nil
1208 defp get_actor_url(url) when is_binary(url), do: url
1209 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1210
1211 defp get_actor_url(url) when is_list(url) do
1212 url
1213 |> List.first()
1214 |> get_actor_url()
1215 end
1216
1217 defp get_actor_url(_url), do: nil
1218
1219 defp object_to_user_data(data) do
1220 avatar =
1221 data["icon"]["url"] &&
1222 %{
1223 "type" => "Image",
1224 "url" => [%{"href" => data["icon"]["url"]}]
1225 }
1226
1227 banner =
1228 data["image"]["url"] &&
1229 %{
1230 "type" => "Image",
1231 "url" => [%{"href" => data["image"]["url"]}]
1232 }
1233
1234 fields =
1235 data
1236 |> Map.get("attachment", [])
1237 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1238 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1239
1240 emojis =
1241 data
1242 |> Map.get("tag", [])
1243 |> Enum.filter(fn
1244 %{"type" => "Emoji"} -> true
1245 _ -> false
1246 end)
1247 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1248 {String.trim(name, ":"), url}
1249 end)
1250
1251 locked = data["manuallyApprovesFollowers"] || false
1252 data = Transmogrifier.maybe_fix_user_object(data)
1253 discoverable = data["discoverable"] || false
1254 invisible = data["invisible"] || false
1255 actor_type = data["type"] || "Person"
1256
1257 public_key =
1258 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1259 data["publicKey"]["publicKeyPem"]
1260 else
1261 nil
1262 end
1263
1264 shared_inbox =
1265 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1266 data["endpoints"]["sharedInbox"]
1267 else
1268 nil
1269 end
1270
1271 user_data = %{
1272 ap_id: data["id"],
1273 uri: get_actor_url(data["url"]),
1274 ap_enabled: true,
1275 banner: banner,
1276 fields: fields,
1277 emoji: emojis,
1278 locked: locked,
1279 discoverable: discoverable,
1280 invisible: invisible,
1281 avatar: avatar,
1282 name: data["name"],
1283 follower_address: data["followers"],
1284 following_address: data["following"],
1285 bio: data["summary"],
1286 actor_type: actor_type,
1287 also_known_as: Map.get(data, "alsoKnownAs", []),
1288 public_key: public_key,
1289 inbox: data["inbox"],
1290 shared_inbox: shared_inbox
1291 }
1292
1293 # nickname can be nil because of virtual actors
1294 if data["preferredUsername"] do
1295 Map.put(
1296 user_data,
1297 :nickname,
1298 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1299 )
1300 else
1301 Map.put(user_data, :nickname, nil)
1302 end
1303 end
1304
1305 def fetch_follow_information_for_user(user) do
1306 with {:ok, following_data} <-
1307 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1308 {:ok, hide_follows} <- collection_private(following_data),
1309 {:ok, followers_data} <-
1310 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1311 {:ok, hide_followers} <- collection_private(followers_data) do
1312 {:ok,
1313 %{
1314 hide_follows: hide_follows,
1315 follower_count: normalize_counter(followers_data["totalItems"]),
1316 following_count: normalize_counter(following_data["totalItems"]),
1317 hide_followers: hide_followers
1318 }}
1319 else
1320 {:error, _} = e -> e
1321 e -> {:error, e}
1322 end
1323 end
1324
1325 defp normalize_counter(counter) when is_integer(counter), do: counter
1326 defp normalize_counter(_), do: 0
1327
1328 def maybe_update_follow_information(user_data) do
1329 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1330 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1331 {_, true} <-
1332 {:collections_available,
1333 !!(user_data[:following_address] && user_data[:follower_address])},
1334 {:ok, info} <-
1335 fetch_follow_information_for_user(user_data) do
1336 info = Map.merge(user_data[:info] || %{}, info)
1337
1338 user_data
1339 |> Map.put(:info, info)
1340 else
1341 {:user_type_check, false} ->
1342 user_data
1343
1344 {:collections_available, false} ->
1345 user_data
1346
1347 {:enabled, false} ->
1348 user_data
1349
1350 e ->
1351 Logger.error(
1352 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1353 )
1354
1355 user_data
1356 end
1357 end
1358
1359 defp collection_private(%{"first" => %{"type" => type}})
1360 when type in ["CollectionPage", "OrderedCollectionPage"],
1361 do: {:ok, false}
1362
1363 defp collection_private(%{"first" => first}) do
1364 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1365 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1366 {:ok, false}
1367 else
1368 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1369 {:error, _} = e -> e
1370 e -> {:error, e}
1371 end
1372 end
1373
1374 defp collection_private(_data), do: {:ok, true}
1375
1376 def user_data_from_user_object(data) do
1377 with {:ok, data} <- MRF.filter(data) do
1378 {:ok, object_to_user_data(data)}
1379 else
1380 e -> {:error, e}
1381 end
1382 end
1383
1384 def fetch_and_prepare_user_from_ap_id(ap_id) do
1385 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1386 {:ok, data} <- user_data_from_user_object(data) do
1387 {:ok, maybe_update_follow_information(data)}
1388 else
1389 {:error, "Object has been deleted" = e} ->
1390 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1391 {:error, e}
1392
1393 {:error, e} ->
1394 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1395 {:error, e}
1396 end
1397 end
1398
1399 def make_user_from_ap_id(ap_id) do
1400 user = User.get_cached_by_ap_id(ap_id)
1401
1402 if user && !User.ap_enabled?(user) do
1403 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1404 else
1405 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1406 if user do
1407 user
1408 |> User.remote_user_changeset(data)
1409 |> User.update_and_set_cache()
1410 else
1411 data
1412 |> User.remote_user_changeset()
1413 |> Repo.insert()
1414 |> User.set_cache()
1415 end
1416 end
1417 end
1418 end
1419
1420 def make_user_from_nickname(nickname) do
1421 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1422 make_user_from_ap_id(ap_id)
1423 else
1424 _e -> {:error, "No AP id in WebFinger"}
1425 end
1426 end
1427
1428 # filter out broken threads
1429 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1430 entire_thread_visible_for_user?(activity, user)
1431 end
1432
1433 # do post-processing on a specific activity
1434 def contain_activity(%Activity{} = activity, %User{} = user) do
1435 contain_broken_threads(activity, user)
1436 end
1437
1438 def fetch_direct_messages_query do
1439 Activity
1440 |> restrict_type(%{type: "Create"})
1441 |> restrict_visibility(%{visibility: "direct"})
1442 |> order_by([activity], asc: activity.id)
1443 end
1444 end