Change the order of preloading when fetching activities for context
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Conversation
8 alias Pleroma.Notification
9 alias Pleroma.Object
10 alias Pleroma.Object.Fetcher
11 alias Pleroma.Pagination
12 alias Pleroma.Repo
13 alias Pleroma.Upload
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.MRF
16 alias Pleroma.Web.ActivityPub.Transmogrifier
17 alias Pleroma.Web.WebFinger
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
# Computes `{recipients, to, cc}` for an activity map. `to` and `cc` default to
# `[]` when the corresponding key is missing or nil.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses that do not resolve to a cached user are kept as they are.
        nil -> true
        # Cached users are kept only when they follow the announcing actor.
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally list their own actor among the recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # `List.wrap/1` maps a missing actor to `[]`, so no empty-list element ends up
  # inside `recipients` (the previous `data["actor"] || []` wrapped in a list did).
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Every other activity type is addressed to `to ++ cc`, without de-duplication.
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
61
# Returns `:ok` when no actor is given, or when the actor resolves to a cached
# user whose `info.deactivated` is `false`. Returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on the map shape means an actor without a cached user (lookup
  # returned nil) is rejected instead of crashing on `nil.info`.
  with %{info: %{deactivated: false}} <- User.get_cached_by_ap_id(actor) do
    :ok
  else
    _e -> :reject
  end
end
74
# True when the embedded object's content is no longer than the configured
# `[:instance, :remote_limit]`. Activities without object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
81
@doc """
Increases the note count of `actor` when `object` is public; otherwise returns
`{:ok, actor}` without touching the user.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
85
@doc """
Decreases the note count of `actor` when `object` is public; otherwise returns
`{:ok, actor}` without touching the user.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
89
@doc """
Bumps the replies counter of the object referenced by `"inReplyTo"` when the
given Create data carries a public object with that key.

Returns `:noop` for any other input and `nil` when the object is not public.
"""
def increase_replies_count_if_reply(create_data) do
  case create_data do
    %{"type" => "Create", "object" => %{"inReplyTo" => parent_ap_id} = object} ->
      if is_public?(object), do: Object.increase_replies_count(parent_ap_id)

    _other ->
      :noop
  end
end
100
@doc """
Lowers the replies counter of the object referenced by `"inReplyTo"` when the
given `Object` has that key in its data and is public.

Returns `:noop` for any other input and `nil` when the object is not public.
"""
def decrease_replies_count_if_reply(object) do
  case object do
    %Object{data: %{"inReplyTo" => parent_ap_id} = data} ->
      if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)

    _other ->
      :noop
  end
