Merge branch 'develop' into issue/941
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
#
# Every clause returns `{recipients, to, cc}`; a missing "to" or "cc" is treated as `[]`.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses that do not resolve to a cached user are kept unchanged.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is appended to the recipients and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []

  # List.wrap/1 maps a missing actor to [] so that no bogus `[]` entry ends up
  # in the recipient list.
  recipients = Enum.uniq(to ++ cc ++ List.wrap(data["actor"]))

  {recipients, to, cc}
end

# Any other activity type: the recipients are simply "to" followed by "cc".
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
62
# Returns `:ok` when no actor is given or when the cached user for `actor` has
# `info.deactivated == false`; returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on the shape (instead of calling `user.info` on the lookup result)
  # makes an unknown actor (nil lookup) end in :reject rather than raising.
  case User.get_cached_by_ap_id(actor) do
    %{info: %{deactivated: false}} -> :ok
    _other -> :reject
  end
end
75
# Returns true when the embedded object's content is no longer than the
# `[:instance, :remote_limit]` setting. Activities without (non-nil) object
# content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
82
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
86
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
90
# For a Create whose object carries an "inReplyTo" key, increases the replies
# count of the replied-to object, but only when the new object is public
# (returns nil when it is not). Any other input yields `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
101
# For an object whose data carries an "inReplyTo" key, decreases the replies
# count of the replied-to object, but only when the object data is public
# (returns nil when it is not). Any other input yields `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
111
# A Create whose object has both "inReplyTo" and "name" is treated as a poll
# vote: the vote count for option `name` on the replied-to object is increased.
# Any other input yields `:noop`.
def increase_poll_votes_if_vote(%{"type" => "Create", "object" => vote}) when is_map(vote) do
  case vote do
    %{"inReplyTo" => poll_ap_id, "name" => option} ->
      Object.increase_vote_count(poll_ap_id, option)

    _ ->
      :noop
  end
end

def increase_poll_votes_if_vote(_create_data), do: :noop
120
# Inserts an activity described by `map`.
#
# * If `Activity.normalize/1` already finds an activity, it is returned as
#   `{:ok, activity}` and nothing is written.
# * With `fake` set to true, nothing is written to the database; an
#   `%Activity{}` with the id "pleroma:fakeid" is built and returned instead.
# * Any other failing step is returned wrapped as `{:error, term}`.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    persist_and_publish(map, object, recipients, local)
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, map, recipients} ->
      {:ok, build_fake_activity(map, recipients, local)}

    error ->
      {:error, error}
  end
end

# Stores the activity, then runs the follow-up work: rich media fetch job,
# notifications, conversation bump and streaming.
defp persist_and_publish(map, object, recipients, local) do
  {:ok, stored} =
    Repo.insert(%Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    })

  # Splice in the child object if we have one.
  activity = if is_nil(object), do: stored, else: Map.put(stored, :object, object)

  PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

  Notification.create_notifications(activity)

  participations =
    activity
    |> Conversation.create_or_bump_for()
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
  {:ok, activity}
end

# Builds the non-persisted activity used for fake inserts and triggers the
# rich media fetch for it.
defp build_fake_activity(map, recipients, local) do
  activity = %Activity{
    data: map,
    local: local,
    actor: map["actor"],
    recipients: recipients,
    id: "pleroma:fakeid"
  }

  Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
  activity
end
178
# Extracts the participations from an `{:ok, %{participations: ...}}` result;
# anything else yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
181
# Preloads the `:user` association of each participation and pushes every one
# of them to the "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
191
# Pushes Create/Announce/Delete activities to the streaming topics they belong
# to. Other activity types are ignored.
def stream_out(activity) do
  alias Pleroma.Web.Streamer

  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Streamer.stream("user", activity)
      Streamer.stream("list", activity)

      if public in activity.data["to"] do
        Streamer.stream("public", activity)

        if activity.local, do: Streamer.stream("public:local", activity)

        if activity.data["type"] == "Create" do
          # One hashtag topic per string tag on the object.
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(&is_bitstring/1)
          |> Enum.each(&Streamer.stream("hashtag:" <> &1, activity))

          if object.data["attachment"] != [] do
            Streamer.stream("public:media", activity)

            if activity.local, do: Streamer.stream("public:local:media", activity)
          end
        end
      else
        # TODO: Write test, replace with visibility test
        # "direct" only when Public is not in "cc" and the actor's follower
        # address is not in "to".
        if !(Enum.member?(activity.data["cc"] || [], public) ||
               Enum.member?(
                 activity.data["to"],
                 User.get_cached_by_ap_id(activity.data["actor"]).follower_address
               )) do
          Streamer.stream("direct", activity)
        end
      end
    end
  end
