Merge branch 'update-mastofe/glitch-soc-2019-06-26' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
# Computes `{recipients, to, cc}` for an activity map.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses that do not resolve to a cached user are kept as-is.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is added to the (de-duplicated) recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []

  # List.wrap/1 yields [] for a missing actor; the previous `[data["actor"] || []]`
  # appended an empty list as if it were a recipient.
  recipients = Enum.uniq(to ++ cc ++ List.wrap(data["actor"]))

  {recipients, to, cc}
end

# Every other activity type is addressed to the union of "to" and "cc".
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
62
# Returns `:ok` when no actor is given or when the actor resolves to a cached user whose
# `info.deactivated` is exactly `false`; returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} ->
      :ok

    # Covers deactivated users as well as actors that are not cached at all; the latter
    # previously raised because `.info` was called on `nil`.
    _ ->
      :reject
  end
end
75
# Checks the object's content length against the `[:instance, :remote_limit]` setting.
# Activities without (non-nil) object content always pass.
defp check_remote_limit(activity_data) do
  case activity_data do
    %{"object" => %{"content" => text}} when text != nil ->
      max_length = Config.get([:instance, :remote_limit])
      String.length(text) <= max_length

    _ ->
      true
  end
end
82
# Bumps the actor's note count when `object` is public; otherwise returns `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
86
# Lowers the actor's note count when `object` is public; otherwise returns `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
90
# For a "Create" whose object has an "inReplyTo", increases the replies count of the
# replied-to object if the new object is public. Anything else is a `:noop`.
def increase_replies_count_if_reply(create_data) do
  case create_data do
    %{"type" => "Create", "object" => %{"inReplyTo" => parent_ap_id} = note} ->
      if is_public?(note), do: Object.increase_replies_count(parent_ap_id)

    _ ->
      :noop
  end
end
101
# For an object that has an "inReplyTo", decreases the replies count of the replied-to
# object if the object is public. Anything else is a `:noop`.
def decrease_replies_count_if_reply(object) do
  case object do
    %Object{data: %{"inReplyTo" => parent_ap_id} = note} ->
      if is_public?(note), do: Object.decrease_replies_count(parent_ap_id)

    _ ->
      :noop
  end
end
111
# A "Create" whose object carries both "inReplyTo" and "name" is counted as a vote for the
# option `name` on the replied-to object. Anything else is a `:noop`.
def increase_poll_votes_if_vote(create_data) do
  case create_data do
    %{"type" => "Create", "object" => %{"inReplyTo" => poll_ap_id, "name" => option}} ->
      Object.increase_vote_count(poll_ap_id, option)

    _ ->
      :noop
  end
