c0e3d1478794622ccd9d633bc04807fac9faed02
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
# Computes `{recipients, to, cc}` for an activity map. Missing "to"/"cc" are treated as [].
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses with no cached user are always kept.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities also address their own actor; duplicate recipients are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # List.wrap/1 maps a missing actor to [] so no stray `[]` element ends up in recipients.
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Every other activity type is addressed to exactly "to" followed by "cc".
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
62
# Returns :ok when no actor is given, or when the cached user for `actor` has
# `info.deactivated == false`. Returns :reject otherwise, including when no user is
# cached for the actor (previously that case raised on `user.info`).
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> :ok
    _ -> :reject
  end
end
75
# Compares the object's content length (String.length/1) with the configured
# `[:instance, :remote_limit]`. Only binary content is measured: a missing or
# non-binary "content" passes, because String.length/1 would raise on it.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
82
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?(object)` is truthy;
otherwise returns `{:ok, actor}` untouched.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
86
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?(object)` is truthy;
otherwise returns `{:ok, actor}` untouched.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
90
@doc """
For a "Create" whose embedded object has an "inReplyTo", bumps the replies count of the
replied-to object when the embedded object is public (returns `nil` when it is not).
Any other input yields `:noop`.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
101
@doc """
For an `%Object{}` whose data has an "inReplyTo", lowers the replies count of the
replied-to object when the object's data is public (returns `nil` when it is not).
Any other input yields `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
111
@doc """
For a "Create" whose embedded object carries both "inReplyTo" and "name", calls
`Object.increase_vote_count/2` with those two values. Any other input yields `:noop`.
"""
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }),
    do: Object.increase_vote_count(poll_ap_id, option_name)

def increase_poll_votes_if_vote(_create_data), do: :noop
120
@doc """
Persists the activity described by `map` and runs the post-insert side effects
(rich media job, notifications, conversation bump, streaming).

If `Activity.normalize(map)` already yields an activity, it is returned as
`{:ok, activity}` and nothing is inserted. When `fake` is `true`, an unsaved
`%Activity{}` with the id `"pleroma:fakeid"` is built and returned instead.
Any other failing step is returned as `{:error, value}`.
"""
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _to, _cc} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    conversation_result = Conversation.create_or_bump_for(activity)
    participations = get_participations(conversation_result)

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake mode: build the struct in memory only, never touching the database.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    other ->
      {:error, other}
  end
end
178
# Pulls the participations out of an `{:ok, %{participations: ...}}` result;
# anything else counts as "no participations".
defp get_participations({:ok, %{participations: list}}), do: list
defp get_participations(_other), do: []
181
@doc """
Preloads `:user` on the given participations and streams each one on the
"participation" topic.
"""
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
191
@doc """
Pushes a "Create", "Announce" or "Delete" activity to the streamer topics.

Every such activity (except poll answers) goes to "user" and "list". Activities addressed
to the public collection in "to" additionally go to "public", "public:local" (when local),
and, for "Create", to one "hashtag:<tag>" topic per string tag and to the media topics when
the object's "attachment" is not `[]`. Activities that are neither public via "to" nor
"cc", and not addressed to the actor's follower address, go to "direct".
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      # A missing "to"/"cc" is treated as empty so Enum.member?/2 never receives nil.
      to = activity.data["to"] || []
      cc = activity.data["cc"] || []

      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] == "Create" do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(&is_bitstring/1)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          if object.data["attachment"] != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        # The actor lookup only happens when "cc" is not public (short-circuit &&).
        if !Enum.member?(cc, public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
235
@doc """
Builds a "Create" activity from `params`, inserts it, updates reply/vote/note counters
and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:published` and `:local` are optional (`:local` is only false when explicitly `false`).
With `fake` set to `true`, the fake activity from `insert/3` is returned right away and
no counters or federation are touched.

Returns `{:ok, activity}` on success. A failing step is returned as `{:error, reason}`
instead of raising `WithClauseError`.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # e.g. insert/3 rejected the activity; pass the tagged error through unchanged.
    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end
