Merge branch 'get-context-optimizations' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26 require Pleroma.Constants
27
# Builds the delivery information for an activity.
#
# Returns `{recipients, to, cc}`. `to` and `cc` are read from the activity data
# (defaulting to `[]`); `recipients` is derived from to/cc/bcc per activity type.
#
# Announce: an addressee that resolves to a cached user is kept only when that
# user follows the announcing actor; addressees with no cached user are kept.
# See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  announcer = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn ap_id ->
      case User.get_cached_by_ap_id(ap_id) do
        nil -> true
        addressee -> User.following?(addressee, announcer)
      end
    end)

  {recipients, to, cc}
end

# Create: the actor is appended to the addressees and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Any other type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))

  {Enum.concat([to, cc, bcc]), to, cc}
end
63
# Decides whether an activity by `actor` may be inserted.
#
# Returns `:ok` when no actor is given, or when the actor resolves to a cached
# user whose `info.deactivated` is exactly `false`. Returns `:reject` otherwise,
# including when the actor cannot be resolved to a user at all (previously that
# case raised on `nil.info`).
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %{info: %{deactivated: false}} -> :ok
    _ -> :reject
  end
end
76
# Returns true when the embedded object's "content" is no longer than the
# configured `[:instance, :remote_limit]`. Activities without an embedded
# object, or whose content is nil, always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_), do: true
83
# Increments the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
87
# Decrements the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
91
# For a Create whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the reply is public (returns
# nil when it is not). Any other input is a no-op returning `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
102
# For an object with an "inReplyTo", lowers the replies counter of the
# replied-to object, but only when the object is public (returns nil when it is
# not). Any other input is a no-op returning `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
112
# A Create whose embedded object carries both "inReplyTo" and "name" is treated
# as a poll vote: the vote count for option `name` on the replied-to object is
# increased. Any other input is a no-op returning `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
121
# Validates and persists an activity, then runs the post-insert side effects.
#
# Arguments:
#   map   - activity data (string-keyed map)
#   local - stored in the activity's `local` field (default true)
#   fake  - when true, nothing is persisted and an in-memory activity with the
#           id "pleroma:fakeid" is returned instead (default false)
#
# Returns `{:ok, activity}` on success, `{:ok, existing}` when
# `Activity.normalize/1` already yields an activity for `map`, or
# `{:error, reason}` where `reason` is whatever step of the chain did not match.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _, _} = get_recipients(data),
       # Fake inserts leave the happy path here and are handled in `else`.
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       :ok <- Containment.contain_child(data),
       {:ok, data, object} <- insert_full_object(data) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> stored
        _ -> Map.put(stored, :object, object)
      end

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
180
# Extracts the participations from a `{:ok, %{participations: list}}` result;
# anything else yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
183
# Preloads `:user` on the given participations and pushes each of them to the
# "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
193
# Re-streams the participations of the conversation an object belongs to.
#
# Looks up the conversation by the object's "context". Its participations are
# streamed only when `fetch_latest_activity_id_for_context/2` (called with
# `user` as both "user" and "blocking_user") still finds an activity id.
# When no conversation is found the lookup result is returned as is; inputs
# that are not an object with a context return `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      latest_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          "user" => user,
          "blocking_user" => user
        })

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
209
# Pushes a Create/Announce/Delete activity to the streaming topics.
#
# Every streamed activity goes to "user" and "list". Public ones additionally
# go to "public" (and "public:local" when local); public Creates also go to one
# "hashtag:<tag>" topic per string tag and, when the object has attachments, to
# "public:media" (and "public:local:media" when local). Direct ones go to
# "direct". Other activity types are ignored.
#
# Nothing is streamed when the activity's object cannot be normalized
# (previously this raised on `nil.data`) or when it is a poll answer.
def stream_out(activity) do
  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    case Object.normalize(activity) do
      nil ->
        nil

      # Do not stream out poll replies
      %Object{data: %{"type" => "Answer"}} ->
        nil

      %Object{} = object ->
        Pleroma.Web.Streamer.stream("user", activity)
        Pleroma.Web.Streamer.stream("list", activity)

        case get_visibility(activity) do
          "public" -> stream_out_public(activity, object)
          "direct" -> Pleroma.Web.Streamer.stream("direct", activity)
          _ -> nil
        end
    end
  end
