Refactor Follows/Followers counter syncronization
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
26 # For Announce activities, we filter the recipients based on following status for any actors
27 # that match actual users. See issue #164 for more information about why this is necessary.
28 defp get_recipients(%{"type" => "Announce"} = data) do
29 to = data["to"] || []
30 cc = data["cc"] || []
31 actor = User.get_cached_by_ap_id(data["actor"])
32
33 recipients =
34 (to ++ cc)
35 |> Enum.filter(fn recipient ->
36 case User.get_cached_by_ap_id(recipient) do
37 nil ->
38 true
39
40 user ->
41 User.following?(user, actor)
42 end
43 end)
44
45 {recipients, to, cc}
46 end
47
48 defp get_recipients(%{"type" => "Create"} = data) do
49 to = data["to"] || []
50 cc = data["cc"] || []
51 actor = data["actor"] || []
52 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = data["to"] || []
58 cc = data["cc"] || []
59 recipients = to ++ cc
60 {recipients, to, cc}
61 end
62
63 defp check_actor_is_active(actor) do
64 if not is_nil(actor) do
65 with user <- User.get_cached_by_ap_id(actor),
66 false <- user.info.deactivated do
67 :ok
68 else
69 _e -> :reject
70 end
71 else
72 :ok
73 end
74 end
75
76 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
77 limit = Config.get([:instance, :remote_limit])
78 String.length(content) <= limit
79 end
80
81 defp check_remote_limit(_), do: true
82
83 def increase_note_count_if_public(actor, object) do
84 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
85 end
86
87 def decrease_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
89 end
90
91 def increase_replies_count_if_reply(%{
92 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 "type" => "Create"
94 }) do
95 if is_public?(object) do
96 Object.increase_replies_count(reply_ap_id)
97 end
98 end
99
100 def increase_replies_count_if_reply(_create_data), do: :noop
101
102 def decrease_replies_count_if_reply(%Object{
103 data: %{"inReplyTo" => reply_ap_id} = object
104 }) do
105 if is_public?(object) do
106 Object.decrease_replies_count(reply_ap_id)
107 end
108 end
109
110 def decrease_replies_count_if_reply(_object), do: :noop
111
112 def increase_poll_votes_if_vote(%{
113 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
114 "type" => "Create"
115 }) do
116 Object.increase_vote_count(reply_ap_id, name)
117 end
118
119 def increase_poll_votes_if_vote(_create_data), do: :noop
120
121 def insert(map, local \\ true, fake \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 :ok <- check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:ok, map, object} <- insert_full_object(map) do
130 {:ok, activity} =
131 Repo.insert(%Activity{
132 data: map,
133 local: local,
134 actor: map["actor"],
135 recipients: recipients
136 })
137
138 # Splice in the child object if we have one.
139 activity =
140 if !is_nil(object) do
141 Map.put(activity, :object, object)
142 else
143 activity
144 end
145
146 PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
147
148 Notification.create_notifications(activity)
149
150 participations =
151 activity
152 |> Conversation.create_or_bump_for()
153 |> get_participations()
154
155 stream_out(activity)
156 stream_out_participations(participations)
157 {:ok, activity}
158 else
159 %Activity{} = activity ->
160 {:ok, activity}
161
162 {:fake, true, map, recipients} ->
163 activity = %Activity{
164 data: map,
165 local: local,
166 actor: map["actor"],
167 recipients: recipients,
168 id: "pleroma:fakeid"
169 }
170
171 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
172 {:ok, activity}
173
174 error ->
175 {:error, error}
176 end
177 end
178
179 defp get_participations({:ok, %{participations: participations}}), do: participations
180 defp get_participations(_), do: []
181
182 def stream_out_participations(participations) do
183 participations =
184 participations
185 |> Repo.preload(:user)
186
187 Enum.each(participations, fn participation ->
188 Pleroma.Web.Streamer.stream("participation", participation)
189 end)
190 end
191
192 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
193 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
194 conversation = Repo.preload(conversation, :participations),
195 last_activity_id =
196 fetch_latest_activity_id_for_context(conversation.ap_id, %{
197 "user" => user,
198 "blocking_user" => user
199 }) do
200 if last_activity_id do
201 stream_out_participations(conversation.participations)
202 end
203 end
204 end
205
206 def stream_out_participations(_, _), do: :noop
207
208 def stream_out(activity) do
209 public = "https://www.w3.org/ns/activitystreams#Public"
210
211 if activity.data["type"] in ["Create", "Announce", "Delete"] do
212 object = Object.normalize(activity)
213 # Do not stream out poll replies
214 unless object.data["type"] == "Answer" do
215 Pleroma.Web.Streamer.stream("user", activity)
216 Pleroma.Web.Streamer.stream("list", activity)
217
218 if Enum.member?(activity.data["to"], public) do
219 Pleroma.Web.Streamer.stream("public", activity)
220
221 if activity.local do
222 Pleroma.Web.Streamer.stream("public:local", activity)
223 end
224
225 if activity.data["type"] in ["Create"] do
226 object.data
227 |> Map.get("tag", [])
228 |> Enum.filter(fn tag -> is_bitstring(tag) end)
229 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
230
231 if object.data["attachment"] != [] do
232 Pleroma.Web.Streamer.stream("public:media", activity)
233
234 if activity.local do
235 Pleroma.Web.Streamer.stream("public:local:media", activity)
236 end
237 end
238 end
239 else
240 # TODO: Write test, replace with visibility test
241 if !Enum.member?(activity.data["cc"] || [], public) &&
242 !Enum.member?(
243 activity.data["to"],
244 User.get_cached_by_ap_id(activity.data["actor"]).follower_address
245 ),
246 do: Pleroma.Web.Streamer.stream("direct", activity)
247 end
248 end
249 end
250 end
251
252 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257
258 with create_data <-
259 make_create_data(
260 %{to: to, actor: actor, published: published, context: context, object: object},
261 additional
262 ),
263 {:ok, activity} <- insert(create_data, local, fake),
264 {:fake, false, activity} <- {:fake, fake, activity},
265 _ <- increase_replies_count_if_reply(create_data),
266 _ <- increase_poll_votes_if_vote(create_data),
267 # Changing note count prior to enqueuing federation task in order to avoid
268 # race conditions on updating user.info
269 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
270 :ok <- maybe_federate(activity) do
271 {:ok, activity}
272 else
273 {:fake, true, activity} ->
274 {:ok, activity}
275 end
276 end
277
278 def accept(%{to: to, actor: actor, object: object} = params) do
279 # only accept false as false value
280 local = !(params[:local] == false)
281
282 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
283 {:ok, activity} <- insert(data, local),
284 :ok <- maybe_federate(activity) do
285 {:ok, activity}
286 end
287 end
288
289 def reject(%{to: to, actor: actor, object: object} = params) do
290 # only accept false as false value
291 local = !(params[:local] == false)
292
293 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
296 {:ok, activity}
297 end
298 end
299
300 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
301 # only accept false as false value
302 local = !(params[:local] == false)
303
304 with data <- %{
305 "to" => to,
306 "cc" => cc,
307 "type" => "Update",
308 "actor" => actor,
309 "object" => object
310 },
311 {:ok, activity} <- insert(data, local),
312 :ok <- maybe_federate(activity) do
313 {:ok, activity}
314 end
315 end
316
317 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
318 def like(
319 %User{ap_id: ap_id} = user,
320 %Object{data: %{"id" => _}} = object,
321 activity_id \\ nil,
322 local \\ true
323 ) do
324 with nil <- get_existing_like(ap_id, object),
325 like_data <- make_like_data(user, object, activity_id),
326 {:ok, activity} <- insert(like_data, local),
327 {:ok, object} <- add_like_to_object(activity, object),
328 :ok <- maybe_federate(activity) do
329 {:ok, activity, object}
330 else
331 %Activity{} = activity -> {:ok, activity, object}
332 error -> {:error, error}
333 end
334 end
335
336 def unlike(
337 %User{} = actor,
338 %Object{} = object,
339 activity_id \\ nil,
340 local \\ true
341 ) do
342 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
343 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
344 {:ok, unlike_activity} <- insert(unlike_data, local),
345 {:ok, _activity} <- Repo.delete(like_activity),
346 {:ok, object} <- remove_like_from_object(like_activity, object),
347 :ok <- maybe_federate(unlike_activity) do
348 {:ok, unlike_activity, like_activity, object}
349 else
350 _e -> {:ok, object}
351 end
352 end
353
354 def announce(
355 %User{ap_id: _} = user,
356 %Object{data: %{"id" => _}} = object,
357 activity_id \\ nil,
358 local \\ true,
359 public \\ true
360 ) do
361 with true <- is_public?(object),
362 announce_data <- make_announce_data(user, object, activity_id, public),
363 {:ok, activity} <- insert(announce_data, local),
364 {:ok, object} <- add_announce_to_object(activity, object),
365 :ok <- maybe_federate(activity) do
366 {:ok, activity, object}
367 else
368 error -> {:error, error}
369 end
370 end
371
372 def unannounce(
373 %User{} = actor,
374 %Object{} = object,
375 activity_id \\ nil,
376 local \\ true
377 ) do
378 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
379 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
380 {:ok, unannounce_activity} <- insert(unannounce_data, local),
381 :ok <- maybe_federate(unannounce_activity),
382 {:ok, _activity} <- Repo.delete(announce_activity),
383 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
384 {:ok, unannounce_activity, object}
385 else
386 _e -> {:ok, object}
387 end
388 end
389
390 def follow(follower, followed, activity_id \\ nil, local \\ true) do
391 with data <- make_follow_data(follower, followed, activity_id),
392 {:ok, activity} <- insert(data, local),
393 :ok <- maybe_federate(activity) do
394 {:ok, activity}
395 end
396 end
397
398 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
399 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
400 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
401 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
402 {:ok, activity} <- insert(unfollow_data, local),
403 :ok <- maybe_federate(activity) do
404 {:ok, activity}
405 end
406 end
407
408 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
409 with data <- %{
410 "to" => [follower_address],
411 "type" => "Delete",
412 "actor" => ap_id,
413 "object" => %{"type" => "Person", "id" => ap_id}
414 },
415 {:ok, activity} <- insert(data, true, true),
416 :ok <- maybe_federate(activity) do
417 {:ok, user}
418 end
419 end
420
421 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
422 user = User.get_cached_by_ap_id(actor)
423 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
424
425 with {:ok, object, activity} <- Object.delete(object),
426 data <- %{
427 "type" => "Delete",
428 "actor" => actor,
429 "object" => id,
430 "to" => to,
431 "deleted_activity_id" => activity && activity.id
432 },
433 {:ok, activity} <- insert(data, local, false),
434 stream_out_participations(object, user),
435 _ <- decrease_replies_count_if_reply(object),
436 # Changing note count prior to enqueuing federation task in order to avoid
437 # race conditions on updating user.info
438 {:ok, _actor} <- decrease_note_count_if_public(user, object),
439 :ok <- maybe_federate(activity) do
440 {:ok, activity}
441 end
442 end
443
444 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
445 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
446 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
447
448 if unfollow_blocked do
449 follow_activity = fetch_latest_follow(blocker, blocked)
450 if follow_activity, do: unfollow(blocker, blocked, nil, local)
451 end
452
453 with true <- outgoing_blocks,
454 block_data <- make_block_data(blocker, blocked, activity_id),
455 {:ok, activity} <- insert(block_data, local),
456 :ok <- maybe_federate(activity) do
457 {:ok, activity}
458 else
459 _e -> {:ok, nil}
460 end
461 end
462
463 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
464 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
465 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
466 {:ok, activity} <- insert(unblock_data, local),
467 :ok <- maybe_federate(activity) do
468 {:ok, activity}
469 end
470 end
471
472 def flag(
473 %{
474 actor: actor,
475 context: context,
476 account: account,
477 statuses: statuses,
478 content: content
479 } = params
480 ) do
481 # only accept false as false value
482 local = !(params[:local] == false)
483 forward = !(params[:forward] == false)
484
485 additional = params[:additional] || %{}
486
487 params = %{
488 actor: actor,
489 context: context,
490 account: account,
491 statuses: statuses,
492 content: content
493 }
494
495 additional =
496 if forward do
497 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
498 else
499 Map.merge(additional, %{"to" => [], "cc" => []})
500 end
501
502 with flag_data <- make_flag_data(params, additional),
503 {:ok, activity} <- insert(flag_data, local),
504 :ok <- maybe_federate(activity) do
505 Enum.each(User.all_superusers(), fn superuser ->
506 superuser
507 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
508 |> Pleroma.Emails.Mailer.deliver_async()
509 end)
510
511 {:ok, activity}
512 end
513 end
514
515 defp fetch_activities_for_context_query(context, opts) do
516 public = ["https://www.w3.org/ns/activitystreams#Public"]
517
518 recipients =
519 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
520
521 from(activity in Activity)
522 |> maybe_preload_objects(opts)
523 |> restrict_blocked(opts)
524 |> restrict_recipients(recipients, opts["user"])
525 |> where(
526 [activity],
527 fragment(
528 "?->>'type' = ? and ?->>'context' = ?",
529 activity.data,
530 "Create",
531 activity.data,
532 ^context
533 )
534 )
535 |> exclude_poll_votes(opts)
536 |> order_by([activity], desc: activity.id)
537 end
538
539 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
540 def fetch_activities_for_context(context, opts \\ %{}) do
541 context
542 |> fetch_activities_for_context_query(opts)
543 |> Repo.all()
544 end
545
546 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
547 Pleroma.FlakeId.t() | nil
548 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
549 context
550 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
551 |> limit(1)
552 |> select([a], a.id)
553 |> Repo.one()
554 end
555
556 def fetch_public_activities(opts \\ %{}) do
557 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
558
559 q
560 |> restrict_unlisted()
561 |> Pagination.fetch_paginated(opts)
562 |> Enum.reverse()
563 end
564
565 @valid_visibilities ~w[direct unlisted public private]
566
567 defp restrict_visibility(query, %{visibility: visibility})
568 when is_list(visibility) do
569 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
570 query =
571 from(
572 a in query,
573 where:
574 fragment(
575 "activity_visibility(?, ?, ?) = ANY (?)",
576 a.actor,
577 a.recipients,
578 a.data,
579 ^visibility
580 )
581 )
582
583 query
584 else
585 Logger.error("Could not restrict visibility to #{visibility}")
586 end
587 end
588
589 defp restrict_visibility(query, %{visibility: visibility})
590 when visibility in @valid_visibilities do
591 from(
592 a in query,
593 where:
594 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
595 )
596 end
597
598 defp restrict_visibility(_query, %{visibility: visibility})
599 when visibility not in @valid_visibilities do
600 Logger.error("Could not restrict visibility to #{visibility}")
601 end
602
603 defp restrict_visibility(query, _visibility), do: query
604
605 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
606 do: query
607
608 defp restrict_thread_visibility(
609 query,
610 %{"user" => %User{info: %{skip_thread_containment: true}}},
611 _
612 ),
613 do: query
614
615 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
616 from(
617 a in query,
618 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
619 )
620 end
621
622 defp restrict_thread_visibility(query, _, _), do: query
623
624 def fetch_user_activities(user, reading_user, params \\ %{}) do
625 params =
626 params
627 |> Map.put("type", ["Create", "Announce"])
628 |> Map.put("actor_id", user.ap_id)
629 |> Map.put("whole_db", true)
630 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
631
632 recipients =
633 if reading_user do
634 ["https://www.w3.org/ns/activitystreams#Public"] ++
635 [reading_user.ap_id | reading_user.following]
636 else
637 ["https://www.w3.org/ns/activitystreams#Public"]
638 end
639
640 fetch_activities(recipients, params)
641 |> Enum.reverse()
642 end
643
644 defp restrict_since(query, %{"since_id" => ""}), do: query
645
646 defp restrict_since(query, %{"since_id" => since_id}) do
647 from(activity in query, where: activity.id > ^since_id)
648 end
649
650 defp restrict_since(query, _), do: query
651
652 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
653 raise "Can't use the child object without preloading!"
654 end
655
656 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
657 when is_list(tag_reject) and tag_reject != [] do
658 from(
659 [_activity, object] in query,
660 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
661 )
662 end
663
664 defp restrict_tag_reject(query, _), do: query
665
666 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
667 raise "Can't use the child object without preloading!"
668 end
669
670 defp restrict_tag_all(query, %{"tag_all" => tag_all})
671 when is_list(tag_all) and tag_all != [] do
672 from(
673 [_activity, object] in query,
674 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
675 )
676 end
677
678 defp restrict_tag_all(query, _), do: query
679
680 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
681 raise "Can't use the child object without preloading!"
682 end
683
684 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
685 from(
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
688 )
689 end
690
691 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
692 from(
693 [_activity, object] in query,
694 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
695 )
696 end
697
698 defp restrict_tag(query, _), do: query
699
700 defp restrict_recipients(query, [], _user), do: query
701
702 defp restrict_recipients(query, recipients, nil) do
703 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
704 end
705
706 defp restrict_recipients(query, recipients, user) do
707 from(
708 activity in query,
709 where: fragment("? && ?", ^recipients, activity.recipients),
710 or_where: activity.actor == ^user.ap_id
711 )
712 end
713
714 defp restrict_local(query, %{"local_only" => true}) do
715 from(activity in query, where: activity.local == true)
716 end
717
718 defp restrict_local(query, _), do: query
719
720 defp restrict_actor(query, %{"actor_id" => actor_id}) do
721 from(activity in query, where: activity.actor == ^actor_id)
722 end
723
724 defp restrict_actor(query, _), do: query
725
726 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
727 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
728 end
729
730 defp restrict_type(query, %{"type" => type}) do
731 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
732 end
733
734 defp restrict_type(query, _), do: query
735
736 defp restrict_state(query, %{"state" => state}) do
737 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
738 end
739
740 defp restrict_state(query, _), do: query
741
742 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
743 from(
744 activity in query,
745 where: fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
746 )
747 end
748
749 defp restrict_favorited_by(query, _), do: query
750
751 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
752 raise "Can't use the child object without preloading!"
753 end
754
755 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
756 from(
757 [_activity, object] in query,
758 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
759 )
760 end
761
762 defp restrict_media(query, _), do: query
763
764 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
765 from(
766 activity in query,
767 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
768 )
769 end
770
771 defp restrict_replies(query, _), do: query
772
773 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
774 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
775 end
776
777 defp restrict_reblogs(query, _), do: query
778
779 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
780
781 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
782 mutes = info.mutes
783
784 from(
785 activity in query,
786 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
787 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
788 )
789 end
790
791 defp restrict_muted(query, _), do: query
792
793 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
794 blocks = info.blocks || []
795 domain_blocks = info.domain_blocks || []
796
797 query =
798 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
799
800 from(
801 [activity, object: o] in query,
802 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
803 where: fragment("not (? && ?)", activity.recipients, ^blocks),
804 where:
805 fragment(
806 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
807 activity.data,
808 activity.data,
809 ^blocks
810 ),
811 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
812 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
813 )
814 end
815
816 defp restrict_blocked(query, _), do: query
817
818 defp restrict_unlisted(query) do
819 from(
820 activity in query,
821 where:
822 fragment(
823 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
824 activity.data,
825 ^["https://www.w3.org/ns/activitystreams#Public"]
826 )
827 )
828 end
829
830 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
831 from(activity in query, where: activity.id in ^ids)
832 end
833
834 defp restrict_pinned(query, _), do: query
835
836 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
837 muted_reblogs = info.muted_reblogs || []
838
839 from(
840 activity in query,
841 where:
842 fragment(
843 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
844 activity.data,
845 activity.actor,
846 ^muted_reblogs
847 )
848 )
849 end
850
851 defp restrict_muted_reblogs(query, _), do: query
852
853 defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
854
855 defp exclude_poll_votes(query, _) do
856 if has_named_binding?(query, :object) do
857 from([activity, object: o] in query,
858 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
859 )
860 else
861 query
862 end
863 end
864
865 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
866
867 defp maybe_preload_objects(query, _) do
868 query
869 |> Activity.with_preloaded_object()
870 end
871
872 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
873
874 defp maybe_preload_bookmarks(query, opts) do
875 query
876 |> Activity.with_preloaded_bookmark(opts["user"])
877 end
878
879 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
880
881 defp maybe_set_thread_muted_field(query, opts) do
882 query
883 |> Activity.with_set_thread_muted_field(opts["user"])
884 end
885
886 defp maybe_order(query, %{order: :desc}) do
887 query
888 |> order_by(desc: :id)
889 end
890
891 defp maybe_order(query, %{order: :asc}) do
892 query
893 |> order_by(asc: :id)
894 end
895
896 defp maybe_order(query, _), do: query
897
898 def fetch_activities_query(recipients, opts \\ %{}) do
899 base_query = from(activity in Activity)
900
901 config = %{
902 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
903 }
904
905 base_query
906 |> maybe_preload_objects(opts)
907 |> maybe_preload_bookmarks(opts)
908 |> maybe_set_thread_muted_field(opts)
909 |> maybe_order(opts)
910 |> restrict_recipients(recipients, opts["user"])
911 |> restrict_tag(opts)
912 |> restrict_tag_reject(opts)
913 |> restrict_tag_all(opts)
914 |> restrict_since(opts)
915 |> restrict_local(opts)
916 |> restrict_actor(opts)
917 |> restrict_type(opts)
918 |> restrict_state(opts)
919 |> restrict_favorited_by(opts)
920 |> restrict_blocked(opts)
921 |> restrict_muted(opts)
922 |> restrict_media(opts)
923 |> restrict_visibility(opts)
924 |> restrict_thread_visibility(opts, config)
925 |> restrict_replies(opts)
926 |> restrict_reblogs(opts)
927 |> restrict_pinned(opts)
928 |> restrict_muted_reblogs(opts)
929 |> Activity.restrict_deactivated_users()
930 |> exclude_poll_votes(opts)
931 end
932
933 def fetch_activities(recipients, opts \\ %{}) do
934 fetch_activities_query(recipients, opts)
935 |> Pagination.fetch_paginated(opts)
936 |> Enum.reverse()
937 end
938
939 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
940 from(activity in query,
941 where:
942 fragment("? && ?", activity.recipients, ^recipients) or
943 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
944 "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
945 )
946 end
947
948 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
949 fetch_activities_query([], opts)
950 |> fetch_activities_bounded_query(recipients, recipients_with_public)
951 |> Pagination.fetch_paginated(opts)
952 |> Enum.reverse()
953 end
954
955 def upload(file, opts \\ []) do
956 with {:ok, data} <- Upload.store(file, opts) do
957 obj_data =
958 if opts[:actor] do
959 Map.put(data, "actor", opts[:actor])
960 else
961 data
962 end
963
964 Repo.insert(%Object{data: obj_data})
965 end
966 end
967
968 defp object_to_user_data(data) do
969 avatar =
970 data["icon"]["url"] &&
971 %{
972 "type" => "Image",
973 "url" => [%{"href" => data["icon"]["url"]}]
974 }
975
976 banner =
977 data["image"]["url"] &&
978 %{
979 "type" => "Image",
980 "url" => [%{"href" => data["image"]["url"]}]
981 }
982
983 locked = data["manuallyApprovesFollowers"] || false
984 data = Transmogrifier.maybe_fix_user_object(data)
985
986 user_data = %{
987 ap_id: data["id"],
988 info: %{
989 "ap_enabled" => true,
990 "source_data" => data,
991 "banner" => banner,
992 "locked" => locked
993 },
994 avatar: avatar,
995 name: data["name"],
996 follower_address: data["followers"],
997 following_address: data["following"],
998 bio: data["summary"]
999 }
1000
1001 # nickname can be nil because of virtual actors
1002 user_data =
1003 if data["preferredUsername"] do
1004 Map.put(
1005 user_data,
1006 :nickname,
1007 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1008 )
1009 else
1010 Map.put(user_data, :nickname, nil)
1011 end
1012
1013 {:ok, user_data}
1014 end
1015
1016 defp maybe_update_follow_information(data) do
1017 with {:enabled, true} <-
1018 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1019 {:ok, following_data} <-
1020 Fetcher.fetch_and_contain_remote_object_from_id(data.following_address),
1021 following_count <- following_data["totalItems"],
1022 hide_follows <- collection_private?(following_data),
1023 {:ok, followers_data} <-
1024 Fetcher.fetch_and_contain_remote_object_from_id(data.follower_address),
1025 followers_count <- followers_data["totalItems"],
1026 hide_followers <- collection_private?(followers_data) do
1027 info = %{
1028 "hide_follows" => hide_follows,
1029 "follower_count" => followers_count,
1030 "following_count" => following_count,
1031 "hide_followers" => hide_followers
1032 }
1033
1034 info = Map.merge(data.info, info)
1035 Map.put(data, :info, info)
1036 else
1037 {:enabled, false} ->
1038 data
1039
1040 e ->
1041 Logger.error(
1042 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1043 )
1044
1045 data
1046 end
1047 end
1048
1049 defp collection_private?(data) do
1050 if is_map(data["first"]) and
1051 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1052 false
1053 else
1054 with {:ok, _data} <- Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1055 false
1056 else
1057 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1058 true
1059
1060 _e ->
1061 false
1062 end
1063 end
1064 end
1065
1066 def user_data_from_user_object(data) do
1067 with {:ok, data} <- MRF.filter(data),
1068 {:ok, data} <- object_to_user_data(data) do
1069 {:ok, data}
1070 else
1071 e -> {:error, e}
1072 end
1073 end
1074
1075 def fetch_and_prepare_user_from_ap_id(ap_id) do
1076 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1077 {:ok, data} <- user_data_from_user_object(data),
1078 data <- maybe_update_follow_information(data) do
1079 {:ok, data}
1080 else
1081 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1082 end
1083 end
1084
1085 def make_user_from_ap_id(ap_id) do
1086 if _user = User.get_cached_by_ap_id(ap_id) do
1087 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1088 else
1089 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1090 User.insert_or_update_user(data)
1091 else
1092 e -> {:error, e}
1093 end
1094 end
1095 end
1096
1097 def make_user_from_nickname(nickname) do
1098 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1099 make_user_from_ap_id(ap_id)
1100 else
1101 _e -> {:error, "No AP id in WebFinger"}
1102 end
1103 end
1104
1105 # filter out broken threads
1106 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1107 entire_thread_visible_for_user?(activity, user)
1108 end
1109
1110 # do post-processing on a specific activity
1111 def contain_activity(%Activity{} = activity, %User{} = user) do
1112 contain_broken_threads(activity, user)
1113 end
1114
1115 def fetch_direct_messages_query do
1116 Activity
1117 |> restrict_type(%{"type" => "Create"})
1118 |> restrict_visibility(%{visibility: "direct"})
1119 |> order_by([activity], asc: activity.id)
1120 end
1121 end