end
110
@doc """
Registers a vote when the Create data embeds an object that has both
`"inReplyTo"` and `"name"`: the vote count for option `name` on the replied-to
object is increased. Returns `:noop` for any other input.
"""
def increase_poll_votes_if_vote(create_data) do
  case create_data do
    %{"type" => "Create", "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}} ->
      Object.increase_vote_count(poll_ap_id, option_name)

    _other ->
      :noop
  end
end
119
@doc """
Stores an activity built from `map` and fans it out.

The steps run in order and stop at the first one that fails:

  * if `Activity.normalize/1` already finds an activity, it is returned as
    `{:ok, activity}` and nothing is inserted;
  * defaults are filled in, the actor must pass `check_actor_is_active/1`, the
    content must pass the remote length limit, and the map goes through
    `MRF.filter/1`;
  * with `fake` set, an unpersisted `%Activity{}` with id `"pleroma:fakeid"` is
    returned after fetching its rich media data;
  * otherwise the object and the activity are inserted, a rich media fetch is
    enqueued, notifications and conversation participations are created, and
    the activity is streamed out.

Any other failing step is returned as `{:error, value}`.
"""
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    new_activity = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    {:ok, inserted} = Repo.insert(new_activity)

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> inserted
        _ -> Map.put(inserted, :object, object)
      end

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory only.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
177
# Extracts the participations from an `{:ok, %{participations: list}}` result;
# anything else yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _other -> []
  end
end
180
@doc """
Preloads the `:user` association of each participation and pushes every one of
them to the `"participation"` stream.
"""
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
190
@doc """
Pushes a Create, Announce or Delete activity to the streamer topics it belongs to.

Activities of other types, and activities whose object is of type `"Answer"`
(poll replies), are not streamed. Everything else goes to `"user"` and `"list"`,
and then:

  * when addressed to the public collection in `"to"`: to `"public"`
    (plus `"public:local"` for local activities); Create activities also go to
    one `"hashtag:<tag>"` topic per string tag and to the media topics when the
    object's `"attachment"` is not `[]`;
  * otherwise: to `"direct"`, unless `"cc"` contains the public collection or
    `"to"` contains the actor's follower address.
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # A missing "to" is treated as empty, the same way "cc" is, so that
    # membership checks never run against nil.
    to = activity.data["to"] || []
    cc = activity.data["cc"] || []

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] == "Create" do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(&is_bitstring/1)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          if object.data["attachment"] != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(cc, public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
234
@doc """
Builds a Create activity from `params`, inserts it, updates the reply, poll
vote and note counters, and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:published` and `:local` are optional (`:local` counts as false only when it is
exactly `false`).

With `fake` set, the activity returned by `insert/3` is handed back as
`{:ok, activity}` and no counter is touched and nothing is federated.

Returns `{:ok, activity}`, or the `{:error, reason}` tuple produced by a failing
step.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause a failed insert (for example a rejected or over-long
    # activity) raised WithClauseError instead of reaching the caller.
    {:error, _reason} = error ->
      error
  end
end
260
@doc """
Inserts an Accept activity from `actor` for `object`, addressed to `to`, and
federates it. `params[:local]` counts as false only when it is exactly `false`.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "type" => "Accept",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
271
@doc """
Inserts a Reject activity from `actor` for `object`, addressed to `to`, and
federates it. `params[:local]` counts as false only when it is exactly `false`.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "type" => "Reject",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
282
@doc """
Inserts an Update activity for `object` with the given `to`/`cc` addressing and
federates it. `actor` is stored in the activity exactly as passed in.
`params[:local]` counts as false only when it is exactly `false`.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "type" => "Update",
    "to" => to,
    "cc" => cc,
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
299
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Makes `user` like `object`.

When no like by this user exists yet, a Like activity is inserted, added to the
object and federated; the result is `{:ok, activity, updated_object}`. When a
step yields an `%Activity{}` instead (such as an already existing like), that
activity is returned together with the object as it was passed in. Any other
failure is returned as `{:error, value}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing -> {:ok, existing, object}
    failure -> {:error, failure}
  end
end
318
@doc """
Removes the like `actor` has on `object`.

On success the undo activity is inserted, the like activity is deleted and
removed from the object, the undo is federated, and
`{:ok, unlike_activity, like_activity, updated_object}` is returned. If any step
does not succeed (including when there is no existing like), the result is
`{:ok, object}` with the object as passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
336
@doc """
Makes `user` announce `object`.

Only public objects can be announced. On success the Announce activity is
inserted, added to the object and federated, and
`{:ok, activity, updated_object}` is returned. Any failing step, including a
non-public object, is returned as `{:error, value}`.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
354
@doc """
Undoes the announce `actor` has on `object`.

On success the undo activity is inserted and federated, the announce activity
is deleted and removed from the object, and
`{:ok, unannounce_activity, updated_object}` is returned. If any step does not
succeed (including when there is no existing announce), the result is
`{:ok, object}` with the object as passed in.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
372
@doc """
Inserts and federates a Follow activity from `follower` to `followed`.
Returns `{:ok, activity}`, or the value of the first failing step.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, follow_activity} <- insert(follow_data, local),
       :ok <- maybe_federate(follow_activity) do
    {:ok, follow_activity}
  end
end
380
@doc """
Cancels the latest follow from `follower` to `followed`: the follow activity's
state is set to `"cancelled"`, then an unfollow activity is inserted and
federated. Returns `{:ok, activity}`, or the value of the first failing step
(for instance when no follow activity is found).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, unfollow_activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(unfollow_activity) do
    {:ok, unfollow_activity}
  end
end
390
@doc """
Deletes `object` and publishes a Delete activity for it.

