Merge branch 'develop' into update-oauth-template
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Conversation
8 alias Pleroma.Notification
9 alias Pleroma.Object
10 alias Pleroma.Object.Fetcher
11 alias Pleroma.Pagination
12 alias Pleroma.Repo
13 alias Pleroma.Upload
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.MRF
16 alias Pleroma.Web.ActivityPub.Transmogrifier
17 alias Pleroma.Web.WebFinger
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
# Builds the recipient list for an activity.
#
# Every clause returns `{recipients, to, cc}`, where `to` and `cc` are the
# activity's own addressing fields (defaulting to `[]`).

# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses for which no user is found are kept untouched.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally list the actor itself as a recipient.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []

  # `List.wrap/1` maps a missing actor to `[]`, so nothing is appended in that
  # case. Wrapping a `[]` default in a list would otherwise add the empty list
  # itself as a bogus recipient.
  recipients = Enum.uniq(to ++ cc ++ List.wrap(data["actor"]))

  {recipients, to, cc}
end

# Any other activity type: recipients are simply `to` followed by `cc`.
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
61
# Checks whether the actor behind an activity may post.
#
# Returns `:ok` when no actor is given or when the cached user's
# `info.deactivated` flag is exactly `false`; returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on `%User{}` sends an unknown actor (lookup returned nil) to the
  # `:reject` branch instead of raising on `nil.info`.
  with %User{info: info} <- User.get_cached_by_ap_id(actor),
       false <- info.deactivated do
    :ok
  else
    _e -> :reject
  end
end
74
# Returns true when the content of the activity's embedded object is no longer
# than the configured `[:instance, :remote_limit]`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

# Activities without embedded object content always pass.
defp check_remote_limit(_activity), do: true
81
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the count.
def increase_note_count_if_public(actor, object) do
  if !is_public?(object) do
    {:ok, actor}
  else
    User.increase_note_count(actor)
  end
end
85
# Decreases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the count.
def decrease_note_count_if_public(actor, object) do
  if !is_public?(object) do
    {:ok, actor}
  else
    User.decrease_note_count(actor)
  end
end
89
# For a Create activity whose embedded object has an "inReplyTo" key, bumps the
# replies count of the replied-to object, but only when the new object is public.
# Returns nil when the object is not public.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = note
    }) do
  if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
end

# Anything else is not a reply creation.
def increase_replies_count_if_reply(_create_data), do: :noop
100
# For an object whose data has an "inReplyTo" key, lowers the replies count of
# the replied-to object, but only when the object data is public.
# Returns nil when it is not public.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = note}) do
  if is_public?(note), do: Object.decrease_replies_count(parent_ap_id)
end

# Anything else is not a reply.
def decrease_replies_count_if_reply(_object), do: :noop
110
# A Create activity whose embedded object carries both "inReplyTo" and "name"
# is counted as a vote for the option `name` on the replied-to object.
def increase_poll_votes_if_vote(%{"type" => "Create", "object" => vote}) 
    when is_map_key(vote, "inReplyTo") and is_map_key(vote, "name") do
  %{"inReplyTo" => poll_ap_id, "name" => option_name} = vote
  Object.increase_vote_count(poll_ap_id, option_name)
end