261
@doc """
Inserts and federates an "Accept" activity from `actor` about `object`, addressed to `to`.
`params[:local]` is treated as true unless it is exactly `false`.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
272
@doc """
Inserts and federates a "Reject" activity from `actor` about `object`, addressed to `to`.
`params[:local]` is treated as true unless it is exactly `false`.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
283
@doc """
Inserts and federates an "Update" activity. Unlike `accept/1` and `reject/1`, `actor` is
stored as given (no `.ap_id` lookup). `params[:local]` is true unless exactly `false`.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "type" => "Update",
    "actor" => actor,
    "object" => object,
    "to" => to,
    "cc" => cc
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
300
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Likes `object` as `user`. Returns `{:ok, activity, object}`; when a like by this user
already exists, the existing activity is returned with the object as passed in.
Other failures come back as `{:error, value}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, updated_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, updated_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    other -> {:error, other}
  end
end
319
@doc """
Removes `actor`'s like from `object`: inserts the unlike activity, deletes the like
activity, updates the object and federates. Returns
`{:ok, unlike_activity, like_activity, object}`; if any step does not match (including
"no existing like"), returns `{:ok, object}` with the object as passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = existing_like <- get_existing_like(actor.ap_id, object),
       undo_data = make_unlike_data(actor, existing_like, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       {:ok, _deleted} <- Repo.delete(existing_like),
       {:ok, updated_object} <- remove_like_from_object(existing_like, object),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity, existing_like, updated_object}
  else
    _ -> {:ok, object}
  end
end
337
@doc """
Announces (repeats) `object` as `user`. Only proceeds when `is_public?(object)` returns
`true`. Returns `{:ok, activity, object}` or `{:error, value}` for the first failing step.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, updated_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, updated_object}
  else
    other -> {:error, other}
  end
end
355
@doc """
Undoes `actor`'s announce of `object`: inserts and federates the undo activity, then
deletes the announce activity and updates the object. Returns
`{:ok, unannounce_activity, object}`; if any step does not match, returns `{:ok, object}`
with the object as passed in.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = existing_announce <- get_existing_announce(actor.ap_id, object),
       undo_data = make_unannounce_data(actor, existing_announce, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity),
       {:ok, _deleted} <- Repo.delete(existing_announce),
       {:ok, updated_object} <- remove_announce_from_object(existing_announce, object) do
    {:ok, undo_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
373
@doc """
Inserts and federates the follow activity built by `make_follow_data/3`.
Returns `{:ok, activity}`, or the first non-matching step result.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
381
@doc """
Marks the latest follow activity between the two users as "cancelled", then inserts and
federates the unfollow activity. Returns `{:ok, activity}`, or the first non-matching
step result (e.g. whatever `fetch_latest_follow/2` returned when it is not an activity).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
391
@doc """
Deletes `object` via `Object.delete/1`, then inserts and federates a "Delete" activity
addressed to the object's "to" and "cc", and adjusts reply and note counters.
Returns `{:ok, activity}`, or the first non-matching step result.
"""
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, removed_activity} <- Object.delete(object),
       data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed,
         # Stays nil/false when Object.delete/1 returned no activity.
         "deleted_activity_id" => removed_activity && removed_activity.id
       },
       {:ok, delete_activity} <- insert(data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
413
@doc """
Blocks `blocked` on behalf of `blocker`.

When `[:activitypub, :unfollow_blocked]` is set and a follow exists, the blocker first
unfollows the blocked user. A block activity is only inserted and federated when
`[:activitypub, :outgoing_blocks]` is exactly `true`; in every other case (including
failures) the result is `{:ok, nil}`.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _ -> {:ok, nil}
  end
end
432
@doc """
Inserts and federates the undo for the latest block activity between the two users.
Returns `{:ok, activity}`, or the first non-matching step result.
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       undo_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
441
@doc """
Files a report: inserts and federates a flag activity, then emails every superuser.

`:local` and `:forward` are true unless explicitly `false`. The activity is always
created with an empty "to"; "cc" holds the reported account's ap_id only when forwarding.
"""
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the report fields themselves are handed to make_flag_data/2.
  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
484
# Builds the query for "Create" activities in `context`, newest first. Recipients are the
# public collection plus, when opts["user"] is set, that user's ap_id and following list.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients = if user, do: [user.ap_id | user.following] ++ public, else: public

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
508
@doc """
Returns all activities matched by the context query for `context`, newest first.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
515
@doc """
Returns the id of the newest activity matched by the context query, or `nil`.
Preloading is skipped by default; `opts` may override `"skip_preload"`.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  merged_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(merged_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
525
@doc """
Fetches a page of activities addressed to the public collection, excluding those that
only carry it in "cc", and returns the page in reversed order.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
534
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` SQL value equals the
# requested visibility (a single value) or is one of the requested visibilities (a list).
#
# An invalid visibility is logged and the query is returned WITHOUT a visibility
# restriction. Previously these branches returned Logger's `:ok`, which broke every
# pipeline step after this one.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps the log call from raising on values that are not chardata
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Reached only for a non-list visibility that is not in @valid_visibilities.
defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
574
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the requesting user.
# The check is skipped when containment is disabled in the passed config or in the
# user's info, and when opts carries no `%User{}` under "user".
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
593
@doc """
Fetches `user`'s "Create" and "Announce" activities as visible to `reading_user`
(public only when `reading_user` is falsy), returned in reversed page order.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
613
# Keeps only activities with an id greater than "since_id"; an empty string or a
# missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
621
# Drops activities whose object carries any of the "tag_reject" tags. Needs the object
# as the second binding, so it raises when preloading was skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _rejected, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected})
     when is_list(rejected) and rejected != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected)
  )