end

# Streams a public activity to the public timelines.
defp stream_out_public(activity, object) do
  Pleroma.Web.Streamer.stream("public", activity)

  if activity.local do
    Pleroma.Web.Streamer.stream("public:local", activity)
  end

  if activity.data["type"] == "Create" do
    stream_out_create_extras(activity, object)
  end
end

# Streams a public Create to its hashtag and media timelines.
defp stream_out_create_extras(activity, object) do
  # Only plain string tags are hashtags; a nil "tag" value is treated as empty.
  (object.data["tag"] || [])
  |> Enum.filter(&is_binary/1)
  |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

  # A missing "attachment" key means "no media", the same as an empty list;
  # this matches what the `restrict_media/2` timeline query selects.
  if object.data["attachment"] not in [nil, []] do
    Pleroma.Web.Streamer.stream("public:media", activity)

    if activity.local do
      Pleroma.Web.Streamer.stream("public:local:media", activity)
    end
  end
end
246
# Builds and inserts a Create activity.
#
# `params` must contain :to, :actor, :context and :object; :additional,
# :published and :local are optional (only an explicit `local: false` makes
# the activity non-local). With `fake` set, the activity returned by
# `insert/3` is handed back without touching counters or federating.
#
# Returns `{:ok, activity}` or `{:error, reason}`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} -> {:ok, fake_activity}
    {:error, _message} = failure -> failure
  end
end
275
# Inserts and federates an Accept activity from `actor` for `object`.
# Returns `{:ok, activity}`; a failing step's result is returned unchanged.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "type" => "Accept",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
286
# Inserts and federates a Reject activity from `actor` for `object`.
# Returns `{:ok, activity}`; a failing step's result is returned unchanged.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "type" => "Reject",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
# Inserts and federates an Update activity. Unlike accept/reject, `actor` is
# put into the activity as given (not `actor.ap_id`).
# Returns `{:ok, activity}`; a failing step's result is returned unchanged.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "type" => "Update",
    "to" => to,
    "cc" => cc,
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
314
# Likes `object` on behalf of `user`.
#
# Returns `{:ok, activity, object}`. When a like by this user already exists it
# is returned together with the object as passed in; any other failing step is
# returned as `{:error, result}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    error ->
      {:error, error}
  end
end
333
# Undoes the actor's like of `object`.
#
# On success returns `{:ok, unlike_activity, like_activity, object}`. If there
# is no existing like, or any step fails, `{:ok, object}` is returned with the
# object as passed in.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       undo_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(undo_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
351
# Announces (repeats) `object` on behalf of `user`. Only public objects can be
# announced.
#
# Returns `{:ok, activity, object}`; any failing step (including a non-public
# object) is returned as `{:error, result}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
369
# Undoes the actor's announce of `object`.
#
# On success returns `{:ok, unannounce_activity, object}`. If there is no
# existing announce, or any step fails, `{:ok, object}` is returned with the
# object as passed in. The undo is federated before the original announce is
# deleted.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       undo_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
387
# Inserts and federates a Follow activity from `follower` to `followed`.
# Returns `{:ok, activity}`; a failing step's result is returned unchanged.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
395
# Cancels the latest Follow from `follower` to `followed`: the follow's state
# is set to "cancelled", then an undo activity is inserted and federated.
# Returns `{:ok, activity}`; a failing step's result (e.g. nil when there is no
# follow) is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
405
# Federates the deletion of a user: a Delete activity addressed to the user's
# follower collection is built through a fake insert (`insert(data, true, true)`)
# and federated. Returns `{:ok, user}`; a failing step's result is returned
# unchanged.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "type" => "Delete",
    "actor" => ap_id,
    "to" => [follower_address],
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
418
# Deletes an object and announces the deletion.
#
# The object is deleted via `Object.delete/1`, a Delete activity addressed to
# the object's former to/cc is inserted, the conversation participations are
# re-streamed, reply and note counters are lowered, and the activity is
# federated. Returns `{:ok, activity}`; a failing step's result is returned
# unchanged.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressees = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data <- %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressees,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
441
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow exists, the
# blocker first unfollows the blocked user. A Block activity is inserted and
# federated only when `[:activitypub, :outgoing_blocks]` is `true`.
#
# Returns `{:ok, activity}`, or `{:ok, nil}` when outgoing blocks are disabled
# or any step fails.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])

  if Config.get([:activitypub, :unfollow_blocked]) && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _failure -> {:ok, nil}
  end