end
235
# Builds and inserts a Create activity from `params`.
#
# `params` must contain :to, :actor, :context and :object; :additional,
# :published and :local are optional. With `fake` set to true the activity
# returned by `insert/3` is handed back right away, without touching counters
# or federating.
#
# Returns `{:ok, activity}`, or the `{:error, reason}` tuple produced by a
# failing step.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause an error tuple (e.g. a rejected insert) would raise
    # WithClauseError instead of being returned to the caller.
    {:error, _reason} = error ->
      error
  end
end
261
# Inserts an Accept activity from `actor` for `object`, then federates it.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "to" => to,
    "type" => "Accept",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
272
# Inserts a Reject activity from `actor` for `object`, then federates it.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "to" => to,
    "type" => "Reject",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
283
# Inserts an Update activity, then federates it. Unlike accept/reject, the
# given `actor` value is stored as-is in the "actor" field.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
300
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
#
# Likes `object` on behalf of `user`. When a like by this user already exists,
# that activity is returned with the unchanged object. Other failures come back
# as `{:error, term}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       data <- make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing ->
      {:ok, existing, object}

    error ->
      {:error, error}
  end
end
319
# Removes the actor's like of `object`: inserts the undo activity, deletes the
# like activity, updates the object and federates. If any step does not match
# (including "no existing like"), `{:ok, object}` with the original object is
# returned.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = existing_like <- get_existing_like(actor.ap_id, object),
       data <- make_unlike_data(actor, existing_like, activity_id),
       {:ok, undo_activity} <- insert(data, local),
       {:ok, _deleted} <- Repo.delete(existing_like),
       {:ok, updated_object} <- remove_like_from_object(existing_like, object),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity, existing_like, updated_object}
  else
    _ -> {:ok, object}
  end