end

defp restrict_tag_reject(query, _opts), do: query
635
# Keeps only activities whose object carries all of the "tag_all" tags. Needs the object
# as the second binding, so it raises when preloading was skipped.
defp restrict_tag_all(_query, %{"tag_all" => _required, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required})
     when is_list(required) and required != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required)
  )
end

defp restrict_tag_all(query, _opts), do: query
649
# Keeps only activities whose object carries the given tag (binary) or any of the given
# tags (list). Needs the object as the second binding, so it raises when preloading was
# skipped.
defp restrict_tag(_query, %{"tag" => _wanted, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => wanted}) when is_list(wanted) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^wanted))
end

defp restrict_tag(query, %{"tag" => wanted}) when is_binary(wanted) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^wanted))
end

defp restrict_tag(query, _opts), do: query
669
# Keeps activities whose recipients overlap with `recipients`. With a user, activities
# authored by that user are added via `or_where`. An empty recipient list applies no
# restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
683
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
689
# Keeps only activities authored by "actor_id" when that key is present.
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
695
# Filters on the activity's "type": a binary is compared for equality, any other value
# is passed to SQL `ANY(...)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
705
# Keeps only activities whose data "state" equals the requested "state".
defp restrict_state(query, %{"state" => state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
711
# Keeps only activities whose embedded object's "likes" contain the given ap_id.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
720
# With "only_media" set to "true" or "1", drops activities whose object "attachment"
# equals the empty list. Needs the object as the second binding, so it raises when
# preloading was skipped.
defp restrict_media(_query, %{"only_media" => _flag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => flag}) when flag == "true" or flag == "1" do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
733
# With "exclude_replies" set to "true" or "1", keeps only activities whose embedded
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => flag})
     when flag == "true" or flag == "1" do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
742
# With "exclude_reblogs" set to "true" or "1", drops "Announce" activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag})
     when flag == "true" or flag == "1" do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
748
# Drops activities authored by, or addressed ("to") to, anyone in the muting user's
# mutes. "with_muted" (true, "true" or "1") disables the filter entirely.
defp restrict_muted(query, %{"with_muted" => flag}) when flag in [true, "true", "1"],
  do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  muted = info.mutes

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted))
end

defp restrict_muted(query, _opts), do: query
762
# Applies the blocking user's blocks: drops activities authored by or addressed to blocked
# actors, announces addressed ("to") to blocked actors, and anything whose activity actor
# or object actor belongs to a blocked domain. Joins the object when the query has no
# :object binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
787
# Drops activities that carry the public collection in "cc" (a missing "cc" is coalesced
# to an empty JSON object for the check).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
799
# With "pinned" set to "true", keeps only activities whose id is in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}),
  do: where(query, [activity], activity.id in ^pinned_ids)