The Delete is addressed to the object's `"to"` and `"cc"` combined and records
the id of the activity returned by `Object.delete/1` (if any) under
`"deleted_activity_id"`. Reply and note counters are lowered before federation.
Returns `{:ok, activity}`, or the value of the first failing step.
"""
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed_to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed_to,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, delete_activity} <- insert(delete_data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
412
@doc """
Makes `blocker` block `blocked`.

Behaviour is driven by the `:activitypub` application config:

  * `:unfollow_blocked` — when exactly `true` and a follow from `blocker` to
    `blocked` exists, that follow is undone first;
  * `:outgoing_blocks` — when exactly `true`, a Block activity is inserted and
    federated and `{:ok, activity}` is returned.

In every other case, including a failing step, the result is `{:ok, nil}`.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  ap_config = Application.get_env(:pleroma, :activitypub)
  unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
  outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)

  # The lookup only runs when the setting is enabled; the unfollow result is
  # deliberately ignored.
  if unfollow_blocked == true and fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _failure -> {:ok, nil}
  end
end
435
@doc """
Undoes the latest block from `blocker` on `blocked` by inserting and federating
an unblock activity. Returns `{:ok, activity}`, or the value of the first
failing step (for instance when no block activity is found).
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, unblock_activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(unblock_activity) do
    {:ok, unblock_activity}
  end
end
444
@doc """
Files a report (Flag activity) by `actor` against `account`.

`params` must contain `:actor`, `:context`, `:account`, `:statuses` and
`:content`. `:local` and `:forward` count as false only when exactly `false`.
The activity always has an empty `"to"`; `"cc"` holds the reported account's
`ap_id` when forwarding and is empty otherwise.