end
337
# Announces `object` on behalf of `user`. Only public objects can be
# announced; any failing step is returned as `{:error, term}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       data <- make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
355
# Undoes the actor's announce of `object`: inserts and federates the undo
# activity, deletes the announce activity and updates the object. If any step
# does not match, `{:ok, object}` with the original object is returned.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = existing_announce <- get_existing_announce(actor.ap_id, object),
       data <- make_unannounce_data(actor, existing_announce, activity_id),
       {:ok, undo_activity} <- insert(data, local),
       :ok <- maybe_federate(undo_activity),
       {:ok, _deleted} <- Repo.delete(existing_announce),
       {:ok, updated_object} <- remove_announce_from_object(existing_announce, object) do
    {:ok, undo_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
373
# Inserts a follow activity from `follower` to `followed` and federates it.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
381
# Marks the latest follow activity between the two users as "cancelled",
# then inserts and federates the matching unfollow activity. A step that does
# not match is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
391
# Deletes `object` and inserts a Delete activity addressed to the object's
# "to" and "cc" recipients, adjusting reply and note counters before
# federating.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data <- %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed,
         # nil when Object.delete/1 returned no activity
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, activity} <- insert(delete_data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
413
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow exists, the
# blocker unfollows first. A block activity is only inserted and federated
# when `[:activitypub, :outgoing_blocks]` is true; in every other case
# `{:ok, nil}` is returned.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       data <- make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _ -> {:ok, nil}
  end
end
432
# Looks up the latest block activity between the two users, then inserts and
# federates the matching unblock activity. A step that does not match is
# returned unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
441
# Files a report (flag activity) about `account`, and mails it to every
# superuser once the activity is inserted and federated.
#
# The activity's "to" is always empty; "cc" holds the reported account's
# ap_id unless `params[:forward]` is exactly false.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  extra = params[:additional] || %{}

  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(extra, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    User.all_superusers()
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
484
# Builds the query for Create activities of a given context, newest first.
# Recipients are Public plus, when `opts["user"]` is set, that user's ap_id
# and followed addresses.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  recipients =
    case opts["user"] do
      falsy when falsy in [nil, false] -> public
      user -> [user.ap_id | user.following] ++ public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
508
# Returns all Create activities of `context` that the query helper selects
# for `opts`.
#
# `opts` must be a map: it is read with string keys (e.g. `opts["user"]`),
# which raises for keyword lists, so the spec no longer advertises keyword().
@spec fetch_activities_for_context(String.t(), map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(opts)
  |> Repo.all()
end
515
# Returns the id of the newest Create activity of `context`, or nil when the
# query yields no row. Preloading is skipped by default; a "skip_preload"
# entry in `opts` takes precedence over that default.
#
# `opts` must be a map: it is passed to Map.merge/2 and read with string
# keys, so the spec no longer advertises keyword().
@spec fetch_latest_activity_id_for_context(String.t(), map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
525
# Fetches one page of activities addressed to Public, excluding those that
# only carry Public in "cc", and reverses the page order.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
534
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` by the SQL function `activity_visibility(actor, recipients, data)`.
#
# `opts[:visibility]` may be a single valid visibility or a list of them.
# Options without a :visibility key leave the query untouched.
# NOTE: for an invalid visibility the result of the Logger.error call is
# returned instead of a query.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  case Enum.all?(visibility, fn v -> v in @valid_visibilities end) do
    true ->
      where(
        query,
        [a],
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )

    false ->
      Logger.error("Could not restrict visibility to #{visibility}")
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Reached only for non-list values that are not valid visibilities, because the
# two clauses above already took lists and valid values.
defp restrict_visibility(_query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{visibility}")
end

defp restrict_visibility(query, _visibility), do: query
577
# When a user is given, keeps only activities for which the SQL function
# `thread_visibility(ap_id, activity id)` is true. Without a user the query is
# returned unchanged.
defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _), do: query
589
# Fetches the Create/Announce activities authored by `user`. Recipients are
# Public plus, when `reading_user` is given, that user's ap_id and followed
# addresses.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
609
# Keeps only activities with an id greater than `opts["since_id"]`. A missing
# or empty-string since_id leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
617
# Drops activities whose object has any of the tags in `opts["tag_reject"]`
# (non-empty list). Needs the second (object) binding, hence the raise when
# preloading is skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
631
# Keeps activities whose object has all of the tags in `opts["tag_all"]`
# (non-empty list). Needs the second (object) binding, hence the raise when
# preloading is skipped.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
645
# Keeps activities whose object has the given tag (binary) or any of the given
# tags (list). Needs the second (object) binding, hence the raise when
# preloading is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
665
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are matched as well (OR). An empty
# recipient list leaves the query unchanged.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
679
# Keeps only local activities when `opts["local_only"]` is true.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
685
# Keeps only activities whose actor equals `opts["actor_id"]`, when given.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
691
# Restricts by the activity's "type": equality for a binary, `ANY(...)` for
# every other value of `opts["type"]`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
701
# Keeps only activities whose "state" equals `opts["state"]`, when given.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
707
# Keeps only activities where `opts["favorited_by"]` is contained in the
# "likes" of the activity's embedded "object".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _), do: query
716
# With `opts["only_media"]` set to "true" or "1", keeps only activities whose
# object's "attachment" is not the empty list. Needs the second (object)
# binding, hence the raise when preloading is skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
729
# With `opts["exclude_replies"]` set to "true" or "1", keeps only activities
# whose embedded object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _), do: query
738
# With `opts["exclude_reblogs"]` set to "true" or "1", drops Announce
# activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
744
# Drops activities authored by, or addressed ("to") to, anyone in the muting
# user's mutes. A truthy "with_muted" option (true, "true" or "1") disables
# the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  muted = info.mutes

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted))
end

defp restrict_muted(query, _), do: query
758
# Applies the blocking user's blocks and domain blocks. Drops activities that
# are authored by or addressed to a blocked actor, Announces with a blocked
# actor in "to", and activities whose actor or object actor is on a blocked
# domain. The object join is added when the query does not have it yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked = info.blocks || []
  blocked_domains = info.domain_blocks || []

  joined =
    case has_named_binding?(query, :object) do
      true -> query
      false -> Activity.with_joined_object(query)
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^blocked_domains)
  )
end

defp restrict_blocked(query, _), do: query
783
# Drops activities that carry the Public address in "cc".
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
795
# With `opts["pinned"]` equal to "true", keeps only activities whose id is in
# `opts["pinned_activity_ids"]`.
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
801
# Drops Announce activities from actors listed in the muting user's
# `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_actors = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
818
# Drops activities whose joined object is of type "Answer" (poll votes),
# unless `opts["include_poll_votes"]` is "true". Queries without the named
# :object binding are returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
830
# Adds the object preload to the query unless `opts["skip_preload"]` is true.
defp maybe_preload_objects(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_object(query)
  end
end
837
# Adds the bookmark preload for `opts["user"]` unless `opts["skip_preload"]`
# is true.
defp maybe_preload_bookmarks(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_bookmark(query, opts["user"])
  end
end
844
# Sets the thread-muted field for `opts["user"]` unless `opts["skip_preload"]`
# is true.
defp maybe_set_thread_muted_field(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_set_thread_muted_field(query, opts["user"])
  end
end
851
# Orders by id when `opts[:order]` is :desc or :asc; otherwise the query is
# returned unchanged.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
863
# Builds the activity query for `recipients`, applying every preload,
# ordering and restriction helper that `opts` enables, in a fixed order.
def fetch_activities_query(recipients, opts \\ %{}) do
  from(a in Activity)
  # preloads and ordering
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  # restrictions
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
894
# Runs the activity query for `recipients` through pagination and reverses
# the resulting page.
def fetch_activities(recipients, opts \\ %{}) do
  recipients
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
900
# Keeps activities that either overlap with `recipients`, or overlap with
# `recipients_with_public` and are also addressed to Public.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
909
# Like fetch_activities/2, but the recipient restriction comes from
# fetch_activities_bounded_query/3 instead of the base query.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
916
# Stores `file` through Upload.store/2 and inserts an object holding the
# returned data, with "actor" set when `opts[:actor]` is given. A failed store
# result is returned unchanged.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        falsy when falsy in [nil, false] -> data
        actor -> Map.put(data, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
929
# Converts a remote actor document into the attribute map used to create or
# update a user. Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  # Avatar, banner and the locked flag are read from the document as received,
  # before Transmogrifier.maybe_fix_user_object/1 is applied.
  avatar = image_from_url(data["icon"]["url"])
  banner = image_from_url(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end

# Wraps a truthy url in an Image map; a nil/false url is passed through as-is.
defp image_from_url(url) do
  url &&
    %{
      "type" => "Image",
      "url" => [%{"href" => url}]
    }
end
976
# Runs the actor document through MRF and converts it into user attributes.
# Anything but two successful steps is returned as `{:error, term}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
985
# Fetches the remote actor at `ap_id` and converts it into user attributes.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and
# `{:error, term}` is returned, so callers no longer receive the bare result
# of the Logger call.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
994
# When a user for `ap_id` is already cached, upgrades it through the
# Transmogrifier. Otherwise fetches the remote actor and inserts or updates
# the user; a failed fetch is returned as `{:error, term}`.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      e -> {:error, e}
    end
  end
end
1006
# Resolves `nickname` through WebFinger and builds the user from the returned
# "ap_id". Any result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1014
# filter out broken threads
# Delegates to entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1019
# do post-processing on a specific activity
# Currently this only applies contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1024
# Query for all Create activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{"type" => "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1031 end