end
460
# Undoes the latest Block from `blocker` on `blocked` by inserting and
# federating an undo activity. Returns `{:ok, activity}`; a failing step's
# result (e.g. nil when there is no block) is returned unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       undo_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
469
# Files a report (Flag activity) against `account`.
#
# The activity is addressed to nobody ("to" is empty); "cc" contains the
# reported account only when forwarding is on (anything but an explicit
# `forward: false`). After a successful insert and federation every superuser
# is sent a report e-mail asynchronously.
#
# Returns `{:ok, activity}`; a failing step's result is returned unchanged.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
512
# Builds the query for the Create activities of one conversation context,
# newest first.
#
# Visible recipients are the public collection plus, when `opts["user"]` is
# given, that user's own ap_id and everything they follow. Preloads, block
# filtering, poll-vote exclusion and "exclude_id" are driven by `opts`.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]

  recipients =
    case opts["user"] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | user.following] ++ public
    end

  base =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts["user"])

  in_context =
    where(
      base,
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  in_context
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
539
# Returns the Create activities of conversation `context`, newest first.
#
# `opts` must be a string-keyed map (e.g. "user", "blocking_user",
# "exclude_id"): the underlying query reads `opts["user"]`, which raises for
# keyword lists, so the spec no longer advertises `keyword()`.
@spec fetch_activities_for_context(String.t(), map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(opts)
  |> Repo.all()
end
546
# Returns the id of the newest Create activity in conversation `context`, or
# nil when the query matches nothing.
#
# Preloading is skipped by default (a "skip_preload" entry in `opts` overrides
# that). `opts` must be a string-keyed map: it is merged with `Map.merge/2`,
# which raises for keyword lists, so the spec no longer advertises `keyword()`.
@spec fetch_latest_activity_id_for_context(String.t(), map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
556
# Fetches one page of activities addressed to the public collection, with
# `restrict_unlisted/1` applied, and reverses the page before returning it.
def fetch_public_activities(opts \\ %{}) do
  [Pleroma.Constants.as_public()]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
565
@valid_visibilities ~w[direct unlisted public private]

# Narrows the query to activities with the requested visibility.
#
# `opts[:visibility]` may be one of @valid_visibilities or a list of them.
# An unknown value is logged and the query is returned unrestricted by
# visibility: these branches used to return the result of `Logger.error/1`
# (`:ok`), which broke every later stage of the query pipeline. A nil or
# absent :visibility means "no restriction".
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` rather than interpolation: the value is unvalidated and may
    # not implement String.Chars.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: nil}), do: query

defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
605
# Applies the `thread_visibility` database check for the reading user.
#
# The check is skipped when the config passed as third argument has
# `skip_thread_containment: true`, when the user has it set in their info, or
# when there is no "user" in the options.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
624
# Fetches the Create/Announce activities authored by `user` as seen by
# `reading_user` (which may be nil).
#
# The given params are extended with the type filter, the actor, "whole_db" and
# the user's pinned activity ids. The recipients are derived from
# `params["godmode"]` and `reading_user`.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
642
# Recipient list used when reading another user's activities.
#
# "godmode" => true yields an empty list. Otherwise the list is the public
# collection, followed by the reading user's own ap_id and everything they
# follow when a reading user is present.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | reading_user.following]
  else
    [public]
  end
end
654
# Keeps only activities with an id greater than `opts["since_id"]`.
# An empty-string or absent "since_id" leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
662
# Excludes activities whose object carries any of the tags in
# `opts["tag_reject"]` (a non-empty list). Needs the object binding, so it
# raises when combined with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
676
# Keeps only activities whose object carries every tag in `opts["tag_all"]`
# (a non-empty list). Needs the object binding, so it raises when combined
# with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
690
# Keeps only activities whose object carries the requested tag: any of the
# tags when `opts["tag"]` is a list, exactly that tag when it is a string.
# Needs the object binding, so it raises when combined with "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
710
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are kept as well. An empty recipient
# list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
724
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
730
# With an "actor_id", keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
736
# Filters by activity type: a string "type" must match exactly, any other
# value is passed to the database as the right-hand side of `= ANY(?)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
746
# With a "state", keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
752
# With "favorited_by", keeps only activities whose object lists that ap_id in
# its "likes". Uses the second positional binding as the object.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
761
# With "only_media" set to "true" or "1", keeps only activities whose object's
# "attachment" is not an empty list. Needs the object binding, so it raises
# when combined with "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
774
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# embedded object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
783
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
789
# Hides activities authored by, or addressed ("to") to, accounts the
# "muting_user" has muted. "with_muted" set to true/"true"/"1" disables the
# filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  # Fall back to an empty list like restrict_blocked/2 and
  # restrict_muted_reblogs/2 do: a nil array would make both conditions
  # evaluate to SQL NULL and filter out every row.
  mutes = info.mutes || []

  from(
    activity in query,
    where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
    where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
  )
end

defp restrict_muted(query, _), do: query
803
# Hides activities involving accounts or domains the "blocking_user" blocks:
# activities authored by a blocked account, addressed to one, Announces whose
# "to" contains one, and activities whose actor (or whose object's actor) is
# on a blocked domain. Joins the object when the query has no :object binding
# yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
828
# Drops activities that have the public collection in "cc" (a missing "cc" is
# coalesced to an empty JSON object).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Pleroma.Constants.as_public()]
    )
  )
end
840
# With "pinned" => "true", keeps only the activities whose id is listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
846
# Hides Announce activities authored by accounts whose reblogs the
# "muting_user" has muted.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
863
# Drops activities whose object is a poll vote (type "Answer") unless
# "include_poll_votes" is "true". The filter can only be applied when the
# query has an :object binding; otherwise the query is returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
875
# With a binary "exclude_id", drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
881
# Preloads the activity's object unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
888
# Preloads the bookmark of `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
895
# Sets the thread-muted field for `opts["user"]` unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["user"])
end
902
# Orders by id when `opts[:order]` is `:desc` or `:asc`; otherwise leaves the
# query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
914
# Builds the timeline query for the given recipients.
#
# Every stage is driven by `opts` and is a no-op when its option is absent.
# The stages are applied in a fixed order: preloads and ordering first, then
# the restrictions, then the deactivated-user and poll-vote exclusions.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  preloaded =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  restricted =
    preloaded
    |> restrict_recipients(recipients, opts["user"])
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(opts)
    |> restrict_muted(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, containment_config)
    |> restrict_replies(opts)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(opts)

  restricted
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
947
# Fetches one page of activities for `recipients` plus the list memberships of
# `opts["user"]`, reverses the page, and passes it through `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}) do
  memberships = Pleroma.List.memberships(opts["user"])

  activities =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)
    |> Enum.reverse()

  maybe_update_cc(activities, memberships, opts["user"])