After a successful insert and federation, a report e-mail is queued for every
superuser and `{:ok, activity}` is returned. A failing step is returned as is.
"""
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the five report fields are handed to make_flag_data/2.
  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  flag_data = make_flag_data(flag_params, additional)

  with {:ok, flag_activity} <- insert(flag_data, local),
       :ok <- maybe_federate(flag_activity) do
    User.all_superusers()
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, flag_activity}
  end
end
487
# Builds the query for all Create activities of one `context`, newest first.
#
# Visible recipients are the public collection plus, when `opts["user"]` is set,
# that user's own ap_id and everything they follow. Objects are preloaded first
# (unless skipped via opts), then poll votes and blocked content are filtered.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> exclude_poll_votes(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [a],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      a.data,
      "Create",
      a.data,
      ^context
    )
  )
  |> order_by([a], desc: a.id)
end
511
@doc """
Returns every Create activity of `context` that matches `opts`, newest first.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
518
@doc """
Returns the id of the newest Create activity of `context`, or `nil` when there
is none. Preloading is skipped by default; a `"skip_preload"` entry in `opts`
takes precedence over that default.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  newest =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> limit(1)

  Repo.one(select(newest, [a], a.id))
end
528
@doc """
Fetches one page of activities addressed to the public collection, leaving out
those that only carry it in `"cc"`, and returns the page in reversed order.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
537
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` SQL result is
# the requested visibility (a single value) or one of them (a list). Options
# without a `:visibility` key leave the query untouched.
#
# NOTE(review): for an unknown visibility this logs an error and returns the
# value of the `Logger.error/1` call rather than a query — confirm callers
# never pass unvalidated values.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not restrict visibility to #{visibility}")
  else
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  end
end

defp restrict_visibility(query, %{visibility: visibility}) do
  if visibility in @valid_visibilities do
    from(
      a in query,
      where:
        fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
  end
end

defp restrict_visibility(query, _visibility), do: query
580
# When a `"user"` is given, keeps only activities for which the SQL function
# `thread_visibility(user_ap_id, activity_id)` is true. No-op otherwise.
defp restrict_thread_visibility(query, %{"user" => %User{ap_id: viewer_ap_id}}) do
  from(
    activity in query,
    where: fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, activity.data)
  )
end

defp restrict_thread_visibility(query, _), do: query
592
@doc """
Fetches the Create and Announce activities authored by `user` as seen by
`reading_user` (who may be nil), in reversed page order.

Visible recipients are the public collection plus, for a given reader, the
reader's own ap_id and everything they follow.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
612
# Keeps only activities with an id greater than `opts["since_id"]`; an empty
# string or a missing key leaves the query untouched.
defp restrict_since(query, opts) do
  case opts do
    %{"since_id" => ""} ->
      query

    %{"since_id" => since_id} ->
      from(a in query, where: a.id > ^since_id)

    _ ->
      query
  end
end
620
# Drops activities whose object carries any of the tags in `opts["tag_reject"]`
# (a non-empty list). Needs the joined object, so it raises when preloading is
# skipped.
defp restrict_tag_reject(query, opts) do
  case opts do
    %{"tag_reject" => _, "skip_preload" => true} ->
      raise "Can't use the child object without preloading!"

    %{"tag_reject" => rejected} when is_list(rejected) and rejected != [] ->
      from(
        [_a, obj] in query,
        where: fragment("not (?)->'tag' \\?| (?)", obj.data, ^rejected)
      )

    _ ->
      query
  end
end
634
# Keeps only activities whose object carries every tag in `opts["tag_all"]`
# (a non-empty list). Needs the joined object, so it raises when preloading is
# skipped.
defp restrict_tag_all(query, opts) do
  case opts do
    %{"tag_all" => _, "skip_preload" => true} ->
      raise "Can't use the child object without preloading!"

    %{"tag_all" => required} when is_list(required) and required != [] ->
      from(
        [_a, obj] in query,
        where: fragment("(?)->'tag' \\?& (?)", obj.data, ^required)
      )

    _ ->
      query
  end
end
648
# Keeps only activities whose object carries `opts["tag"]`: any of the tags for
# a list, exactly that tag for a string. Needs the joined object, so it raises
# when preloading is skipped.
defp restrict_tag(query, opts) do
  case opts do
    %{"tag" => _, "skip_preload" => true} ->
      raise "Can't use the child object without preloading!"

    %{"tag" => tags} when is_list(tags) ->
      from(
        [_a, obj] in query,
        where: fragment("(?)->'tag' \\?| (?)", obj.data, ^tags)
      )

    %{"tag" => single_tag} when is_binary(single_tag) ->
      from(
        [_a, obj] in query,
        where: fragment("(?)->'tag' \\? (?)", obj.data, ^single_tag)
      )

    _ ->
      query
  end
end
668
# Keeps activities whose `"to"` contains any of `recipients_to` or whose `"cc"`
# contains any of `recipients_cc`.
defp restrict_to_cc(query, recipients_to, recipients_cc) do
  from(
    a in query,
    where:
      fragment(
        "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
        a.data,
        ^recipients_to,
        a.data,
        ^recipients_cc
      )
  )
end
682
# Restricts to activities whose recipients overlap `recipients`. An empty list
# applies no restriction. With a user, an `or_where` on the user's own ap_id as
# actor is added as well.
defp restrict_recipients(query, recipients, user) do
  case {recipients, user} do
    {[], _} ->
      query

    {_, nil} ->
      from(a in query, where: fragment("? && ?", ^recipients, a.recipients))

    _ ->
      from(
        a in query,
        where: fragment("? && ?", ^recipients, a.recipients),
        or_where: a.actor == ^user.ap_id
      )
  end
end
696
# Keeps only local activities when `opts["local_only"]` is exactly `true`.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
702
# Keeps only activities authored by `opts["actor_id"]` when that key is present.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => actor_ap_id} -> from(a in query, where: a.actor == ^actor_ap_id)
    _ -> query
  end
end
708
# Restricts by activity type: a string must match exactly, any other value is
# passed to SQL `ANY(...)`. Without a `"type"` key the query is untouched.
defp restrict_type(query, opts) do
  case opts do
    %{"type" => type} when is_binary(type) ->
      from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))

    %{"type" => types} ->
      from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))

    _ ->
      query
  end
end
718
# Keeps only activities whose `"state"` equals `opts["state"]` when that key is
# present.
defp restrict_state(query, opts) do
  case opts do
    %{"state" => state} ->
      from(a in query, where: fragment("?->>'state' = ?", a.data, ^state))

    _ ->
      query
  end
end
724
# Keeps only activities whose embedded object lists `opts["favorited_by"]`
# under `"likes"`, when that key is present.
defp restrict_favorited_by(query, opts) do
  case opts do
    %{"favorited_by" => liker_ap_id} ->
      from(
        a in query,
        where: fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^liker_ap_id, a.data)
      )

    _ ->
      query
  end
end
733
# With `opts["only_media"]` set to "true" or "1", keeps only activities whose
# object `"attachment"` is not an empty list. Needs the joined object, so it
# raises when preloading is skipped.
defp restrict_media(query, opts) do
  case opts do
    %{"only_media" => _, "skip_preload" => true} ->
      raise "Can't use the child object without preloading!"

    %{"only_media" => flag} when flag in ["true", "1"] ->
      from(
        [_a, obj] in query,
        where: fragment("not (?)->'attachment' = (?)", obj.data, ^[])
      )

    _ ->
      query
  end
end
746
# With `opts["exclude_replies"]` set to "true" or "1", keeps only activities
# whose embedded object has no `"inReplyTo"`.
defp restrict_replies(query, opts) do
  case opts do
    %{"exclude_replies" => flag} when flag in ["true", "1"] ->
      from(
        a in query,
        where: fragment("?->'object'->>'inReplyTo' is null", a.data)
      )

    _ ->
      query
  end
end
755
# With `opts["exclude_reblogs"]` set to "true" or "1", drops Announce
# activities.
defp restrict_reblogs(query, opts) do
  case opts do
    %{"exclude_reblogs" => flag} when flag in ["true", "1"] ->
      from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

    _ ->
      query
  end
end
761
# Hides activities authored by, or addressed (`"to"`) to, anyone the
# `"muting_user"` has muted. A truthy `"with_muted"` option (true, "true" or
# "1") disables the filter.
defp restrict_muted(query, opts) do
  case opts do
    %{"with_muted" => flag} when flag in [true, "true", "1"] ->
      query

    %{"muting_user" => %User{info: info}} ->
      muted_ap_ids = info.mutes

      from(
        a in query,
        where: fragment("not (? = ANY(?))", a.actor, ^muted_ap_ids),
        where: fragment("not (?->'to' \\?| ?)", a.data, ^muted_ap_ids)
      )

    _ ->
      query
  end
end
775
# Hides content blocked by the `"blocking_user"`: activities authored by or
# delivered to blocked users, Announces addressed to them, and anything whose
# activity actor or object actor lives on a blocked domain. The object join is
# added here when the query does not have an `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_ap_ids = info.blocks || []
  blocked_domains = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  from(
    [a, object: obj] in with_object,
    where: fragment("not (? = ANY(?))", a.actor, ^blocked_ap_ids),
    where: fragment("not (? && ?)", a.recipients, ^blocked_ap_ids),
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        a.data,
        a.data,
        ^blocked_ap_ids
      ),
    where: fragment("not (split_part(?, '/', 3) = ANY(?))", a.actor, ^blocked_domains),
    where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", obj.data, ^blocked_domains)
  )
