8f669acb9092dd93ac21d57ed0971343d3fdcf62
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26 require Pleroma.Constants
27
28 # For Announce activities, we filter the recipients based on following status for any actors
29 # that match actual users. See issue #164 for more information about why this is necessary.
30 defp get_recipients(%{"type" => "Announce"} = data) do
31 to = Map.get(data, "to", [])
32 cc = Map.get(data, "cc", [])
33 bcc = Map.get(data, "bcc", [])
34 actor = User.get_cached_by_ap_id(data["actor"])
35
36 recipients =
37 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
38 case User.get_cached_by_ap_id(recipient) do
39 nil -> true
40 user -> User.following?(user, actor)
41 end
42 end)
43
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(%{"type" => "Create"} = data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 actor = Map.get(data, "actor", [])
52 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = Map.get(data, "to", [])
58 cc = Map.get(data, "cc", [])
59 bcc = Map.get(data, "bcc", [])
60 recipients = Enum.concat([to, cc, bcc])
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
86 end
87
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
90 end
91
92 def increase_replies_count_if_reply(%{
93 "object" => %{"inReplyTo" => reply_ap_id} = object,
94 "type" => "Create"
95 }) do
96 if is_public?(object) do
97 Object.increase_replies_count(reply_ap_id)
98 end
99 end
100
101 def increase_replies_count_if_reply(_create_data), do: :noop
102
103 def decrease_replies_count_if_reply(%Object{
104 data: %{"inReplyTo" => reply_ap_id} = object
105 }) do
106 if is_public?(object) do
107 Object.decrease_replies_count(reply_ap_id)
108 end
109 end
110
111 def decrease_replies_count_if_reply(_object), do: :noop
112
113 def increase_poll_votes_if_vote(%{
114 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
115 "type" => "Create"
116 }) do
117 Object.increase_vote_count(reply_ap_id, name)
118 end
119
120 def increase_poll_votes_if_vote(_create_data), do: :noop
121
122 def insert(map, local \\ true, fake \\ false) when is_map(map) do
123 with nil <- Activity.normalize(map),
124 map <- lazy_put_activity_defaults(map, fake),
125 :ok <- check_actor_is_active(map["actor"]),
126 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
127 {:ok, map} <- MRF.filter(map),
128 {recipients, _, _} = get_recipients(map),
129 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
130 :ok <- Containment.contain_child(map),
131 {:ok, map, object} <- insert_full_object(map) do
132 {:ok, activity} =
133 Repo.insert(%Activity{
134 data: map,
135 local: local,
136 actor: map["actor"],
137 recipients: recipients
138 })
139
140 # Splice in the child object if we have one.
141 activity =
142 if !is_nil(object) do
143 Map.put(activity, :object, object)
144 else
145 activity
146 end
147
148 PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
149
150 Notification.create_notifications(activity)
151
152 participations =
153 activity
154 |> Conversation.create_or_bump_for()
155 |> get_participations()
156
157 stream_out(activity)
158 stream_out_participations(participations)
159 {:ok, activity}
160 else
161 %Activity{} = activity ->
162 {:ok, activity}
163
164 {:fake, true, map, recipients} ->
165 activity = %Activity{
166 data: map,
167 local: local,
168 actor: map["actor"],
169 recipients: recipients,
170 id: "pleroma:fakeid"
171 }
172
173 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
174 {:ok, activity}
175
176 error ->
177 {:error, error}
178 end
179 end
180
181 defp get_participations({:ok, %{participations: participations}}), do: participations
182 defp get_participations(_), do: []
183
184 def stream_out_participations(participations) do
185 participations =
186 participations
187 |> Repo.preload(:user)
188
189 Enum.each(participations, fn participation ->
190 Pleroma.Web.Streamer.stream("participation", participation)
191 end)
192 end
193
194 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
195 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
196 conversation = Repo.preload(conversation, :participations),
197 last_activity_id =
198 fetch_latest_activity_id_for_context(conversation.ap_id, %{
199 "user" => user,
200 "blocking_user" => user
201 }) do
202 if last_activity_id do
203 stream_out_participations(conversation.participations)
204 end
205 end
206 end
207
208 def stream_out_participations(_, _), do: :noop
209
210 def stream_out(activity) do
211 if activity.data["type"] in ["Create", "Announce", "Delete"] do
212 object = Object.normalize(activity)
213 # Do not stream out poll replies
214 unless object.data["type"] == "Answer" do
215 Pleroma.Web.Streamer.stream("user", activity)
216 Pleroma.Web.Streamer.stream("list", activity)
217
218 if get_visibility(activity) == "public" do
219 Pleroma.Web.Streamer.stream("public", activity)
220
221 if activity.local do
222 Pleroma.Web.Streamer.stream("public:local", activity)
223 end
224
225 if activity.data["type"] in ["Create"] do
226 object.data
227 |> Map.get("tag", [])
228 |> Enum.filter(fn tag -> is_bitstring(tag) end)
229 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
230
231 if object.data["attachment"] != [] do
232 Pleroma.Web.Streamer.stream("public:media", activity)
233
234 if activity.local do
235 Pleroma.Web.Streamer.stream("public:local:media", activity)
236 end
237 end
238 end
239 else
240 if get_visibility(activity) == "direct",
241 do: Pleroma.Web.Streamer.stream("direct", activity)
242 end
243 end
244 end
245 end
246
247 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
248 additional = params[:additional] || %{}
249 # only accept false as false value
250 local = !(params[:local] == false)
251 published = params[:published]
252
253 with create_data <-
254 make_create_data(
255 %{to: to, actor: actor, published: published, context: context, object: object},
256 additional
257 ),
258 {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 # Changing note count prior to enqueuing federation task in order to avoid
263 # race conditions on updating user.info
264 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
265 :ok <- maybe_federate(activity) do
266 {:ok, activity}
267 else
268 {:fake, true, activity} ->
269 {:ok, activity}
270
271 {:error, message} ->
272 {:error, message}
273 end
274 end
275
276 def accept(%{to: to, actor: actor, object: object} = params) do
277 # only accept false as false value
278 local = !(params[:local] == false)
279
280 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
281 {:ok, activity} <- insert(data, local),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 end
285 end
286
287 def reject(%{to: to, actor: actor, object: object} = params) do
288 # only accept false as false value
289 local = !(params[:local] == false)
290
291 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
292 {:ok, activity} <- insert(data, local),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 end
296 end
297
298 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
299 # only accept false as false value
300 local = !(params[:local] == false)
301
302 with data <- %{
303 "to" => to,
304 "cc" => cc,
305 "type" => "Update",
306 "actor" => actor,
307 "object" => object
308 },
309 {:ok, activity} <- insert(data, local),
310 :ok <- maybe_federate(activity) do
311 {:ok, activity}
312 end
313 end
314
315 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
316 def like(
317 %User{ap_id: ap_id} = user,
318 %Object{data: %{"id" => _}} = object,
319 activity_id \\ nil,
320 local \\ true
321 ) do
322 with nil <- get_existing_like(ap_id, object),
323 like_data <- make_like_data(user, object, activity_id),
324 {:ok, activity} <- insert(like_data, local),
325 {:ok, object} <- add_like_to_object(activity, object),
326 :ok <- maybe_federate(activity) do
327 {:ok, activity, object}
328 else
329 %Activity{} = activity -> {:ok, activity, object}
330 error -> {:error, error}
331 end
332 end
333
334 def unlike(
335 %User{} = actor,
336 %Object{} = object,
337 activity_id \\ nil,
338 local \\ true
339 ) do
340 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
341 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
342 {:ok, unlike_activity} <- insert(unlike_data, local),
343 {:ok, _activity} <- Repo.delete(like_activity),
344 {:ok, object} <- remove_like_from_object(like_activity, object),
345 :ok <- maybe_federate(unlike_activity) do
346 {:ok, unlike_activity, like_activity, object}
347 else
348 _e -> {:ok, object}
349 end
350 end
351
352 def announce(
353 %User{ap_id: _} = user,
354 %Object{data: %{"id" => _}} = object,
355 activity_id \\ nil,
356 local \\ true,
357 public \\ true
358 ) do
359 with true <- is_public?(object),
360 announce_data <- make_announce_data(user, object, activity_id, public),
361 {:ok, activity} <- insert(announce_data, local),
362 {:ok, object} <- add_announce_to_object(activity, object),
363 :ok <- maybe_federate(activity) do
364 {:ok, activity, object}
365 else
366 error -> {:error, error}
367 end
368 end
369
370 def unannounce(
371 %User{} = actor,
372 %Object{} = object,
373 activity_id \\ nil,
374 local \\ true
375 ) do
376 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
377 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
378 {:ok, unannounce_activity} <- insert(unannounce_data, local),
379 :ok <- maybe_federate(unannounce_activity),
380 {:ok, _activity} <- Repo.delete(announce_activity),
381 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
382 {:ok, unannounce_activity, object}
383 else
384 _e -> {:ok, object}
385 end
386 end
387
388 def follow(follower, followed, activity_id \\ nil, local \\ true) do
389 with data <- make_follow_data(follower, followed, activity_id),
390 {:ok, activity} <- insert(data, local),
391 :ok <- maybe_federate(activity) do
392 {:ok, activity}
393 end
394 end
395
396 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
397 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
398 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
399 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
400 {:ok, activity} <- insert(unfollow_data, local),
401 :ok <- maybe_federate(activity) do
402 {:ok, activity}
403 end
404 end
405
406 def delete(data, opts \\ %{actor: nil, local: true})
407
408 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user, opts) do
409 with data <- %{
410 "to" => [follower_address],
411 "type" => "Delete",
412 "actor" => opts[:actor] || ap_id,
413 "object" => %{"type" => "Person", "id" => ap_id}
414 },
415 {:ok, activity} <- insert(data, true, true),
416 :ok <- maybe_federate(activity) do
417 {:ok, user}
418 end
419 end
420
421 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, opts) do
422 user = User.get_cached_by_ap_id(actor)
423 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
424
425 with {:ok, object, activity} <- Object.delete(object),
426 data <- %{
427 "type" => "Delete",
428 "actor" => actor,
429 "object" => id,
430 "to" => to,
431 "deleted_activity_id" => activity && activity.id
432 },
433 {:ok, activity} <- insert(data, opts[:local], false),
434 stream_out_participations(object, user),
435 _ <- decrease_replies_count_if_reply(object),
436 # Changing note count prior to enqueuing federation task in order to avoid
437 # race conditions on updating user.info
438 {:ok, _actor} <- decrease_note_count_if_public(user, object),
439 :ok <- maybe_federate(activity) do
440 {:ok, activity}
441 end
442 end
443
444 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
445 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
446 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
447
448 if unfollow_blocked do
449 follow_activity = fetch_latest_follow(blocker, blocked)
450 if follow_activity, do: unfollow(blocker, blocked, nil, local)
451 end
452
453 with true <- outgoing_blocks,
454 block_data <- make_block_data(blocker, blocked, activity_id),
455 {:ok, activity} <- insert(block_data, local),
456 :ok <- maybe_federate(activity) do
457 {:ok, activity}
458 else
459 _e -> {:ok, nil}
460 end
461 end
462
463 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
464 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
465 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
466 {:ok, activity} <- insert(unblock_data, local),
467 :ok <- maybe_federate(activity) do
468 {:ok, activity}
469 end
470 end
471
472 def flag(
473 %{
474 actor: actor,
475 context: context,
476 account: account,
477 statuses: statuses,
478 content: content
479 } = params
480 ) do
481 # only accept false as false value
482 local = !(params[:local] == false)
483 forward = !(params[:forward] == false)
484
485 additional = params[:additional] || %{}
486
487 params = %{
488 actor: actor,
489 context: context,
490 account: account,
491 statuses: statuses,
492 content: content
493 }
494
495 additional =
496 if forward do
497 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
498 else
499 Map.merge(additional, %{"to" => [], "cc" => []})
500 end
501
502 with flag_data <- make_flag_data(params, additional),
503 {:ok, activity} <- insert(flag_data, local),
504 :ok <- maybe_federate(activity) do
505 Enum.each(User.all_superusers(), fn superuser ->
506 superuser
507 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
508 |> Pleroma.Emails.Mailer.deliver_async()
509 end)
510
511 {:ok, activity}
512 end
513 end
514
515 defp fetch_activities_for_context_query(context, opts) do
516 public = [Pleroma.Constants.as_public()]
517
518 recipients =
519 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
520
521 from(activity in Activity)
522 |> maybe_preload_objects(opts)
523 |> restrict_blocked(opts)
524 |> restrict_recipients(recipients, opts["user"])
525 |> where(
526 [activity],
527 fragment(
528 "?->>'type' = ? and ?->>'context' = ?",
529 activity.data,
530 "Create",
531 activity.data,
532 ^context
533 )
534 )
535 |> exclude_poll_votes(opts)
536 |> order_by([activity], desc: activity.id)
537 end
538
539 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
540 def fetch_activities_for_context(context, opts \\ %{}) do
541 context
542 |> fetch_activities_for_context_query(opts)
543 |> Repo.all()
544 end
545
546 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
547 Pleroma.FlakeId.t() | nil
548 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
549 context
550 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
551 |> limit(1)
552 |> select([a], a.id)
553 |> Repo.one()
554 end
555
556 def fetch_public_activities(opts \\ %{}) do
557 q = fetch_activities_query([Pleroma.Constants.as_public()], opts)
558
559 q
560 |> restrict_unlisted()
561 |> Pagination.fetch_paginated(opts)
562 |> Enum.reverse()
563 end
564
565 @valid_visibilities ~w[direct unlisted public private]
566
567 defp restrict_visibility(query, %{visibility: visibility})
568 when is_list(visibility) do
569 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
570 query =
571 from(
572 a in query,
573 where:
574 fragment(
575 "activity_visibility(?, ?, ?) = ANY (?)",
576 a.actor,
577 a.recipients,
578 a.data,
579 ^visibility
580 )
581 )
582
583 query
584 else
585 Logger.error("Could not restrict visibility to #{visibility}")
586 end
587 end
588
589 defp restrict_visibility(query, %{visibility: visibility})
590 when visibility in @valid_visibilities do
591 from(
592 a in query,
593 where:
594 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
595 )
596 end
597
598 defp restrict_visibility(_query, %{visibility: visibility})
599 when visibility not in @valid_visibilities do
600 Logger.error("Could not restrict visibility to #{visibility}")
601 end
602
603 defp restrict_visibility(query, _visibility), do: query
604
605 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
606 do: query
607
608 defp restrict_thread_visibility(
609 query,
610 %{"user" => %User{info: %{skip_thread_containment: true}}},
611 _
612 ),
613 do: query
614
615 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
616 from(
617 a in query,
618 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
619 )
620 end
621
622 defp restrict_thread_visibility(query, _, _), do: query
623
624 def fetch_user_activities(user, reading_user, params \\ %{}) do
625 params =
626 params
627 |> Map.put("type", ["Create", "Announce"])
628 |> Map.put("actor_id", user.ap_id)
629 |> Map.put("whole_db", true)
630 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
631
632 recipients =
633 user_activities_recipients(%{
634 "godmode" => params["godmode"],
635 "reading_user" => reading_user
636 })
637
638 fetch_activities(recipients, params)
639 |> Enum.reverse()
640 end
641
642 defp user_activities_recipients(%{"godmode" => true}) do
643 []
644 end
645
646 defp user_activities_recipients(%{"reading_user" => reading_user}) do
647 if reading_user do
648 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
649 else
650 [Pleroma.Constants.as_public()]
651 end
652 end
653
654 defp restrict_since(query, %{"since_id" => ""}), do: query
655
656 defp restrict_since(query, %{"since_id" => since_id}) do
657 from(activity in query, where: activity.id > ^since_id)
658 end
659
660 defp restrict_since(query, _), do: query
661
662 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
663 raise "Can't use the child object without preloading!"
664 end
665
666 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
667 when is_list(tag_reject) and tag_reject != [] do
668 from(
669 [_activity, object] in query,
670 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
671 )
672 end
673
674 defp restrict_tag_reject(query, _), do: query
675
676 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
677 raise "Can't use the child object without preloading!"
678 end
679
680 defp restrict_tag_all(query, %{"tag_all" => tag_all})
681 when is_list(tag_all) and tag_all != [] do
682 from(
683 [_activity, object] in query,
684 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
685 )
686 end
687
688 defp restrict_tag_all(query, _), do: query
689
690 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
691 raise "Can't use the child object without preloading!"
692 end
693
694 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
695 from(
696 [_activity, object] in query,
697 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
698 )
699 end
700
701 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
702 from(
703 [_activity, object] in query,
704 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
705 )
706 end
707
708 defp restrict_tag(query, _), do: query
709
710 defp restrict_recipients(query, [], _user), do: query
711
712 defp restrict_recipients(query, recipients, nil) do
713 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
714 end
715
716 defp restrict_recipients(query, recipients, user) do
717 from(
718 activity in query,
719 where: fragment("? && ?", ^recipients, activity.recipients),
720 or_where: activity.actor == ^user.ap_id
721 )
722 end
723
724 defp restrict_local(query, %{"local_only" => true}) do
725 from(activity in query, where: activity.local == true)
726 end
727
728 defp restrict_local(query, _), do: query
729
730 defp restrict_actor(query, %{"actor_id" => actor_id}) do
731 from(activity in query, where: activity.actor == ^actor_id)
732 end
733
734 defp restrict_actor(query, _), do: query
735
736 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
737 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
738 end
739
740 defp restrict_type(query, %{"type" => type}) do
741 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
742 end
743
744 defp restrict_type(query, _), do: query
745
746 defp restrict_state(query, %{"state" => state}) do
747 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
748 end
749
750 defp restrict_state(query, _), do: query
751
752 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
753 from(
754 [_activity, object] in query,
755 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
756 )
757 end
758
759 defp restrict_favorited_by(query, _), do: query
760
761 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
762 raise "Can't use the child object without preloading!"
763 end
764
765 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
766 from(
767 [_activity, object] in query,
768 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
769 )
770 end
771
772 defp restrict_media(query, _), do: query
773
774 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
775 from(
776 activity in query,
777 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
778 )
779 end
780
781 defp restrict_replies(query, _), do: query
782
783 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
784 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
785 end
786
787 defp restrict_reblogs(query, _), do: query
788
789 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
790
791 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
792 mutes = info.mutes
793
794 from(
795 activity in query,
796 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
797 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
798 )
799 end
800
801 defp restrict_muted(query, _), do: query
802
803 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
804 blocks = info.blocks || []
805 domain_blocks = info.domain_blocks || []
806
807 query =
808 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
809
810 from(
811 [activity, object: o] in query,
812 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
813 where: fragment("not (? && ?)", activity.recipients, ^blocks),
814 where:
815 fragment(
816 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
817 activity.data,
818 activity.data,
819 ^blocks
820 ),
821 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
822 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
823 )
824 end
825
826 defp restrict_blocked(query, _), do: query
827
828 defp restrict_unlisted(query) do
829 from(
830 activity in query,
831 where:
832 fragment(
833 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
834 activity.data,
835 ^[Pleroma.Constants.as_public()]
836 )
837 )
838 end
839
840 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
841 from(activity in query, where: activity.id in ^ids)
842 end
843
844 defp restrict_pinned(query, _), do: query
845
846 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
847 muted_reblogs = info.muted_reblogs || []
848
849 from(
850 activity in query,
851 where:
852 fragment(
853 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
854 activity.data,
855 activity.actor,
856 ^muted_reblogs
857 )
858 )
859 end
860
861 defp restrict_muted_reblogs(query, _), do: query
862
863 defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
864
865 defp exclude_poll_votes(query, _) do
866 if has_named_binding?(query, :object) do
867 from([activity, object: o] in query,
868 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
869 )
870 else
871 query
872 end
873 end
874
875 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
876
877 defp maybe_preload_objects(query, _) do
878 query
879 |> Activity.with_preloaded_object()
880 end
881
882 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
883
884 defp maybe_preload_bookmarks(query, opts) do
885 query
886 |> Activity.with_preloaded_bookmark(opts["user"])
887 end
888
889 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
890
891 defp maybe_set_thread_muted_field(query, opts) do
892 query
893 |> Activity.with_set_thread_muted_field(opts["user"])
894 end
895
896 defp maybe_order(query, %{order: :desc}) do
897 query
898 |> order_by(desc: :id)
899 end
900
901 defp maybe_order(query, %{order: :asc}) do
902 query
903 |> order_by(asc: :id)
904 end
905
906 defp maybe_order(query, _), do: query
907
908 def fetch_activities_query(recipients, opts \\ %{}) do
909 config = %{
910 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
911 }
912
913 Activity
914 |> maybe_preload_objects(opts)
915 |> maybe_preload_bookmarks(opts)
916 |> maybe_set_thread_muted_field(opts)
917 |> maybe_order(opts)
918 |> restrict_recipients(recipients, opts["user"])
919 |> restrict_tag(opts)
920 |> restrict_tag_reject(opts)
921 |> restrict_tag_all(opts)
922 |> restrict_since(opts)
923 |> restrict_local(opts)
924 |> restrict_actor(opts)
925 |> restrict_type(opts)
926 |> restrict_state(opts)
927 |> restrict_favorited_by(opts)
928 |> restrict_blocked(opts)
929 |> restrict_muted(opts)
930 |> restrict_media(opts)
931 |> restrict_visibility(opts)
932 |> restrict_thread_visibility(opts, config)
933 |> restrict_replies(opts)
934 |> restrict_reblogs(opts)
935 |> restrict_pinned(opts)
936 |> restrict_muted_reblogs(opts)
937 |> Activity.restrict_deactivated_users()
938 |> exclude_poll_votes(opts)
939 end
940
941 def fetch_activities(recipients, opts \\ %{}) do
942 list_memberships = Pleroma.List.memberships(opts["user"])
943
944 fetch_activities_query(recipients ++ list_memberships, opts)
945 |> Pagination.fetch_paginated(opts)
946 |> Enum.reverse()
947 |> maybe_update_cc(list_memberships, opts["user"])
948 end
949
950 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
951 when is_list(list_memberships) and length(list_memberships) > 0 do
952 Enum.map(activities, fn
953 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
954 if Enum.any?(bcc, &(&1 in list_memberships)) do
955 update_in(activity.data["cc"], &[user_ap_id | &1])
956 else
957 activity
958 end
959
960 activity ->
961 activity
962 end)
963 end
964
965 defp maybe_update_cc(activities, _, _), do: activities
966
967 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
968 from(activity in query,
969 where:
970 fragment("? && ?", activity.recipients, ^recipients) or
971 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
972 ^Pleroma.Constants.as_public() in activity.recipients)
973 )
974 end
975
976 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
977 fetch_activities_query([], opts)
978 |> fetch_activities_bounded_query(recipients, recipients_with_public)
979 |> Pagination.fetch_paginated(opts)
980 |> Enum.reverse()
981 end
982
983 def upload(file, opts \\ []) do
984 with {:ok, data} <- Upload.store(file, opts) do
985 obj_data =
986 if opts[:actor] do
987 Map.put(data, "actor", opts[:actor])
988 else
989 data
990 end
991
992 Repo.insert(%Object{data: obj_data})
993 end
994 end
995
996 defp object_to_user_data(data) do
997 avatar =
998 data["icon"]["url"] &&
999 %{
1000 "type" => "Image",
1001 "url" => [%{"href" => data["icon"]["url"]}]
1002 }
1003
1004 banner =
1005 data["image"]["url"] &&
1006 %{
1007 "type" => "Image",
1008 "url" => [%{"href" => data["image"]["url"]}]
1009 }
1010
1011 locked = data["manuallyApprovesFollowers"] || false
1012 data = Transmogrifier.maybe_fix_user_object(data)
1013
1014 user_data = %{
1015 ap_id: data["id"],
1016 info: %{
1017 ap_enabled: true,
1018 source_data: data,
1019 banner: banner,
1020 locked: locked
1021 },
1022 avatar: avatar,
1023 name: data["name"],
1024 follower_address: data["followers"],
1025 following_address: data["following"],
1026 bio: data["summary"]
1027 }
1028
1029 # nickname can be nil because of virtual actors
1030 user_data =
1031 if data["preferredUsername"] do
1032 Map.put(
1033 user_data,
1034 :nickname,
1035 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1036 )
1037 else
1038 Map.put(user_data, :nickname, nil)
1039 end
1040
1041 {:ok, user_data}
1042 end
1043
1044 def fetch_follow_information_for_user(user) do
1045 with {:ok, following_data} <-
1046 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1047 following_count when is_integer(following_count) <- following_data["totalItems"],
1048 {:ok, hide_follows} <- collection_private(following_data),
1049 {:ok, followers_data} <-
1050 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1051 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1052 {:ok, hide_followers} <- collection_private(followers_data) do
1053 {:ok,
1054 %{
1055 hide_follows: hide_follows,
1056 follower_count: followers_count,
1057 following_count: following_count,
1058 hide_followers: hide_followers
1059 }}
1060 else
1061 {:error, _} = e ->
1062 e
1063
1064 e ->
1065 {:error, e}
1066 end
1067 end
1068
1069 defp maybe_update_follow_information(data) do
1070 with {:enabled, true} <-
1071 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1072 {:ok, info} <- fetch_follow_information_for_user(data) do
1073 info = Map.merge(data.info, info)
1074 Map.put(data, :info, info)
1075 else
1076 {:enabled, false} ->
1077 data
1078
1079 e ->
1080 Logger.error(
1081 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1082 )
1083
1084 data
1085 end
1086 end
1087
1088 defp collection_private(data) do
1089 if is_map(data["first"]) and
1090 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1091 {:ok, false}
1092 else
1093 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1094 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1095 {:ok, false}
1096 else
1097 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1098 {:ok, true}
1099
1100 {:error, _} = e ->
1101 e
1102
1103 e ->
1104 {:error, e}
1105 end
1106 end
1107 end
1108
1109 def user_data_from_user_object(data) do
1110 with {:ok, data} <- MRF.filter(data),
1111 {:ok, data} <- object_to_user_data(data) do
1112 {:ok, data}
1113 else
1114 e -> {:error, e}
1115 end
1116 end
1117
1118 def fetch_and_prepare_user_from_ap_id(ap_id) do
1119 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1120 {:ok, data} <- user_data_from_user_object(data),
1121 data <- maybe_update_follow_information(data) do
1122 {:ok, data}
1123 else
1124 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1125 end
1126 end
1127
1128 def make_user_from_ap_id(ap_id) do
1129 if _user = User.get_cached_by_ap_id(ap_id) do
1130 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1131 else
1132 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1133 User.insert_or_update_user(data)
1134 else
1135 e -> {:error, e}
1136 end
1137 end
1138 end
1139
1140 def make_user_from_nickname(nickname) do
1141 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1142 make_user_from_ap_id(ap_id)
1143 else
1144 _e -> {:error, "No AP id in WebFinger"}
1145 end
1146 end
1147
1148 # filter out broken threads
1149 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1150 entire_thread_visible_for_user?(activity, user)
1151 end
1152
1153 # do post-processing on a specific activity
1154 def contain_activity(%Activity{} = activity, %User{} = user) do
1155 contain_broken_threads(activity, user)
1156 end
1157
1158 def fetch_direct_messages_query do
1159 Activity
1160 |> restrict_type(%{"type" => "Create"})
1161 |> restrict_visibility(%{visibility: "direct"})
1162 |> order_by([activity], asc: activity.id)
1163 end
1164 end