Merge branch 'develop' into issue/941
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
#
# Every clause returns `{recipients, to, cc}`, where `to` and `cc` default to `[]`
# when the activity does not carry them.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Recipients without a cached user are always kept.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address their own actor.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []

  # List.wrap/1 turns a missing actor into "no extra recipient". The previous
  # `data["actor"] || []` default ended up appending an empty list as a recipient.
  actor = List.wrap(data["actor"])

  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# All other activity types: recipients are simply `to ++ cc`.
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
62
# Returns `:ok` when no actor is given, or when the actor's cached user is not
# deactivated. Returns `:reject` in every other case.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on %User{} makes an actor without a cached user fall through to
  # `:reject` instead of raising on `nil.info`.
  with %User{} = user <- User.get_cached_by_ap_id(actor),
       false <- user.info.deactivated do
    :ok
  else
    _e -> :reject
  end
end
75
# Compares the length of an embedded object's content with the configured
# `[:instance, :remote_limit]`. Activities without non-nil embedded content pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
82
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public.

For a non-public object nothing is changed and `{:ok, actor}` is returned.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
86
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public.

For a non-public object nothing is changed and `{:ok, actor}` is returned.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
90
@doc """
Bumps the replies count of the object named by `"inReplyTo"` when given the data of
a public "Create" activity whose embedded object has that key.

Returns `nil` when the embedded object is not public and `:noop` for any other input.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
101
@doc """
Lowers the replies count of the object named by `"inReplyTo"` when given a public
`Object` whose data has that key.