end

defp restrict_blocked(query, _), do: query
800
# Drops activities that carry the public collection in `"cc"`.
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  from(
    a in query,
    where: fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", a.data, ^public)
  )
end
812
# With `opts["pinned"]` set to "true", keeps only the activities whose id is in
# `opts["pinned_activity_ids"]`.
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => pinned_ids} ->
      from(a in query, where: a.id in ^pinned_ids)

    _ ->
      query
  end
end
818
# Hides Announce activities made by actors whose reblogs the `"muting_user"`
# has muted.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  silenced_actors = info.muted_reblogs || []

  from(
    a in query,
    where:
      fragment(
        "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
        a.data,
        a.actor,
        ^silenced_actors
      )
  )
end

defp restrict_muted_reblogs(query, _), do: query
835
# Filters out activities whose object is of type "Answer" (poll votes), unless
# `opts["include_poll_votes"]` is "true". The filter is only applied when the
# query already has an `:object` binding.
defp exclude_poll_votes(query, opts) do
  cond do
    match?(%{"include_poll_votes" => "true"}, opts) ->
      query

    has_named_binding?(query, :object) ->
      from([_a, object: obj] in query,
        where: fragment("not(?->>'type' = ?)", obj.data, "Answer")
      )

    true ->
      query
  end