# Anything else is not a vote.
def increase_poll_votes_if_vote(_create_data), do: :noop
119
# Inserts an activity map into the database and runs the follow-up work.
#
# Arguments:
#   * `map`   - the activity data
#   * `local` - stored in the activity's `local` field (default `true`)
#   * `fake`  - when `true`, nothing is persisted and an in-memory activity
#               with the id "pleroma:fakeid" is returned instead
#
# Returns `{:ok, activity}` on success (also when `Activity.normalize/1` finds
# an existing activity), or `{:error, reason}` when any check in the chain fails.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Tagged so that the fake case can be told apart in the `else` block.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    record = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    {:ok, inserted} = Repo.insert(record)

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity already exists: hand it back untouched.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_map, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_map,
        local: local,
        actor: fake_map["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
177
# Extracts the participations from an `{:ok, %{participations: list}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
180
# Preloads the `:user` association of every participation and streams each one
# on the "participation" topic.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
190
# Pushes an activity to the streaming topics it belongs to.
#
# Only Create, Announce and Delete activities are streamed, and activities whose
# object has type "Answer" are skipped. Every streamed activity goes to the
# "user" and "list" topics. Activities addressed to the public collection also
# go to the public topics, plus hashtag and media topics for Create. All other
# activities go to "direct", unless the public collection is in "cc" or the
# actor's follower address is in "to".
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # A missing "to"/"cc" is treated as empty so the membership checks below
    # cannot raise on nil.
    to = activity.data["to"] || []
    cc = activity.data["cc"] || []

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] in ["Create"] do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(fn tag -> is_bitstring(tag) end)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          # An absent "attachment" key means no media, same as an empty list.
          if (object.data["attachment"] || []) != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(cc, public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
234
# Builds and inserts a Create activity.
#
# `params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
# `:local` and `:published` are optional. With `fake` set to `true` the activity
# is returned right after `insert/3`, skipping counters and federation.
#
# Returns `{:ok, activity}` on success. A step that fails returns its own result
# unchanged (for example the `{:error, reason}` produced by `insert/3`).
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause any other failing step would raise WithClauseError
    # instead of handing its result back to the caller.
    other ->
      other
  end
end
260
# Inserts an Accept activity from `actor` for `object`, addressed to `to`, and
# hands it to federation. The activity is local unless `params[:local]` is `false`.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"type" => "Accept", "to" => to, "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
271
# Inserts a Reject activity from `actor` for `object`, addressed to `to`, and
# hands it to federation. The activity is local unless `params[:local]` is `false`.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"type" => "Reject", "to" => to, "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
282
# Inserts an Update activity for `object` and hands it to federation.
# Note that `actor` is stored as given (not dereferenced to an ap_id).
# The activity is local unless `params[:local]` is `false`.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "type" => "Update",
    "to" => to,
    "cc" => cc,
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
299
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
#
# Likes `object` on behalf of `user`.
#
# Returns `{:ok, activity, object}`; when the user already liked the object the
# existing like activity and the unchanged object are returned. Any other
# failure is wrapped as `{:error, reason}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
318
# Removes the actor's like from `object`.
#
# On success returns `{:ok, unlike_activity, like_activity, object}`. When any
# step does not match (including when no like exists) it returns `{:ok, object}`
# with the object that was passed in.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
336
# Announces `object` on behalf of `user`. Only public objects are accepted.
#
# Returns `{:ok, activity, object}` on success and `{:error, reason}` otherwise
# (`reason` is whatever the failing step returned, e.g. `false` for a
# non-public object).
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
354
# Undoes the actor's announce of `object`.
#
# On success returns `{:ok, unannounce_activity, object}`. When any step does
# not match (including when no announce exists) it returns `{:ok, object}` with
# the object that was passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       undo_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, undo_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
372
# Inserts a follow activity from `follower` to `followed` and hands it to
# federation. Returns `{:ok, activity}` or the result of the failing step.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
380
# Marks the latest follow between the two users as "cancelled", then inserts
# and federates the matching unfollow activity. Returns `{:ok, activity}` or
# the result of the first step that does not match.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
390
# Deletes `object` and inserts a Delete activity addressed to the object's
# former "to" and "cc" recipients. Returns `{:ok, activity}` or the result of
# the first step that does not match.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  former_to = object.data["to"] || []
  former_cc = object.data["cc"] || []
  to = former_to ++ former_cc

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => to,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, activity} <- insert(data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
412
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow exists, the
# blocker unfollows first. A block activity is only inserted and federated when
# `[:activitypub, :outgoing_blocks]` is `true`; in every other case (or on any
# failure) `{:ok, nil}` is returned.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Pleroma.Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Pleroma.Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
431
# Looks up the latest block between the two users, then inserts and federates
# the matching unblock activity. Returns `{:ok, activity}` or the result of the
# first step that does not match.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
440
# Files a report (flag activity) about `account` and the given `statuses`.
#
# The activity is addressed to nobody in "to"; "cc" holds the reported
# account's ap_id unless `params[:forward]` is `false`. After a successful
# insert and federation, a report e-mail is queued for every superuser.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
483
# Builds the query for all Create activities of a conversation `context`,
# newest first. Recipients are limited to the public collection plus, when
# `opts["user"]` is set, that user's own ap_id and followed addresses.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients = if user, do: [user.ap_id | user.following] ++ public, else: public

  Activity
  |> from()
  |> maybe_preload_objects(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
507
# Returns every activity matched by the context query for `context`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
514
# Returns the id of the first activity of the context query (which orders by
# id descending), or nil when there is none. Preloading is skipped unless
# `opts` sets "skip_preload" itself.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
524
# Fetches one page of activities addressed to the public collection, leaving
# out those that only carry it in "cc". The fetched page is reversed before
# being returned.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
533
@valid_visibilities ["direct", "unlisted", "public", "private"]

# Restricts `query` to activities whose `activity_visibility(...)` database
# function result equals (or, for a list, is one of) the requested visibility.
#
# NOTE(review): for an invalid visibility the result of `Logger.error/1` is
# returned instead of a query, so callers piping the result onwards do not get
# a query back. Behavior kept as-is; confirm what the intended fallback is.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, fn v -> v not in @valid_visibilities end) do
    Logger.error("Could not restrict visibility to #{visibility}")
  else
    where(
      query,
      [a],
      fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
end

# No visibility requested: leave the query alone.
defp restrict_visibility(query, _visibility), do: query
576
# When `opts` carries a "user", keeps only activities for which the database
# function `thread_visibility(ap_id, activity_id)` is true.
defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts), do: query
588
# Fetches the Create and Announce activities authored by `user`, as visible to
# `reading_user` (nil for an anonymous reader, who only sees public recipients).
# The list returned by `fetch_activities/2` is reversed before being returned.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
608
# Keeps only activities with an id greater than `opts["since_id"]`. A missing
# or empty "since_id" leaves the query untouched.
defp restrict_since(query, opts) do
  case opts do
    %{"since_id" => ""} -> query
    %{"since_id" => since_id} -> where(query, [activity], activity.id > ^since_id)
    _ -> query
  end
end
616
# Excludes activities whose object has any of the tags in `opts["tag_reject"]`.
# Raises when asked to filter while "skip_preload" is true, because the filter
# reads the joined object.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

# No (or an empty / non-list) "tag_reject": nothing to filter.
defp restrict_tag_reject(query, _opts), do: query
630
# Keeps only activities whose object has every tag in `opts["tag_all"]`.
# Raises when asked to filter while "skip_preload" is true, because the filter
# reads the joined object.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

# No (or an empty / non-list) "tag_all": nothing to filter.
defp restrict_tag_all(query, _opts), do: query
644
# Keeps only activities whose object carries the requested tag. A list matches
# any of its tags, a binary matches that single tag. Raises when asked to
# filter while "skip_preload" is true, because the filter reads the joined object.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tags)
  )
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^single_tag)
  )