defp restrict_pinned(query, _opts), do: query
805
# Drops "Announce" activities authored by anyone in the muting user's muted_reblogs.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
822
# Drops activities whose object is of type "Answer", unless "include_poll_votes" is
# "true". The filter can only be applied when the query has an :object binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
834
# Preloads the object via Activity.with_preloaded_object/1 unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
841
# Preloads the bookmark for opts["user"] unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
848
# Calls Activity.with_set_thread_muted_field/2 for opts["user"] unless "skip_preload"
# is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts),
  do: Activity.with_set_thread_muted_field(query, opts["user"])
855
# Orders by id when opts has `order: :desc` or `order: :asc`; otherwise leaves the
# query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
867
@doc """
Builds the activity query for `recipients`, applying every preload and restriction
selected through `opts`. The order of the steps below is significant
(`restrict_recipients/3` uses `or_where`), so it must not be rearranged.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
902
@doc """
Runs `fetch_activities_query/2` through pagination and returns the page reversed.
"""
def fetch_activities(recipients, opts \\ %{}) do
  recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
908
@doc """
Restricts `query` to activities addressed to any of `recipients`, or addressed to any of
`recipients_with_public` while also being addressed to the public collection.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
917
@doc """
Fetches a reversed page of activities using the bounded recipient rules of
`fetch_activities_bounded_query/3` on top of an otherwise recipient-unrestricted query.
"""
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  unrestricted = fetch_activities_query([], opts)

  unrestricted
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
924
@doc """
Stores `file` via `Upload.store/2` and inserts an `%Object{}` holding the returned data,
with "actor" set when `opts[:actor]` is given. A non-`{:ok, _}` store result is returned
unchanged.
"""
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      obj_data = if actor, do: Map.put(data, "actor", actor), else: data
      Repo.insert(%Object{data: obj_data})

    other ->
      other
  end
end
937
# Converts a remote actor document into the attribute map used to create or update a
# user. Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  # Avatar, banner and locked are read from the document as received, i.e. before
  # Transmogrifier.maybe_fix_user_object/1 runs.
  icon_url = data["icon"]["url"]
  avatar = icon_url && %{"type" => "Image", "url" => [%{"href" => icon_url}]}

  image_url = data["image"]["url"]
  banner = image_url && %{"type" => "Image", "url" => [%{"href" => image_url}]}

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
984
@doc """
Runs the actor document through `MRF.filter/1` and converts it to user attributes.
Returns `{:ok, user_data}`, or `{:error, value}` wrapping whatever step did not match.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
993
@doc """
Fetches the remote actor document for `ap_id` and converts it to user attributes.

Returns `{:ok, user_data}` on success. On failure the problem is logged and returned as
`{:error, reason}`. Previously the return value of `Logger.error/1` (`:ok`) leaked out,
hiding the actual reason from callers.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Keep already-tagged errors as they are; tag everything else.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
1002
@doc """
Returns a user for `ap_id`: an already cached user is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`, otherwise the actor is fetched and inserted
or updated.

Fetch failures are returned as `{:error, reason}`. A result that is already an
`{:error, _}` tuple is passed through as-is rather than being wrapped a second time.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      {:error, _reason} = error -> error
      other -> {:error, other}
    end
  end
end
1014
@doc """
Resolves `nickname` through WebFinger and builds the user from the resulting ap_id.
Returns `{:error, "No AP id in WebFinger"}` when the lookup yields no non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1022
# filter out broken threads
@doc """
Delegates to `entire_thread_visible_for_user?/2` for the given activity and user.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1027
# do post-processing on a specific activity
@doc """
Currently just delegates to `contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1032
@doc """
Builds the query for "Create" activities with "direct" visibility, oldest first.
"""
def fetch_direct_messages_query do
  create_only = restrict_type(Activity, %{"type" => "Create"})

  create_only
  |> restrict_visibility(%{visibility: "direct"})
  |> order_by([activity], asc: activity.id)
end
1039 end