end
120
# Inserts an activity map unless `Activity.normalize/1` already resolves it.
#
# Returns `{:ok, activity}` for a new, an already known, or (when `fake` is true) a
# non-persisted activity, and `{:error, reason}` when any validation step fails.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:ok, data, object} <- insert_full_object(data) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> inserted
        _ -> Map.put(inserted, :object, object)
      end

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity is already known.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake mode: build the struct in memory only, under a placeholder id.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
178
# Extracts the participations from an `{:ok, %{participations: ...}}` result; any other
# value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
181
# Preloads `:user` on the given participations and pushes each one to the
# "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
191
# Streams the participations of the conversation matching the object's context, but only
# when a latest activity id can still be found for `user` in that context.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{"user" => user, "blocking_user" => user}
      latest_id = fetch_latest_activity_id_for_context(conversation.ap_id, viewer_opts)

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
207
# Pushes a Create/Announce/Delete activity to the streaming topics.
#
# Poll answers are never streamed. Publicly addressed activities go to "public" (plus
# "public:local" when local); public Creates additionally go to their hashtag topics and,
# when they carry attachments, to the media topics. Activities that are neither public in
# "to"/"cc" nor addressed to the actor's follower address go to "direct".
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      # A missing "to" is treated as empty, the same way "cc" is handled below.
      to = activity.data["to"] || []

      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] in ["Create"] do
          # `|| []` also covers an explicit nil "tag" value.
          (object.data["tag"] || [])
          |> Enum.filter(&is_bitstring/1)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          # A missing "attachment" key means "no media", same as an empty list.
          if object.data["attachment"] not in [nil, []] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(activity.data["cc"] || [], public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
251
# Builds and inserts a "Create" activity, then updates reply / poll / note counters and
# federates it.
#
# `params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
# `:local` and `:published` are optional. With `fake` set, the activity returned by
# `insert/3` is handed back without touching counters or federation.
#
# Returns `{:ok, activity}` or `{:error, reason}`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without these clauses any failing step raised WithClauseError, because a `with`
    # that has an `else` must match every non-matching value.
    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end
277
# Inserts and federates an "Accept" activity from `actor` for `object`.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only an explicit `false` turns the local flag off
  local = params[:local] != false

  accept_data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
288
# Inserts and federates a "Reject" activity from `actor` for `object`.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only an explicit `false` turns the local flag off
  local = params[:local] != false

  reject_data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
299
# Inserts and federates an "Update" activity. Unlike accept/reject, `actor` is stored
# as given (no `.ap_id` lookup is done here).
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit `false` turns the local flag off
  local = params[:local] != false

  update_data = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
316
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
#
# Likes `object` as `user` unless a like already exists, in which case the existing like
# activity is returned together with the unchanged object.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    failure -> {:error, failure}
  end
end
335
# Undoes an existing like of `object` by `actor`. When there is no like, or any step
# fails, the original object is returned as `{:ok, object}`.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, unliked_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, unliked_object}
  else
    _failure -> {:ok, object}
  end
end
353
# Announces `object` as `user`. Only objects for which `is_public?/1` returns `true`
# can be announced; every failure is wrapped as `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
371
# Undoes an existing announce of `object` by `actor`. When there is no announce, or any
# step fails, the original object is returned as `{:ok, object}`.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
389
# Inserts and federates a follow activity from `follower` to `followed`.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, follow_activity} <- insert(follow_data, local),
       :ok <- maybe_federate(follow_activity) do
    {:ok, follow_activity}
  end
end
397
# Marks the latest follow activity between the two users as "cancelled", then inserts
# and federates the matching unfollow activity. A non-matching step is returned as-is.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
407
# Deletes `object` and inserts a "Delete" activity addressed to the object's former
# "to" and "cc", then updates participations and counters and federates the deletion.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed_to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, create_activity} <- Object.delete(object),
       delete_data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed_to,
         "deleted_activity_id" => create_activity && create_activity.id
       },
       {:ok, delete_activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
430
# Blocks `blocked` on behalf of `blocker`.
#
# With `[:activitypub, :unfollow_blocked]` set, an existing follow is undone first. A
# block activity is only inserted when `[:activitypub, :outgoing_blocks]` is `true`;
# in every other case `{:ok, nil}` is returned.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _failure -> {:ok, nil}
  end
end
449
# Undoes the latest block from `blocker` on `blocked`. A non-matching step (for example
# no block activity being found) is returned as-is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       undo_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
458
# Files a report ("flag") about `account` and emails every superuser about it.
#
# The report is cc'ed to the reported account's ap_id unless `params[:forward]` is
# explicitly `false`.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only an explicit `false` turns these flags off
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the five report fields are passed on to make_flag_data/2.
  report_params = Map.take(params, [:actor, :context, :account, :statuses, :content])

  with flag_data = make_flag_data(report_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
501
# Builds the query for "Create" activities of a context, newest first, limited to what
# `opts["user"]` (if any) is addressed in, plus public activities.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  from(a in Activity)
  |> maybe_preload_objects(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [a],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      a.data,
      "Create",
      a.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([a], desc: a.id)
end
525
# Returns all "Create" activities of `context` that match `opts`, newest first.
#
# `opts` has to be a map with string keys (e.g. "user", "blocking_user"): the query
# builder reads it with string keys, which is not supported for keyword lists.
@spec fetch_activities_for_context(String.t(), map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(opts)
  |> Repo.all()
end
532
# Returns the id of the newest "Create" activity in `context` matching `opts`, or nil.
#
# `opts` has to be a map (it is merged with `Map.merge/2`). Object preloading is skipped
# by default since only the id is selected; callers can override "skip_preload".
@spec fetch_latest_activity_id_for_context(String.t(), map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
542
# Fetches one page of publicly addressed activities, excluding those that only carry
# the public address in "cc", and returns the page in reversed order.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
551
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose `activity_visibility(...)` SQL function result
# equals the requested visibility (or is one of a list of visibilities).
#
# An unknown visibility is logged and the query is returned without this restriction.
# Previously the result of `Logger.error/1` (`:ok`) was returned in that case, which
# broke every later step of the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 is used because interpolating arbitrary terms can itself raise.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Any other `:visibility` value is invalid.
defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
591
# Applies the `thread_visibility` SQL check for the requesting user, unless thread
# containment is skipped by the instance config or by the user's own settings.
defp restrict_thread_visibility(query, opts, config) do
  case {config, opts} do
    {%{skip_thread_containment: true}, _} ->
      query

    {_, %{"user" => %User{info: %{skip_thread_containment: true}}}} ->
      query

    {_, %{"user" => %User{ap_id: ap_id}}} ->
      where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))

    _ ->
      query
  end
end
610
# Fetches the Create/Announce activities authored by `user` that `reading_user` (or an
# anonymous reader when nil) is addressed in, in reversed page order.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
630
# Keeps only activities with an id greater than "since_id"; an empty or missing
# "since_id" leaves the query untouched.
defp restrict_since(query, opts) do
  case opts do
    %{"since_id" => ""} -> query
    %{"since_id" => since_id} -> where(query, [activity], activity.id > ^since_id)
    _ -> query
  end
end
638
# Excludes activities whose object carries any of the "tag_reject" tags. The filter reads
# the second query binding (the object), so it cannot be combined with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected})
     when is_list(rejected) and rejected != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected)
  )
end

defp restrict_tag_reject(query, _), do: query
652
# Keeps only activities whose object carries all of the "tag_all" tags. The filter reads
# the second query binding (the object), so it cannot be combined with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required})
     when is_list(required) and required != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required)
  )