end

defp restrict_tag(query, _opts), do: query
664
# Keeps activities whose recipients overlap with `recipients`. With a `user`,
# activities authored by that user are additionally admitted via `or_where`.
# An empty recipient list disables the restriction entirely.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
678
# With `"local_only" => true`, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
684
# With an "actor_id", keeps only activities whose actor equals it.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
690
# Keeps only activities of the requested "type": a binary is compared for
# equality, any other value is passed to `ANY(...)` as the set of accepted types.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
700
# With a "state", keeps only activities whose data "state" field equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
706
# With a "favorited_by" ap_id, keeps only activities whose embedded object
# "likes" field contains that ap_id.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
715
# With "only_media" set to "true" or "1", keeps only activities whose object
# "attachment" differs from the empty list. Raises when asked to filter while
# "skip_preload" is true, because the filter reads the joined object.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
728
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# embedded object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
737
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
743
# Hides activities authored by, or addressed ("to") to, anyone in the muting
# user's mute list. Passing "with_muted" as true, "true" or "1" disables the
# filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  # Default to an empty list, as is done for blocks and muted reblogs in the
  # sibling filters, so a nil value is never bound into the query.
  mutes = info.mutes || []

  from(
    activity in query,
    where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
    where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
  )
end

defp restrict_muted(query, _), do: query
757
# Hides activities involving anyone the "blocking_user" has blocked:
#   * authored by a blocked actor,
#   * with a blocked actor among the recipients,
#   * Announce activities addressed ("to") to a blocked actor,
#   * authored by an actor on a blocked domain,
#   * whose object was authored by an actor on a blocked domain.
# The object join is added first when the query does not have one yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  joined =
    if !has_named_binding?(query, :object) do
      Activity.with_joined_object(query)
    else
      query
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [_activity, object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
782
# Drops activities that carry the public collection in "cc" (a missing "cc"
# is coalesced to an empty JSON object first).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
794
# With `"pinned" => "true"`, keeps only activities whose id is in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
800
# Hides Announce activities authored by anyone in the muting user's
# `muted_reblogs` list (nil is treated as an empty list).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
817
# Drops activities whose object has type "Answer", unless
# `"include_poll_votes" => "true"` is given. The filter is only applied when
# the query already has the named `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  if !has_named_binding?(query, :object) do
    query
  else
    where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  end
end
829
# Adds the object preload to the query unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
836
# Adds the bookmark preload for `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
843
# Passes the query through `Activity.with_set_thread_muted_field/2` for
# `opts["user"]` unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["user"])
end
850
# Orders by id in the direction given under the atom key `:order`
# (`:desc` or `:asc`); otherwise the query is returned unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
862
# Builds the general activity query for `recipients`, applying every preload,
# ordering and restriction helper of this module in a fixed order. Each helper
# decides from `opts` whether it changes the query.
def fetch_activities_query(recipients, opts \\ %{}) do
  from(a in Activity)
  # preloads and ordering
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  # restrictions
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
893
# Runs the general activity query for `recipients` through pagination and
# returns the fetched page in reversed order.
def fetch_activities(recipients, opts \\ %{}) do
  page =
    recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)

  Enum.reverse(page)