end
956
# Adds the reading user to the "cc" of activities that reached them through a
# list: when an activity's "bcc" contains one of the user's list memberships,
# the user's ap_id is prepended to the activity's "cc".
#
# Activities are returned unchanged when there are no list memberships or no
# user.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a bare string; wrap it so the result is always
        # a proper list (prepending to nil used to build an improper list).
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
973
# Restricts `query` to activities whose recipients overlap `recipients`, or
# that overlap `recipients_with_public` and are also addressed to the public
# collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
982
# Fetches one page of activities using the bounded recipient check from
# `fetch_activities_bounded_query/3` and reverses the page before returning it.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
989
# Stores an uploaded file and inserts an Object for it. When `opts[:actor]` is
# set it is recorded as the object's "actor". Returns the `Repo.insert/1`
# result, or the result of `Upload.store/2` when that is not `{:ok, data}`.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data =
        case opts[:actor] do
          absent when absent in [nil, false] -> data
          actor -> Map.put(data, "actor", actor)
        end

      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1002
# Converts a remote actor document into the attribute map used to create or
# update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner and the locked flag are read from the document as received;
# everything else is read after `Transmogrifier.maybe_fix_user_object/1`.
defp object_to_user_data(data) do
  # Builds the image map for data[key]["url"], or a falsy value when absent.
  image_from = fn key ->
    data[key]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data[key]["url"]}]
      }
  end

  avatar = image_from.("icon")
  banner = image_from.("image")
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      locked: locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
1050
# Fetches the remote following/followers collections of `user` and returns
# their integer "totalItems" together with whether each collection is private
# (as decided by `collection_private/1`).
#
# Returns `{:ok, %{hide_follows, follower_count, following_count,
# hide_followers}}`. Error tuples are passed through; any other failing step
# is wrapped as `{:error, result}`.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       following_count when is_integer(following_count) <- following["totalItems"],
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       followers_count when is_integer(followers_count) <- followers["totalItems"],
       {:ok, hide_followers} <- collection_private(followers) do
    counters = %{
      hide_follows: hide_follows,
      hide_followers: hide_followers,
      following_count: following_count,
      follower_count: followers_count
    }

    {:ok, counters}
  else
    {:error, _} = failure -> failure
    other -> {:error, other}
  end