end
847
# Preloads the activity's object unless `opts["skip_preload"]` is true.
defp maybe_preload_objects(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_object(query)
  end
end
854
# Preloads the bookmark of `opts["user"]` unless `opts["skip_preload"]` is true.
defp maybe_preload_bookmarks(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_bookmark(query, opts["user"])
  end
end
861
# Orders by id when `opts` has an `:order` of `:desc` or `:asc`; otherwise the
# query is left unordered.
defp maybe_order(query, opts) do
  case opts do
    %{order: :desc} -> order_by(query, desc: :id)
    %{order: :asc} -> order_by(query, asc: :id)
    _ -> query
  end
end
873
@doc """
Builds the activity query shared by the timeline fetchers.

Starting from all activities, the preload, ordering and restriction helpers of
this module are applied in a fixed order, each one driven by `opts`; activities
of deactivated users are excluded last. `recipients` limits results to
activities delivered to any of them (an empty list applies no recipient
restriction).
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  from(activity in Activity)
  # Preloading and ordering
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_order(opts)
  # Filters, applied in this exact order
  |> exclude_poll_votes(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
end
903
@doc """
Fetches one page of activities for `recipients` and returns it in reversed
order.
"""
def fetch_activities(recipients, opts \\ %{}) do
  recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
909
@doc """
Fetches one page of activities whose `"to"` contains any of `recipients_to` or
whose `"cc"` contains any of `recipients_cc`, in reversed order. No restriction
on the stored recipients is applied.
"""
def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
  []
  |> fetch_activities_query(opts)
  |> restrict_to_cc(recipients_to, recipients_cc)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
916
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data, with `"actor"` set when `opts[:actor]` is given. A failed store
is returned unchanged.
"""
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data =
        if actor = opts[:actor] do
          Map.put(data, "actor", actor)
        else
          data
        end

      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
929
@doc """
Converts a remote actor document into the attribute map used to create or
update a user, returned as `{:ok, user_data}`.

Avatar and banner are built from `icon.url` and `image.url`, `locked` comes from
`"manuallyApprovesFollowers"`, and the nickname is
`preferredUsername@host-of-id` (or nil when there is no preferred username).
Avatar, banner and `locked` are read before the document is passed through
`Transmogrifier.maybe_fix_user_object/1`; all other fields after.
"""
def user_data_from_user_object(data) do
  # Wraps a URL in the image structure; a missing URL is passed through as is.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
976
@doc """
Fetches the remote actor document at `ap_id` and converts it with
`user_data_from_user_object/1`.

Returns `{:ok, user_data}` on success. When the fetch fails, the failure is
logged and returned as an error tuple: an `{:error, reason}` result is passed
through unchanged, anything else is wrapped as `{:error, value}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    # Return the failure itself; previously the logger's return value leaked
    # out of this function and hid the real reason from callers.
    {:error, _reason} = error ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(error)}")
      error

    other ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(other)}")
      {:error, other}
  end
end
984
@doc """
Returns the user for `ap_id`: an already cached user is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`, otherwise the actor is fetched and
inserted or updated. A failed fetch is returned as `{:error, value}`.
"""
def make_user_from_ap_id(ap_id) do
  cond do
    User.get_cached_by_ap_id(ap_id) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, user_data} -> User.insert_or_update_user(user_data)
        failure -> {:error, failure}
      end
  end
end
996
@doc """
Resolves `nickname` via WebFinger and builds the user from the returned ap_id.
Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or carries no
ap_id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _failure ->
      {:error, "No AP id in WebFinger"}
  end
end
1004
# filter out broken threads
@doc """
Returns the result of `entire_thread_visible_for_user?/2` for `activity` and
`user`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1009
# do post-processing on a specific activity
@doc """
Runs the containment checks for `activity` as seen by `user`; currently this is
only `contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1014
@doc """
Returns a query for all Create activities with `"direct"` visibility, ordered
by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [a], asc: a.id)
end
1021 end