Returns `nil` when the object is not public and `:noop` for any other input.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
111
@doc """
Registers a poll vote for "Create" data whose embedded object has both `"inReplyTo"`
and `"name"`, by calling `Object.increase_vote_count/2` with those two values.

Any other input yields `:noop`.
"""
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
120
@doc """
Inserts the activity described by `map`, unless `Activity.normalize/1` already finds one.

The steps are: fill in activity defaults, check that the actor is active, check the
remote content limit, run the MRF filter, compute the recipients and insert the full
object. The stored activity then gets its child object spliced in, a rich media fetch
is enqueued, notifications are created, the conversation is created or bumped, and the
activity and participations are streamed out.

When `fake` is `true`, nothing is written: an `%Activity{}` with the id
`"pleroma:fakeid"` is built and returned after rich media data is fetched for it.

Returns `{:ok, activity}`, or `{:error, reason}` where `reason` is the value of the
step that did not match.
"""
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    new_activity = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    {:ok, activity} = Repo.insert(new_activity)

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: activity, else: Map.put(activity, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity already exists; hand it back untouched.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
178
# Extracts the participations from an `{:ok, %{participations: ...}}` result.
# Every other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
181
@doc """
Preloads `:user` on the given participations and sends each of them to the
`"participation"` stream.
"""
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
191
@doc """
Pushes a "Create", "Announce" or "Delete" activity to the streamer topics.

Activities whose normalized object has the type "Answer" (poll replies) are not
streamed. All others go to the `"user"` and `"list"` topics and then:

  * if addressed (`"to"`) to the public collection: to `"public"`, to `"public:local"`
    for local activities, and, for "Create" only, to one `"hashtag:<tag>"` topic per
    string tag plus `"public:media"` / `"public:local:media"` when the object has
    attachments;
  * otherwise: to `"direct"`, unless `"cc"` contains the public collection or `"to"`
    contains the actor's follower address.

Activities of any other type are ignored.
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      # A missing "to" is treated as empty (as "cc" already is further down)
      # instead of raising inside Enum.member?/2.
      to = activity.data["to"] || []

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] == "Create" do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(&is_bitstring/1)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          # Only a non-empty attachment list counts as media. A missing
          # "attachment" key used to pass the `!= []` test as well.
          if (object.data["attachment"] || []) != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(activity.data["cc"] || [], public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
235
@doc """
Builds and inserts a "Create" activity from `params` (`:to`, `:actor`, `:context` and
`:object` are required; `:published`, `:additional` and `:local` are optional).

After a successful, non-fake insert the replies count, poll votes and the actor's note
count are updated and the activity is handed to `maybe_federate/1`.

With `fake` set to `true` the activity returned by `insert/3` is passed back without
any of those follow-up steps.

Returns `{:ok, activity}`, or the `{:error, reason}` tuple produced by a failing step.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause an `{:error, _}` from insert/3 (e.g. an MRF rejection)
    # had no matching `else` branch and raised WithClauseError.
    {:error, _reason} = error ->
      error
  end
end
261
@doc """
Inserts an "Accept" activity from `actor` (its `ap_id` is used) for `object`, addressed
to `to`, and hands it to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "type" => "Accept",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
272
@doc """
Inserts a "Reject" activity from `actor` (its `ap_id` is used) for `object`, addressed
to `to`, and hands it to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "type" => "Reject",
    "to" => to,
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
283
@doc """
Inserts an "Update" activity built from `to`, `cc`, `actor` and `object` exactly as
given in `params`, and hands it to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "type" => "Update",
    "to" => to,
    "cc" => cc,
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
300
301 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Makes `user` like `object`.

If no like by this user exists yet, a like activity is inserted, added to the object
and handed to `maybe_federate/1`. The result is `{:ok, activity, object}`.

If a like already exists, that activity is returned in the same shape with the object
as passed in. Any other non-matching step value is wrapped as `{:error, value}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- user |> make_like_data(object, activity_id) |> insert(local),
       {:ok, object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    error ->
      {:error, error}
  end
end
319
@doc """
Removes `actor`'s like from `object`.

An unlike activity is inserted for the existing like, the like activity is deleted and
removed from the object, and the unlike is handed to `maybe_federate/1`. On success the
result is `{:ok, unlike_activity, like_activity, object}`.

If any step does not match (including "no existing like"), `{:ok, object}` is returned
with the object as passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         actor |> make_unlike_data(like_activity, activity_id) |> insert(local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, object}
  else
    _e -> {:ok, object}
  end
end
337
@doc """
Makes `user` announce `object`; only public objects can be announced.

The announce activity is inserted, added to the object and handed to
`maybe_federate/1`. The result is `{:ok, activity, object}`.

Any non-matching step value (including `false` for a non-public object) is wrapped as
`{:error, value}`.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, activity} <-
         user |> make_announce_data(object, activity_id, public) |> insert(local),
       {:ok, object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    error -> {:error, error}
  end
end
355
@doc """
Undoes `actor`'s announce of `object`.

An unannounce activity is inserted for the existing announce and handed to
`maybe_federate/1`. The announce activity is then deleted and removed from the object.
On success the result is `{:ok, unannounce_activity, object}`.

If any step does not match, `{:ok, object}` is returned with the object as passed in.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         actor |> make_unannounce_data(announce_activity, activity_id) |> insert(local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, object}
  else
    _e -> {:ok, object}
  end
end
373
@doc """
Inserts the follow activity built by `make_follow_data/3` and hands it to
`maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
381
@doc """
Cancels the latest follow from `follower` to `followed`.

The follow activity's state is set to `"cancelled"`, then an unfollow activity is
inserted and handed to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match (for instance when there is no
follow activity), that step's value is returned as-is.
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
391
@doc """
Deletes `object` and inserts a "Delete" activity for it.

The activity is addressed to the object's `"to"` and `"cc"` and carries the id of the
activity returned by `Object.delete/1` (if any) as `"deleted_activity_id"`. The replies
count and the actor's note count are then decreased where applicable, and the activity
is handed to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       {:ok, activity} <-
         insert(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           local
         ),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
413
@doc """
Makes `blocker` block `blocked`.

When `[:activitypub, :unfollow_blocked]` is set and a follow from `blocker` to
`blocked` exists, `unfollow/4` is called first. A block activity is only inserted and
handed to `maybe_federate/1` when `[:activitypub, :outgoing_blocks]` is `true`.

Returns `{:ok, activity}`, or `{:ok, nil}` when any step does not match.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` short-circuits, so the follow lookup only happens when unfollowing is enabled.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- blocker |> make_block_data(blocked, activity_id) |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
432
@doc """
Undoes the latest block from `blocker` to `blocked` by inserting an unblock activity
and handing it to `maybe_federate/1`.

Returns `{:ok, activity}`. If a step does not match (for instance when there is no
block activity), that step's value is returned as-is.
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         blocker
         |> make_unblock_data(blocked, latest_block, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
441
@doc """
Files a report (flag activity) by `actor` about `account` and `statuses`.

`"to"` is always empty. `"cc"` contains the reported account's `ap_id` unless
`params[:forward]` is `false`, in which case it is empty too. After the activity is
inserted and handed to `maybe_federate/1`, a report email is delivered asynchronously
to every user returned by `User.all_superusers/0`.

Returns `{:ok, activity}`. If a step does not match, that step's value is returned as-is.
"""
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the report fields themselves are passed on to make_flag_data/2.
  flag_params = Map.take(params, [:actor, :context, :account, :statuses, :content])

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
484
# Builds the query for all "Create" activities with the given context, newest first.
# Recipients are the public collection plus, when `opts["user"]` is set, that user's
# ap_id and followings. Blocks and poll votes are filtered according to `opts`.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients = if user, do: [user.ap_id | user.following] ++ public, else: public

  restricted =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)

  restricted
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
508
@doc """
Returns all activities matched by the context query for `context` and `opts`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
515
@doc """
Returns the id of the first activity of the context query (which orders by id
descending), or `nil` when there is none.

`"skip_preload" => true` is used as a default that `opts` may override.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
525
@doc """
Fetches one page of activities addressed to the public collection, excluding unlisted
ones, and returns it in reversed order.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
534
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(actor, recipients, data)`
# equals the requested visibility (a string) or is one of the requested visibilities
# (a list of strings). Options without a `:visibility` key leave the query unchanged.
#
# An invalid visibility is logged and produces a query that matches nothing. The
# invalid-visibility branches previously returned the result of Logger.error/1
# (`:ok`), which broke the query pipeline of every caller.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    reject_invalid_visibility(query, visibility)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Any other `:visibility` value is invalid; valid strings and lists were handled above.
defp restrict_visibility(query, %{visibility: visibility}) do
  reject_invalid_visibility(query, visibility)
end

defp restrict_visibility(query, _visibility), do: query

# Logs the offending value and fails closed: no activity has an invalid visibility.
# inspect/1 is used so values that do not implement String.Chars cannot raise here.
defp reject_invalid_visibility(query, visibility) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  from(a in query, where: false)
end
574
# Keeps only activities for which the `thread_visibility` SQL function returns true
# for the reading user's ap_id and the activity id. The query is left unchanged when
# the config has `skip_thread_containment: true` or when `opts` carries no `%User{}`
# under "user".
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
586
@doc """
Fetches `user`'s "Create" and "Announce" activities as visible to `reading_user`.

Recipients are the public collection plus, when `reading_user` is given, that user's
ap_id and followings. The list returned by `fetch_activities/2` is reversed once more
before it is returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      public ++ [reading_user.ap_id | reading_user.following]
    else
      public
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
606
# Keeps only activities whose id is greater than `"since_id"`. An empty or missing
# `"since_id"` leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
614
# Excludes activities whose object carries any of the tags in the non-empty list
# `"tag_reject"`. Raises when combined with `"skip_preload" => true`, because the
# filter reads the joined object.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
628
# Keeps only activities whose object carries every tag in the non-empty list
# `"tag_all"`. Raises when combined with `"skip_preload" => true`, because the filter
# reads the joined object.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
642
# Keeps only activities whose object carries the tag (binary `"tag"`) or any of the
# tags (list `"tag"`). Raises when combined with `"skip_preload" => true`, because the
# filter reads the joined object.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
662
# Keeps only activities whose recipients overlap with `recipients`. When a user is
# given, activities whose actor is that user are added through `or_where`. An empty
# recipient list leaves the query unchanged.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
676
# Keeps only local activities when `"local_only"` is `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
682
# Keeps only activities whose actor equals `"actor_id"`, when that option is present.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
688
# Keeps only activities of the given `"type"`: a binary is compared for equality,
# any other value is passed to `ANY(?)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
698
# Keeps only activities whose data `state` equals `"state"`, when that option is present.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
704
# Keeps only activities where `"favorited_by"` is contained in the `likes` of the
# activity's embedded object.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
713
# With `"only_media"` set to "true" or "1", keeps only activities whose object's
# `attachment` is not equal to the empty list. Raises when combined with
# `"skip_preload" => true`, because the filter reads the joined object.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
726
# With `"exclude_replies"` set to "true" or "1", keeps only activities whose embedded
# object has no `inReplyTo`.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
735
# With `"exclude_reblogs"` set to "true" or "1", drops "Announce" activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
741
# Drops activities authored by, or addressed (`to`) to, anyone muted by the
# `"muting_user"`. `"with_muted"` set to true, "true" or "1" disables the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  # Default to an empty list like restrict_blocked/2 and restrict_muted_reblogs/2 do.
  # A nil list would make both predicates NULL and filter out every row.
  mutes = info.mutes || []

  from(
    activity in query,
    where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
    where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
  )
end

defp restrict_muted(query, _), do: query
755
# Drops activities that involve users or domains blocked by the `"blocking_user"`:
# blocked actors, blocked recipients, Announces addressed (`to`) to blocked users, and
# activities whose actor or whose object's actor is on a blocked domain. The object
# join is added first when the query does not already have an `:object` binding.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
780
# Drops activities whose `cc` contains the public collection.
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
792
# With `"pinned" => "true"`, keeps only activities whose id is in `"pinned_activity_ids"`.
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
798
# Drops "Announce" activities by actors listed in the muting user's `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
815
# Drops activities whose object has the type "Answer" (poll votes), unless
# `"include_poll_votes"` is "true". The filter needs the `:object` named binding; a
# query without it is returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
827
# Applies `Activity.with_preloaded_object/1` unless `"skip_preload"` is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
834
# Applies `Activity.with_preloaded_bookmark/2` for `opts["user"]` unless
# `"skip_preload"` is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
841
# Applies `Activity.with_set_thread_muted_field/2` for `opts["user"]` unless
# `"skip_preload"` is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["user"])
end
848
# Orders by id in the direction given under the atom key `:order` (`:desc` or `:asc`).
# Any other options leave the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
860
@doc """
Builds the activity query for `recipients`, applying every preload, ordering and
restriction helper of this module in a fixed sequence according to `opts`.

The `[:instance]` configuration is converted to a map and handed to the thread
visibility restriction. The result is an unexecuted query.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  instance_config = Map.new(Config.get([:instance]))

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, instance_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
892
@doc """
Runs `fetch_activities_query/2` through `Pagination.fetch_paginated/2` and returns
the page in reversed order.
"""
def fetch_activities(recipients, opts \\ %{}) do
  recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
898
@doc """
Restricts `query` to activities whose recipients overlap with `recipients`, or whose
recipients overlap with `recipients_with_public` and also contain the public collection.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
907
@doc """
Fetches one page of activities using the bounded recipient restriction of
`fetch_activities_bounded_query/3` on top of `fetch_activities_query/2` (called with
no recipients), and returns it in reversed order.
"""
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  base_query = fetch_activities_query([], opts)

  base_query
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
914
@doc """
Stores `file` with `Upload.store/2` and inserts an `Object` holding the resulting data.

When `opts[:actor]` is set it is added to the data under `"actor"`. Returns the result
of `Repo.insert/1`, or the non-matching value returned by `Upload.store/2`.
"""
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    object_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: object_data})
  end
end
927
# Converts a user (actor) object into the attribute map used to create or update a
# user. Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  # Wraps a URL into an "Image" map; a nil or false URL is passed through unchanged.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  # Avatar, banner and lock state are read before the object is fixed up below.
  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
974
@doc """
Runs a user object through `MRF.filter/1` and converts the result into user attributes.

Returns `{:ok, user_data}`. A non-matching step value is wrapped as `{:error, value}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    e -> {:error, e}
  end
end
983
@doc """
Fetches the remote object at `ap_id` and converts it into user attributes.

Returns `{:ok, user_data}` on success. On failure the problem is logged and an
`{:error, reason}` tuple is returned.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Previously the result of Logger.error/1 (`:ok`) leaked out as the return
      # value. Pass error tuples through and wrap anything else.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
992
@doc """
Returns a user for `ap_id`.

When a user is already cached for `ap_id`, `Transmogrifier.upgrade_user_from_ap_id/1`
is called. Otherwise the user data is fetched and passed to
`User.insert_or_update_user/1`; a failed fetch is wrapped as `{:error, value}`.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      e -> {:error, e}
    end
  end
end
1004
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
returned `"ap_id"`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup yields no non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _e ->
      {:error, "No AP id in WebFinger"}
  end
end
1012
@doc """
Filters out broken threads: returns the result of `entire_thread_visible_for_user?/2`
for `activity` and `user`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1017
@doc """
Does post-processing on a specific activity; currently this only delegates to
`contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1022
@doc """
Builds the query for all "Create" activities with "direct" visibility, ordered by
ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1029 end