end

defp restrict_tag_all(query, _), do: query
666
# Keeps only activities whose object carries the given tag (binary) or any of the given
# tags (list). The filter reads the second query binding (the object), so it cannot be
# combined with "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
686
# Keeps activities whose recipients overlap with `recipients`. When a user is given,
# activities authored by that user are additionally included through an `or_where`.
# An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
700
# Keeps only local activities when "local_only" is `true`.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> where(query, [activity], activity.local == true)
    _ -> query
  end
end
706
# Keeps only activities authored by "actor_id", when that option is present.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => actor_id} -> where(query, [activity], activity.actor == ^actor_id)
    _ -> query
  end
end
712
# Keeps only activities of the given type: a binary is compared for equality, any other
# "type" value is matched with `ANY(...)`.
defp restrict_type(query, opts) do
  case opts do
    %{"type" => type} when is_binary(type) ->
      where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

    %{"type" => types} ->
      where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))

    _ ->
      query
  end
end
722
# Keeps only activities whose "state" field equals the given state, when one is given.
defp restrict_state(query, opts) do
  case opts do
    %{"state" => state} ->
      where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

    _ ->
      query
  end
end
728
# Keeps only activities whose embedded object lists "favorited_by" among its likes.
defp restrict_favorited_by(query, opts) do
  case opts do
    %{"favorited_by" => ap_id} ->
      where(
        query,
        [activity],
        fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
      )

    _ ->
      query
  end
end
737
# With "only_media" set to "true" or "1", drops activities whose object has an empty
# attachment list. The filter reads the second query binding (the object), so it cannot
# be combined with "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
750
# With "exclude_replies" set to "true" or "1", keeps only activities whose embedded
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _), do: query
759
# With "exclude_reblogs" set to "true" or "1", drops "Announce" activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
765
# Drops activities authored by, or addressed ("to") to, anyone in the muting user's
# mute list. "with_muted" (true, "true" or "1") disables the filter.
defp restrict_muted(query, opts) do
  case opts do
    %{"with_muted" => val} when val in [true, "true", "1"] ->
      query

    %{"muting_user" => %User{info: info}} ->
      muted = info.mutes

      query
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted))
      |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted))

    _ ->
      query
  end
