Merge branch 'preload-user-timelines' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26 require Pleroma.Constants
27
28 # For Announce activities, we filter the recipients based on following status for any actors
29 # that match actual users. See issue #164 for more information about why this is necessary.
30 defp get_recipients(%{"type" => "Announce"} = data) do
31 to = Map.get(data, "to", [])
32 cc = Map.get(data, "cc", [])
33 bcc = Map.get(data, "bcc", [])
34 actor = User.get_cached_by_ap_id(data["actor"])
35
36 recipients =
37 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
38 case User.get_cached_by_ap_id(recipient) do
39 nil -> true
40 user -> User.following?(user, actor)
41 end
42 end)
43
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(%{"type" => "Create"} = data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 actor = Map.get(data, "actor", [])
52 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = Map.get(data, "to", [])
58 cc = Map.get(data, "cc", [])
59 bcc = Map.get(data, "bcc", [])
60 recipients = Enum.concat([to, cc, bcc])
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
86 end
87
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
90 end
91
92 def increase_replies_count_if_reply(%{
93 "object" => %{"inReplyTo" => reply_ap_id} = object,
94 "type" => "Create"
95 }) do
96 if is_public?(object) do
97 Object.increase_replies_count(reply_ap_id)
98 end
99 end
100
101 def increase_replies_count_if_reply(_create_data), do: :noop
102
103 def decrease_replies_count_if_reply(%Object{
104 data: %{"inReplyTo" => reply_ap_id} = object
105 }) do
106 if is_public?(object) do
107 Object.decrease_replies_count(reply_ap_id)
108 end
109 end
110
111 def decrease_replies_count_if_reply(_object), do: :noop
112
113 def increase_poll_votes_if_vote(%{
114 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
115 "type" => "Create"
116 }) do
117 Object.increase_vote_count(reply_ap_id, name)
118 end
119
120 def increase_poll_votes_if_vote(_create_data), do: :noop
121
122 def insert(map, local \\ true, fake \\ false) when is_map(map) do
123 with nil <- Activity.normalize(map),
124 map <- lazy_put_activity_defaults(map, fake),
125 :ok <- check_actor_is_active(map["actor"]),
126 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
127 {:ok, map} <- MRF.filter(map),
128 {recipients, _, _} = get_recipients(map),
129 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
130 :ok <- Containment.contain_child(map),
131 {:ok, map, object} <- insert_full_object(map) do
132 {:ok, activity} =
133 Repo.insert(%Activity{
134 data: map,
135 local: local,
136 actor: map["actor"],
137 recipients: recipients
138 })
139
140 # Splice in the child object if we have one.
141 activity =
142 if !is_nil(object) do
143 Map.put(activity, :object, object)
144 else
145 activity
146 end
147
148 PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
149
150 Notification.create_notifications(activity)
151
152 participations =
153 activity
154 |> Conversation.create_or_bump_for()
155 |> get_participations()
156
157 stream_out(activity)
158 stream_out_participations(participations)
159 {:ok, activity}
160 else
161 %Activity{} = activity ->
162 {:ok, activity}
163
164 {:fake, true, map, recipients} ->
165 activity = %Activity{
166 data: map,
167 local: local,
168 actor: map["actor"],
169 recipients: recipients,
170 id: "pleroma:fakeid"
171 }
172
173 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
174 {:ok, activity}
175
176 error ->
177 {:error, error}
178 end
179 end
180
181 defp get_participations({:ok, %{participations: participations}}), do: participations
182 defp get_participations(_), do: []
183
184 def stream_out_participations(participations) do
185 participations =
186 participations
187 |> Repo.preload(:user)
188
189 Enum.each(participations, fn participation ->
190 Pleroma.Web.Streamer.stream("participation", participation)
191 end)
192 end
193
194 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
195 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
196 conversation = Repo.preload(conversation, :participations),
197 last_activity_id =
198 fetch_latest_activity_id_for_context(conversation.ap_id, %{
199 "user" => user,
200 "blocking_user" => user
201 }) do
202 if last_activity_id do
203 stream_out_participations(conversation.participations)
204 end
205 end
206 end
207
208 def stream_out_participations(_, _), do: :noop
209
210 def stream_out(activity) do
211 if activity.data["type"] in ["Create", "Announce", "Delete"] do
212 object = Object.normalize(activity)
213 # Do not stream out poll replies
214 unless object.data["type"] == "Answer" do
215 Pleroma.Web.Streamer.stream("user", activity)
216 Pleroma.Web.Streamer.stream("list", activity)
217
218 if get_visibility(activity) == "public" do
219 Pleroma.Web.Streamer.stream("public", activity)
220
221 if activity.local do
222 Pleroma.Web.Streamer.stream("public:local", activity)
223 end
224
225 if activity.data["type"] in ["Create"] do
226 object.data
227 |> Map.get("tag", [])
228 |> Enum.filter(fn tag -> is_bitstring(tag) end)
229 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
230
231 if object.data["attachment"] != [] do
232 Pleroma.Web.Streamer.stream("public:media", activity)
233
234 if activity.local do
235 Pleroma.Web.Streamer.stream("public:local:media", activity)
236 end
237 end
238 end
239 else
240 if get_visibility(activity) == "direct",
241 do: Pleroma.Web.Streamer.stream("direct", activity)
242 end
243 end
244 end
245 end
246
247 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
248 additional = params[:additional] || %{}
249 # only accept false as false value
250 local = !(params[:local] == false)
251 published = params[:published]
252
253 with create_data <-
254 make_create_data(
255 %{to: to, actor: actor, published: published, context: context, object: object},
256 additional
257 ),
258 {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 # Changing note count prior to enqueuing federation task in order to avoid
263 # race conditions on updating user.info
264 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
265 :ok <- maybe_federate(activity) do
266 {:ok, activity}
267 else
268 {:fake, true, activity} ->
269 {:ok, activity}
270
271 {:error, message} ->
272 {:error, message}
273 end
274 end
275
276 def accept(%{to: to, actor: actor, object: object} = params) do
277 # only accept false as false value
278 local = !(params[:local] == false)
279
280 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
281 {:ok, activity} <- insert(data, local),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 end
285 end
286
287 def reject(%{to: to, actor: actor, object: object} = params) do
288 # only accept false as false value
289 local = !(params[:local] == false)
290
291 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
292 {:ok, activity} <- insert(data, local),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 end
296 end
297
298 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
299 # only accept false as false value
300 local = !(params[:local] == false)
301
302 with data <- %{
303 "to" => to,
304 "cc" => cc,
305 "type" => "Update",
306 "actor" => actor,
307 "object" => object
308 },
309 {:ok, activity} <- insert(data, local),
310 :ok <- maybe_federate(activity) do
311 {:ok, activity}
312 end
313 end
314
315 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
316 def like(
317 %User{ap_id: ap_id} = user,
318 %Object{data: %{"id" => _}} = object,
319 activity_id \\ nil,
320 local \\ true
321 ) do
322 with nil <- get_existing_like(ap_id, object),
323 like_data <- make_like_data(user, object, activity_id),
324 {:ok, activity} <- insert(like_data, local),
325 {:ok, object} <- add_like_to_object(activity, object),
326 :ok <- maybe_federate(activity) do
327 {:ok, activity, object}
328 else
329 %Activity{} = activity -> {:ok, activity, object}
330 error -> {:error, error}
331 end
332 end
333
334 def unlike(
335 %User{} = actor,
336 %Object{} = object,
337 activity_id \\ nil,
338 local \\ true
339 ) do
340 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
341 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
342 {:ok, unlike_activity} <- insert(unlike_data, local),
343 {:ok, _activity} <- Repo.delete(like_activity),
344 {:ok, object} <- remove_like_from_object(like_activity, object),
345 :ok <- maybe_federate(unlike_activity) do
346 {:ok, unlike_activity, like_activity, object}
347 else
348 _e -> {:ok, object}
349 end
350 end
351
352 def announce(
353 %User{ap_id: _} = user,
354 %Object{data: %{"id" => _}} = object,
355 activity_id \\ nil,
356 local \\ true,
357 public \\ true
358 ) do
359 with true <- is_public?(object),
360 announce_data <- make_announce_data(user, object, activity_id, public),
361 {:ok, activity} <- insert(announce_data, local),
362 {:ok, object} <- add_announce_to_object(activity, object),
363 :ok <- maybe_federate(activity) do
364 {:ok, activity, object}
365 else
366 error -> {:error, error}
367 end
368 end
369
370 def unannounce(
371 %User{} = actor,
372 %Object{} = object,
373 activity_id \\ nil,
374 local \\ true
375 ) do
376 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
377 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
378 {:ok, unannounce_activity} <- insert(unannounce_data, local),
379 :ok <- maybe_federate(unannounce_activity),
380 {:ok, _activity} <- Repo.delete(announce_activity),
381 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
382 {:ok, unannounce_activity, object}
383 else
384 _e -> {:ok, object}
385 end
386 end
387
388 def follow(follower, followed, activity_id \\ nil, local \\ true) do
389 with data <- make_follow_data(follower, followed, activity_id),
390 {:ok, activity} <- insert(data, local),
391 :ok <- maybe_federate(activity) do
392 {:ok, activity}
393 end
394 end
395
396 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
397 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
398 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
399 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
400 {:ok, activity} <- insert(unfollow_data, local),
401 :ok <- maybe_federate(activity) do
402 {:ok, activity}
403 end
404 end
405
406 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
407 with data <- %{
408 "to" => [follower_address],
409 "type" => "Delete",
410 "actor" => ap_id,
411 "object" => %{"type" => "Person", "id" => ap_id}
412 },
413 {:ok, activity} <- insert(data, true, true),
414 :ok <- maybe_federate(activity) do
415 {:ok, user}
416 end
417 end
418
419 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
420 user = User.get_cached_by_ap_id(actor)
421 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
422
423 with {:ok, object, activity} <- Object.delete(object),
424 data <- %{
425 "type" => "Delete",
426 "actor" => actor,
427 "object" => id,
428 "to" => to,
429 "deleted_activity_id" => activity && activity.id
430 },
431 {:ok, activity} <- insert(data, local, false),
432 stream_out_participations(object, user),
433 _ <- decrease_replies_count_if_reply(object),
434 # Changing note count prior to enqueuing federation task in order to avoid
435 # race conditions on updating user.info
436 {:ok, _actor} <- decrease_note_count_if_public(user, object),
437 :ok <- maybe_federate(activity) do
438 {:ok, activity}
439 end
440 end
441
442 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
443 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
444 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
445
446 if unfollow_blocked do
447 follow_activity = fetch_latest_follow(blocker, blocked)
448 if follow_activity, do: unfollow(blocker, blocked, nil, local)
449 end
450
451 with true <- outgoing_blocks,
452 block_data <- make_block_data(blocker, blocked, activity_id),
453 {:ok, activity} <- insert(block_data, local),
454 :ok <- maybe_federate(activity) do
455 {:ok, activity}
456 else
457 _e -> {:ok, nil}
458 end
459 end
460
461 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
462 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
463 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
464 {:ok, activity} <- insert(unblock_data, local),
465 :ok <- maybe_federate(activity) do
466 {:ok, activity}
467 end
468 end
469
470 def flag(
471 %{
472 actor: actor,
473 context: context,
474 account: account,
475 statuses: statuses,
476 content: content
477 } = params
478 ) do
479 # only accept false as false value
480 local = !(params[:local] == false)
481 forward = !(params[:forward] == false)
482
483 additional = params[:additional] || %{}
484
485 params = %{
486 actor: actor,
487 context: context,
488 account: account,
489 statuses: statuses,
490 content: content
491 }
492
493 additional =
494 if forward do
495 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
496 else
497 Map.merge(additional, %{"to" => [], "cc" => []})
498 end
499
500 with flag_data <- make_flag_data(params, additional),
501 {:ok, activity} <- insert(flag_data, local),
502 :ok <- maybe_federate(activity) do
503 Enum.each(User.all_superusers(), fn superuser ->
504 superuser
505 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
506 |> Pleroma.Emails.Mailer.deliver_async()
507 end)
508
509 {:ok, activity}
510 end
511 end
512
513 defp fetch_activities_for_context_query(context, opts) do
514 public = [Pleroma.Constants.as_public()]
515
516 recipients =
517 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
518
519 from(activity in Activity)
520 |> maybe_preload_objects(opts)
521 |> maybe_preload_bookmarks(opts)
522 |> maybe_set_thread_muted_field(opts)
523 |> restrict_blocked(opts)
524 |> restrict_recipients(recipients, opts["user"])
525 |> where(
526 [activity],
527 fragment(
528 "?->>'type' = ? and ?->>'context' = ?",
529 activity.data,
530 "Create",
531 activity.data,
532 ^context
533 )
534 )
535 |> exclude_poll_votes(opts)
536 |> exclude_id(opts)
537 |> order_by([activity], desc: activity.id)
538 end
539
540 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
541 def fetch_activities_for_context(context, opts \\ %{}) do
542 context
543 |> fetch_activities_for_context_query(opts)
544 |> Repo.all()
545 end
546
547 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
548 Pleroma.FlakeId.t() | nil
549 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
550 context
551 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
552 |> limit(1)
553 |> select([a], a.id)
554 |> Repo.one()
555 end
556
557 def fetch_public_activities(opts \\ %{}) do
558 q = fetch_activities_query([Pleroma.Constants.as_public()], opts)
559
560 q
561 |> restrict_unlisted()
562 |> Pagination.fetch_paginated(opts)
563 |> Enum.reverse()
564 end
565
566 @valid_visibilities ~w[direct unlisted public private]
567
568 defp restrict_visibility(query, %{visibility: visibility})
569 when is_list(visibility) do
570 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
571 query =
572 from(
573 a in query,
574 where:
575 fragment(
576 "activity_visibility(?, ?, ?) = ANY (?)",
577 a.actor,
578 a.recipients,
579 a.data,
580 ^visibility
581 )
582 )
583
584 query
585 else
586 Logger.error("Could not restrict visibility to #{visibility}")
587 end
588 end
589
590 defp restrict_visibility(query, %{visibility: visibility})
591 when visibility in @valid_visibilities do
592 from(
593 a in query,
594 where:
595 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
596 )
597 end
598
599 defp restrict_visibility(_query, %{visibility: visibility})
600 when visibility not in @valid_visibilities do
601 Logger.error("Could not restrict visibility to #{visibility}")
602 end
603
604 defp restrict_visibility(query, _visibility), do: query
605
606 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
607 do: query
608
609 defp restrict_thread_visibility(
610 query,
611 %{"user" => %User{info: %{skip_thread_containment: true}}},
612 _
613 ),
614 do: query
615
616 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
617 from(
618 a in query,
619 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
620 )
621 end
622
623 defp restrict_thread_visibility(query, _, _), do: query
624
625 def fetch_user_activities(user, reading_user, params \\ %{}) do
626 params =
627 params
628 |> Map.put("type", ["Create", "Announce"])
629 |> Map.put("user", reading_user)
630 |> Map.put("actor_id", user.ap_id)
631 |> Map.put("whole_db", true)
632 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
633
634 recipients =
635 user_activities_recipients(%{
636 "godmode" => params["godmode"],
637 "reading_user" => reading_user
638 })
639
640 fetch_activities(recipients, params)
641 |> Enum.reverse()
642 end
643
644 defp user_activities_recipients(%{"godmode" => true}) do
645 []
646 end
647
648 defp user_activities_recipients(%{"reading_user" => reading_user}) do
649 if reading_user do
650 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
651 else
652 [Pleroma.Constants.as_public()]
653 end
654 end
655
656 defp restrict_since(query, %{"since_id" => ""}), do: query
657
658 defp restrict_since(query, %{"since_id" => since_id}) do
659 from(activity in query, where: activity.id > ^since_id)
660 end
661
662 defp restrict_since(query, _), do: query
663
664 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
665 raise "Can't use the child object without preloading!"
666 end
667
668 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
669 when is_list(tag_reject) and tag_reject != [] do
670 from(
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
673 )
674 end
675
676 defp restrict_tag_reject(query, _), do: query
677
678 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
679 raise "Can't use the child object without preloading!"
680 end
681
682 defp restrict_tag_all(query, %{"tag_all" => tag_all})
683 when is_list(tag_all) and tag_all != [] do
684 from(
685 [_activity, object] in query,
686 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
687 )
688 end
689
690 defp restrict_tag_all(query, _), do: query
691
692 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
693 raise "Can't use the child object without preloading!"
694 end
695
696 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
697 from(
698 [_activity, object] in query,
699 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
700 )
701 end
702
703 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
704 from(
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
707 )
708 end
709
710 defp restrict_tag(query, _), do: query
711
712 defp restrict_recipients(query, [], _user), do: query
713
714 defp restrict_recipients(query, recipients, nil) do
715 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
716 end
717
718 defp restrict_recipients(query, recipients, user) do
719 from(
720 activity in query,
721 where: fragment("? && ?", ^recipients, activity.recipients),
722 or_where: activity.actor == ^user.ap_id
723 )
724 end
725
726 defp restrict_local(query, %{"local_only" => true}) do
727 from(activity in query, where: activity.local == true)
728 end
729
730 defp restrict_local(query, _), do: query
731
732 defp restrict_actor(query, %{"actor_id" => actor_id}) do
733 from(activity in query, where: activity.actor == ^actor_id)
734 end
735
736 defp restrict_actor(query, _), do: query
737
738 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
739 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
740 end
741
742 defp restrict_type(query, %{"type" => type}) do
743 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
744 end
745
746 defp restrict_type(query, _), do: query
747
748 defp restrict_state(query, %{"state" => state}) do
749 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
750 end
751
752 defp restrict_state(query, _), do: query
753
754 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
755 from(
756 [_activity, object] in query,
757 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
758 )
759 end
760
761 defp restrict_favorited_by(query, _), do: query
762
763 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
764 raise "Can't use the child object without preloading!"
765 end
766
767 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
768 from(
769 [_activity, object] in query,
770 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
771 )
772 end
773
774 defp restrict_media(query, _), do: query
775
776 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
777 from(
778 activity in query,
779 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
780 )
781 end
782
783 defp restrict_replies(query, _), do: query
784
785 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
786 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
787 end
788
789 defp restrict_reblogs(query, _), do: query
790
791 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
792
793 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
794 mutes = info.mutes
795
796 from(
797 activity in query,
798 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
799 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
800 )
801 end
802
803 defp restrict_muted(query, _), do: query
804
805 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
806 blocks = info.blocks || []
807 domain_blocks = info.domain_blocks || []
808
809 query =
810 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
811
812 from(
813 [activity, object: o] in query,
814 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
815 where: fragment("not (? && ?)", activity.recipients, ^blocks),
816 where:
817 fragment(
818 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
819 activity.data,
820 activity.data,
821 ^blocks
822 ),
823 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
824 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
825 )
826 end
827
828 defp restrict_blocked(query, _), do: query
829
830 defp restrict_unlisted(query) do
831 from(
832 activity in query,
833 where:
834 fragment(
835 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
836 activity.data,
837 ^[Pleroma.Constants.as_public()]
838 )
839 )
840 end
841
842 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
843 from(activity in query, where: activity.id in ^ids)
844 end
845
846 defp restrict_pinned(query, _), do: query
847
848 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
849 muted_reblogs = info.muted_reblogs || []
850
851 from(
852 activity in query,
853 where:
854 fragment(
855 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
856 activity.data,
857 activity.actor,
858 ^muted_reblogs
859 )
860 )
861 end
862
863 defp restrict_muted_reblogs(query, _), do: query
864
865 defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
866
867 defp exclude_poll_votes(query, _) do
868 if has_named_binding?(query, :object) do
869 from([activity, object: o] in query,
870 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
871 )
872 else
873 query
874 end
875 end
876
877 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
878 from(activity in query, where: activity.id != ^id)
879 end
880
881 defp exclude_id(query, _), do: query
882
883 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
884
885 defp maybe_preload_objects(query, _) do
886 query
887 |> Activity.with_preloaded_object()
888 end
889
890 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
891
892 defp maybe_preload_bookmarks(query, opts) do
893 query
894 |> Activity.with_preloaded_bookmark(opts["user"])
895 end
896
897 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
898
899 defp maybe_set_thread_muted_field(query, opts) do
900 query
901 |> Activity.with_set_thread_muted_field(opts["user"])
902 end
903
904 defp maybe_order(query, %{order: :desc}) do
905 query
906 |> order_by(desc: :id)
907 end
908
909 defp maybe_order(query, %{order: :asc}) do
910 query
911 |> order_by(asc: :id)
912 end
913
914 defp maybe_order(query, _), do: query
915
916 def fetch_activities_query(recipients, opts \\ %{}) do
917 config = %{
918 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
919 }
920
921 Activity
922 |> maybe_preload_objects(opts)
923 |> maybe_preload_bookmarks(opts)
924 |> maybe_set_thread_muted_field(opts)
925 |> maybe_order(opts)
926 |> restrict_recipients(recipients, opts["user"])
927 |> restrict_tag(opts)
928 |> restrict_tag_reject(opts)
929 |> restrict_tag_all(opts)
930 |> restrict_since(opts)
931 |> restrict_local(opts)
932 |> restrict_actor(opts)
933 |> restrict_type(opts)
934 |> restrict_state(opts)
935 |> restrict_favorited_by(opts)
936 |> restrict_blocked(opts)
937 |> restrict_muted(opts)
938 |> restrict_media(opts)
939 |> restrict_visibility(opts)
940 |> restrict_thread_visibility(opts, config)
941 |> restrict_replies(opts)
942 |> restrict_reblogs(opts)
943 |> restrict_pinned(opts)
944 |> restrict_muted_reblogs(opts)
945 |> Activity.restrict_deactivated_users()
946 |> exclude_poll_votes(opts)
947 end
948
949 def fetch_activities(recipients, opts \\ %{}) do
950 list_memberships = Pleroma.List.memberships(opts["user"])
951
952 fetch_activities_query(recipients ++ list_memberships, opts)
953 |> Pagination.fetch_paginated(opts)
954 |> Enum.reverse()
955 |> maybe_update_cc(list_memberships, opts["user"])
956 end
957
958 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
959 when is_list(list_memberships) and length(list_memberships) > 0 do
960 Enum.map(activities, fn
961 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
962 if Enum.any?(bcc, &(&1 in list_memberships)) do
963 update_in(activity.data["cc"], &[user_ap_id | &1])
964 else
965 activity
966 end
967
968 activity ->
969 activity
970 end)
971 end
972
973 defp maybe_update_cc(activities, _, _), do: activities
974
975 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
976 from(activity in query,
977 where:
978 fragment("? && ?", activity.recipients, ^recipients) or
979 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
980 ^Pleroma.Constants.as_public() in activity.recipients)
981 )
982 end
983
984 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
985 fetch_activities_query([], opts)
986 |> fetch_activities_bounded_query(recipients, recipients_with_public)
987 |> Pagination.fetch_paginated(opts)
988 |> Enum.reverse()
989 end
990
991 def upload(file, opts \\ []) do
992 with {:ok, data} <- Upload.store(file, opts) do
993 obj_data =
994 if opts[:actor] do
995 Map.put(data, "actor", opts[:actor])
996 else
997 data
998 end
999
1000 Repo.insert(%Object{data: obj_data})
1001 end
1002 end
1003
1004 defp object_to_user_data(data) do
1005 avatar =
1006 data["icon"]["url"] &&
1007 %{
1008 "type" => "Image",
1009 "url" => [%{"href" => data["icon"]["url"]}]
1010 }
1011
1012 banner =
1013 data["image"]["url"] &&
1014 %{
1015 "type" => "Image",
1016 "url" => [%{"href" => data["image"]["url"]}]
1017 }
1018
1019 locked = data["manuallyApprovesFollowers"] || false
1020 data = Transmogrifier.maybe_fix_user_object(data)
1021
1022 user_data = %{
1023 ap_id: data["id"],
1024 info: %{
1025 ap_enabled: true,
1026 source_data: data,
1027 banner: banner,
1028 locked: locked
1029 },
1030 avatar: avatar,
1031 name: data["name"],
1032 follower_address: data["followers"],
1033 following_address: data["following"],
1034 bio: data["summary"]
1035 }
1036
1037 # nickname can be nil because of virtual actors
1038 user_data =
1039 if data["preferredUsername"] do
1040 Map.put(
1041 user_data,
1042 :nickname,
1043 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1044 )
1045 else
1046 Map.put(user_data, :nickname, nil)
1047 end
1048
1049 {:ok, user_data}
1050 end
1051
1052 def fetch_follow_information_for_user(user) do
1053 with {:ok, following_data} <-
1054 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1055 following_count when is_integer(following_count) <- following_data["totalItems"],
1056 {:ok, hide_follows} <- collection_private(following_data),
1057 {:ok, followers_data} <-
1058 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1059 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1060 {:ok, hide_followers} <- collection_private(followers_data) do
1061 {:ok,
1062 %{
1063 hide_follows: hide_follows,
1064 follower_count: followers_count,
1065 following_count: following_count,
1066 hide_followers: hide_followers
1067 }}
1068 else
1069 {:error, _} = e ->
1070 e
1071
1072 e ->
1073 {:error, e}
1074 end
1075 end
1076
1077 defp maybe_update_follow_information(data) do
1078 with {:enabled, true} <-
1079 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1080 {:ok, info} <- fetch_follow_information_for_user(data) do
1081 info = Map.merge(data.info, info)
1082 Map.put(data, :info, info)
1083 else
1084 {:enabled, false} ->
1085 data
1086
1087 e ->
1088 Logger.error(
1089 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1090 )
1091
1092 data
1093 end
1094 end
1095
1096 defp collection_private(data) do
1097 if is_map(data["first"]) and
1098 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1099 {:ok, false}
1100 else
1101 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1102 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1103 {:ok, false}
1104 else
1105 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1106 {:ok, true}
1107
1108 {:error, _} = e ->
1109 e
1110
1111 e ->
1112 {:error, e}
1113 end
1114 end
1115 end
1116
1117 def user_data_from_user_object(data) do
1118 with {:ok, data} <- MRF.filter(data),
1119 {:ok, data} <- object_to_user_data(data) do
1120 {:ok, data}
1121 else
1122 e -> {:error, e}
1123 end
1124 end
1125
1126 def fetch_and_prepare_user_from_ap_id(ap_id) do
1127 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1128 {:ok, data} <- user_data_from_user_object(data),
1129 data <- maybe_update_follow_information(data) do
1130 {:ok, data}
1131 else
1132 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1133 end
1134 end
1135
1136 def make_user_from_ap_id(ap_id) do
1137 if _user = User.get_cached_by_ap_id(ap_id) do
1138 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1139 else
1140 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1141 User.insert_or_update_user(data)
1142 else
1143 e -> {:error, e}
1144 end
1145 end
1146 end
1147
1148 def make_user_from_nickname(nickname) do
1149 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1150 make_user_from_ap_id(ap_id)
1151 else
1152 _e -> {:error, "No AP id in WebFinger"}
1153 end
1154 end
1155
1156 # filter out broken threads
1157 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1158 entire_thread_visible_for_user?(activity, user)
1159 end
1160
1161 # do post-processing on a specific activity
1162 def contain_activity(%Activity{} = activity, %User{} = user) do
1163 contain_broken_threads(activity, user)
1164 end
1165
1166 def fetch_direct_messages_query do
1167 Activity
1168 |> restrict_type(%{"type" => "Create"})
1169 |> restrict_visibility(%{visibility: "direct"})
1170 |> order_by([activity], asc: activity.id)
1171 end
1172 end