end
899
# Keeps activities whose recipients overlap with `recipients`, or whose
# recipients overlap with `recipients_with_public` while also containing the
# public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
908
# Like `fetch_activities/2`, but the recipient restriction comes from
# `fetch_activities_bounded_query/3` instead of the general query (which is
# built with an empty recipient list). The fetched page is returned reversed.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  page =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)
    |> Pagination.fetch_paginated(opts)

  Enum.reverse(page)
end
915
# Stores `file` through `Upload.store/2` and inserts an object holding the
# resulting data. When `opts[:actor]` is given it is recorded under "actor".
# A failed store is returned as-is.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    obj_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: obj_data})
  end
end
928
# Converts a remote actor object into the attribute map used to create or
# update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner and the locked flag are read from the object as received,
# before `Transmogrifier.maybe_fix_user_object/1` is applied; the remaining
# fields come from the fixed object.
defp object_to_user_data(data) do
  icon_url = data["icon"]["url"]
  image_url = data["image"]["url"]

  avatar = icon_url && %{"type" => "Image", "url" => [%{"href" => icon_url}]}
  banner = image_url && %{"type" => "Image", "url" => [%{"href" => image_url}]}
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
975
# Runs a remote actor object through `MRF.filter/1` and converts the result
# into user attributes. Returns `{:ok, user_data}`, or `{:error, reason}` where
# `reason` is whatever the failing step returned.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
984
# Fetches the remote actor at `ap_id` and converts it into user attributes.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and
# `{:error, reason}` is returned, where `reason` is the value of the failing
# step. The logger's own return value is no longer handed to the caller, so the
# failure reason is not lost.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
993
# Returns the user for `ap_id`: an already cached user is upgraded through the
# Transmogrifier, otherwise the remote actor is fetched and inserted or
# updated. A failed fetch yields `{:error, reason}`.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
1005
# Resolves `nickname` through WebFinger and builds the user from the returned
# ap_id. Returns `{:error, "No AP id in WebFinger"}` when the lookup yields no
# non-nil "ap_id".
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1013
# filter out broken threads
# Delegates to `entire_thread_visible_for_user?/2` for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1018
# do post-processing on a specific activity
# Currently this only applies the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1023
# Query for all Create activities with "direct" visibility, ordered by
# ascending id.
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{"type" => "Create"})
  direct = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct, [activity], asc: activity.id)
end
1030 end