end
1075
# Merges freshly fetched follower/following information into `data.info` when
# `[:instance, :external_user_synchronization]` is `true`. When it is `false`
# the data is returned as is; any other outcome is logged as an error and the
# data is returned unchanged.
defp maybe_update_follow_information(data) do
  with {:enabled, true} <-
         {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    %{data | info: Map.merge(data.info, follow_info)}
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1094
# Decides whether a remote collection hides its members.
#
# Returns `{:ok, false}` when the first page is embedded, or can be fetched,
# as a CollectionPage/OrderedCollectionPage; `{:ok, true}` when fetching it
# answers 401 or 403. Error tuples are passed through and anything else is
# wrapped as `{:error, result}`.
defp collection_private(data) do
  first = data["first"]

  if is_map(first) and first["type"] in ["CollectionPage", "OrderedCollectionPage"] do
    {:ok, false}
  else
    case Fetcher.fetch_and_contain_remote_object_from_id(first) do
      {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
        {:ok, false}

      {:error, {:ok, %{status: code}}} when code in [401, 403] ->
        {:ok, true}

      {:error, _} = failure ->
        failure

      other ->
        {:error, other}
    end
  end
end
1115
# Runs an actor document through MRF and converts it to user attributes.
# Returns `{:ok, user_data}`, or `{:error, result}` wrapping whichever step
# failed.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1124
# Fetches the actor document at `ap_id` and turns it into user attributes,
# including follow information when synchronization is enabled.
#
# Returns `{:ok, user_data}` or `{:error, reason}`. The failure is logged and
# then returned: previously the result of `Logger.error/1` (`:ok`) leaked out
# as the return value, hiding the real reason from callers.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data),
       data <- maybe_update_follow_information(data) do
    {:ok, data}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

      # Pass error tuples through as they are; wrap anything else.
      case failure do
        {:error, _} -> failure
        other -> {:error, other}
      end
  end
end
1134
# Returns a user for `ap_id`: an already cached user is upgraded through the
# Transmogrifier, otherwise the actor is fetched and inserted or updated.
# A failed fetch is returned as `{:error, result}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} -> User.insert_or_update_user(data)
        failure -> {:error, failure}
      end

    _cached_user ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1146
# Resolves `nickname` through WebFinger and builds the user from the resulting
# ap_id. Returns `{:error, "No AP id in WebFinger"}` when the lookup yields no
# ap_id.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1154
# filter out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and user
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1159
# do post-processing on a specific activity; this currently only delegates to
# `contain_broken_threads/2`
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1164
# Query for all Create activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1171 end