end
779
# Drops activities that involve users or domains blocked by "blocking_user": authored by
# or addressed to a blocked user, announces addressed to a blocked user, and activities
# or objects whose actor lives on a blocked domain.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_users = info.blocks || []
  blocked_domains = info.domain_blocks || []

  # The object join is needed for the last condition; add it only if it is not there yet.
  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_users))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_users))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_users
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^blocked_domains)
  )
end

defp restrict_blocked(query, _), do: query
804
# Drops activities that carry the public address in "cc".
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
816
# With "pinned" set to "true", keeps only the activities listed in "pinned_activity_ids".
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => ids} ->
      where(query, [activity], activity.id in ^ids)

    _ ->
      query
  end
end
822
# Drops "Announce" activities authored by anyone in the muting user's `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_announcers = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_announcers
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
839
# Drops activities whose object is of type "Answer", unless "include_poll_votes" is
# "true". Without an `:object` binding on the query nothing is filtered.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
851
# Adds the object preload to the query unless "skip_preload" is `true`.
defp maybe_preload_objects(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_object(query)
  end
end
858
# Adds the bookmark preload for `opts["user"]` unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_preloaded_bookmark(query, opts["user"])
  end
end
865
# Sets the thread-muted field for `opts["user"]` unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, opts) do
  case opts do
    %{"skip_preload" => true} -> query
    _ -> Activity.with_set_thread_muted_field(query, opts["user"])
  end
end
872
# Orders by id according to the `:order` option (`:desc` or `:asc`), if present.
defp maybe_order(query, opts) do
  case opts do
    %{order: :desc} -> order_by(query, desc: :id)
    %{order: :asc} -> order_by(query, asc: :id)
    _ -> query
  end
end
884
# Builds the activity query for `recipients`, applying every preload, ordering and
# restriction that `opts` asks for. The order of the steps is significant and is kept
# exactly as before.
def fetch_activities_query(recipients, opts \\ %{}) do
  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Preloads, ordering and the recipient restriction.
  prepared =
    from(a in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts["user"])

  # Option-driven filters.
  filtered =
    prepared
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(opts)
    |> restrict_muted(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)

  filtered
  |> restrict_thread_visibility(opts, config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
919
# Fetches one page of activities for `recipients` and returns it in reversed order.
def fetch_activities(recipients, opts \\ %{}) do
  page =
    recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)

  Enum.reverse(page)
end
925
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also carrying the public address.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
934
# Fetches one page of activities bounded by the two recipient lists (see
# fetch_activities_bounded_query/3) and returns it in reversed order.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  bounded =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
941
# Stores `file` through `Upload.store/2` and inserts an object holding the resulting
# data, tagged with `opts[:actor]` when one is given. A failed store is returned as-is.
def upload(file, opts \\ []) do
  with {:ok, stored} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        falsy when falsy in [nil, false] -> stored
        actor -> Map.put(stored, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
954
# Converts an actor document into the attribute map used to insert or update a user.
# Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  # Builds the image map from an "icon"/"image" field. Only a map with a truthy "url" is
  # used; a missing field, a bare URL string or a list yields nil. The previous
  # `data["icon"]["url"]` access raised for non-map values.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # Read before the document is passed through maybe_fix_user_object/1, as before.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
1001
# Runs the actor document through the MRF filter and converts it into user data.
# Any non-matching step is wrapped as `{:error, reason}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1010
# Fetches the remote actor document for `ap_id` and converts it into user data.
#
# Returns `{:ok, data}` on success. On failure the problem is logged and an
# `{:error, reason}` tuple is returned; previously the return value of `Logger.error/1`
# (`:ok`) leaked out, hiding the reason from callers.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
1019
# Upgrades the user when `ap_id` is already cached; otherwise fetches the remote actor
# and inserts or updates a user from it.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
1031
# Resolves `nickname` through WebFinger and creates the user from the returned ap_id.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1039
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1044
# Post-processing for a single activity; currently only the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1049
# Query for all "Create" activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{"type" => "Create"})
  direct = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct, [activity], asc: activity.id)
end
1056 end