[#1149] Added more oban workers. Refactoring.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20 alias Pleroma.Workers.BackgroundWorker
21
22 import Ecto.Query
23 import Pleroma.Web.ActivityPub.Utils
24 import Pleroma.Web.ActivityPub.Visibility
25
26 require Logger
27 require Pleroma.Constants
28
29 defdelegate worker_args(queue), to: Pleroma.Workers.Helper
30
31 # For Announce activities, we filter the recipients based on following status for any actors
32 # that match actual users. See issue #164 for more information about why this is necessary.
33 defp get_recipients(%{"type" => "Announce"} = data) do
34 to = Map.get(data, "to", [])
35 cc = Map.get(data, "cc", [])
36 bcc = Map.get(data, "bcc", [])
37 actor = User.get_cached_by_ap_id(data["actor"])
38
39 recipients =
40 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
41 case User.get_cached_by_ap_id(recipient) do
42 nil -> true
43 user -> User.following?(user, actor)
44 end
45 end)
46
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(%{"type" => "Create"} = data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 actor = Map.get(data, "actor", [])
55 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
56 {recipients, to, cc}
57 end
58
59 defp get_recipients(data) do
60 to = Map.get(data, "to", [])
61 cc = Map.get(data, "cc", [])
62 bcc = Map.get(data, "bcc", [])
63 recipients = Enum.concat([to, cc, bcc])
64 {recipients, to, cc}
65 end
66
67 defp check_actor_is_active(actor) do
68 if not is_nil(actor) do
69 with user <- User.get_cached_by_ap_id(actor),
70 false <- user.info.deactivated do
71 :ok
72 else
73 _e -> :reject
74 end
75 else
76 :ok
77 end
78 end
79
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
83 end
84
85 defp check_remote_limit(_), do: true
86
87 def increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
89 end
90
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
93 end
94
95 def increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
97 "type" => "Create"
98 }) do
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
101 end
102 end
103
104 def increase_replies_count_if_reply(_create_data), do: :noop
105
106 def decrease_replies_count_if_reply(%Object{
107 data: %{"inReplyTo" => reply_ap_id} = object
108 }) do
109 if is_public?(object) do
110 Object.decrease_replies_count(reply_ap_id)
111 end
112 end
113
114 def decrease_replies_count_if_reply(_object), do: :noop
115
116 def increase_poll_votes_if_vote(%{
117 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
118 "type" => "Create"
119 }) do
120 Object.increase_vote_count(reply_ap_id, name)
121 end
122
123 def increase_poll_votes_if_vote(_create_data), do: :noop
124
125 def insert(map, local \\ true, fake \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 :ok <- check_actor_is_active(map["actor"]),
129 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 :ok <- Containment.contain_child(map),
134 {:ok, map, object} <- insert_full_object(map) do
135 {:ok, activity} =
136 Repo.insert(%Activity{
137 data: map,
138 local: local,
139 actor: map["actor"],
140 recipients: recipients
141 })
142
143 # Splice in the child object if we have one.
144 activity =
145 if !is_nil(object) do
146 Map.put(activity, :object, object)
147 else
148 activity
149 end
150
151 %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
152 |> BackgroundWorker.new(worker_args(:background))
153 |> Repo.insert()
154
155 Notification.create_notifications(activity)
156
157 participations =
158 activity
159 |> Conversation.create_or_bump_for()
160 |> get_participations()
161
162 stream_out(activity)
163 stream_out_participations(participations)
164 {:ok, activity}
165 else
166 %Activity{} = activity ->
167 {:ok, activity}
168
169 {:fake, true, map, recipients} ->
170 activity = %Activity{
171 data: map,
172 local: local,
173 actor: map["actor"],
174 recipients: recipients,
175 id: "pleroma:fakeid"
176 }
177
178 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
179 {:ok, activity}
180
181 error ->
182 {:error, error}
183 end
184 end
185
186 defp get_participations({:ok, %{participations: participations}}), do: participations
187 defp get_participations(_), do: []
188
189 def stream_out_participations(participations) do
190 participations =
191 participations
192 |> Repo.preload(:user)
193
194 Enum.each(participations, fn participation ->
195 Pleroma.Web.Streamer.stream("participation", participation)
196 end)
197 end
198
199 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
200 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
201 conversation = Repo.preload(conversation, :participations),
202 last_activity_id =
203 fetch_latest_activity_id_for_context(conversation.ap_id, %{
204 "user" => user,
205 "blocking_user" => user
206 }) do
207 if last_activity_id do
208 stream_out_participations(conversation.participations)
209 end
210 end
211 end
212
213 def stream_out_participations(_, _), do: :noop
214
215 def stream_out(activity) do
216 if activity.data["type"] in ["Create", "Announce", "Delete"] do
217 object = Object.normalize(activity)
218 # Do not stream out poll replies
219 unless object.data["type"] == "Answer" do
220 Pleroma.Web.Streamer.stream("user", activity)
221 Pleroma.Web.Streamer.stream("list", activity)
222
223 if get_visibility(activity) == "public" do
224 Pleroma.Web.Streamer.stream("public", activity)
225
226 if activity.local do
227 Pleroma.Web.Streamer.stream("public:local", activity)
228 end
229
230 if activity.data["type"] in ["Create"] do
231 object.data
232 |> Map.get("tag", [])
233 |> Enum.filter(fn tag -> is_bitstring(tag) end)
234 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
235
236 if object.data["attachment"] != [] do
237 Pleroma.Web.Streamer.stream("public:media", activity)
238
239 if activity.local do
240 Pleroma.Web.Streamer.stream("public:local:media", activity)
241 end
242 end
243 end
244 else
245 if get_visibility(activity) == "direct",
246 do: Pleroma.Web.Streamer.stream("direct", activity)
247 end
248 end
249 end
250 end
251
252 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257
258 with create_data <-
259 make_create_data(
260 %{to: to, actor: actor, published: published, context: context, object: object},
261 additional
262 ),
263 {:ok, activity} <- insert(create_data, local, fake),
264 {:fake, false, activity} <- {:fake, fake, activity},
265 _ <- increase_replies_count_if_reply(create_data),
266 _ <- increase_poll_votes_if_vote(create_data),
267 # Changing note count prior to enqueuing federation task in order to avoid
268 # race conditions on updating user.info
269 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
270 :ok <- maybe_federate(activity) do
271 {:ok, activity}
272 else
273 {:fake, true, activity} ->
274 {:ok, activity}
275
276 {:error, message} ->
277 {:error, message}
278 end
279 end
280
281 def accept(%{to: to, actor: actor, object: object} = params) do
282 # only accept false as false value
283 local = !(params[:local] == false)
284
285 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
286 {:ok, activity} <- insert(data, local),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 end
290 end
291
292 def reject(%{to: to, actor: actor, object: object} = params) do
293 # only accept false as false value
294 local = !(params[:local] == false)
295
296 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
297 {:ok, activity} <- insert(data, local),
298 :ok <- maybe_federate(activity) do
299 {:ok, activity}
300 end
301 end
302
303 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
304 # only accept false as false value
305 local = !(params[:local] == false)
306
307 with data <- %{
308 "to" => to,
309 "cc" => cc,
310 "type" => "Update",
311 "actor" => actor,
312 "object" => object
313 },
314 {:ok, activity} <- insert(data, local),
315 :ok <- maybe_federate(activity) do
316 {:ok, activity}
317 end
318 end
319
320 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
321 def like(
322 %User{ap_id: ap_id} = user,
323 %Object{data: %{"id" => _}} = object,
324 activity_id \\ nil,
325 local \\ true
326 ) do
327 with nil <- get_existing_like(ap_id, object),
328 like_data <- make_like_data(user, object, activity_id),
329 {:ok, activity} <- insert(like_data, local),
330 {:ok, object} <- add_like_to_object(activity, object),
331 :ok <- maybe_federate(activity) do
332 {:ok, activity, object}
333 else
334 %Activity{} = activity -> {:ok, activity, object}
335 error -> {:error, error}
336 end
337 end
338
339 def unlike(
340 %User{} = actor,
341 %Object{} = object,
342 activity_id \\ nil,
343 local \\ true
344 ) do
345 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
346 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
347 {:ok, unlike_activity} <- insert(unlike_data, local),
348 {:ok, _activity} <- Repo.delete(like_activity),
349 {:ok, object} <- remove_like_from_object(like_activity, object),
350 :ok <- maybe_federate(unlike_activity) do
351 {:ok, unlike_activity, like_activity, object}
352 else
353 _e -> {:ok, object}
354 end
355 end
356
357 def announce(
358 %User{ap_id: _} = user,
359 %Object{data: %{"id" => _}} = object,
360 activity_id \\ nil,
361 local \\ true,
362 public \\ true
363 ) do
364 with true <- is_public?(object),
365 announce_data <- make_announce_data(user, object, activity_id, public),
366 {:ok, activity} <- insert(announce_data, local),
367 {:ok, object} <- add_announce_to_object(activity, object),
368 :ok <- maybe_federate(activity) do
369 {:ok, activity, object}
370 else
371 error -> {:error, error}
372 end
373 end
374
375 def unannounce(
376 %User{} = actor,
377 %Object{} = object,
378 activity_id \\ nil,
379 local \\ true
380 ) do
381 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
382 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
383 {:ok, unannounce_activity} <- insert(unannounce_data, local),
384 :ok <- maybe_federate(unannounce_activity),
385 {:ok, _activity} <- Repo.delete(announce_activity),
386 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
387 {:ok, unannounce_activity, object}
388 else
389 _e -> {:ok, object}
390 end
391 end
392
393 def follow(follower, followed, activity_id \\ nil, local \\ true) do
394 with data <- make_follow_data(follower, followed, activity_id),
395 {:ok, activity} <- insert(data, local),
396 :ok <- maybe_federate(activity) do
397 {:ok, activity}
398 end
399 end
400
401 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
402 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
403 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
404 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
405 {:ok, activity} <- insert(unfollow_data, local),
406 :ok <- maybe_federate(activity) do
407 {:ok, activity}
408 end
409 end
410
411 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
412 with data <- %{
413 "to" => [follower_address],
414 "type" => "Delete",
415 "actor" => ap_id,
416 "object" => %{"type" => "Person", "id" => ap_id}
417 },
418 {:ok, activity} <- insert(data, true, true),
419 :ok <- maybe_federate(activity) do
420 {:ok, user}
421 end
422 end
423
424 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
425 user = User.get_cached_by_ap_id(actor)
426 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
427
428 with {:ok, object, activity} <- Object.delete(object),
429 data <- %{
430 "type" => "Delete",
431 "actor" => actor,
432 "object" => id,
433 "to" => to,
434 "deleted_activity_id" => activity && activity.id
435 },
436 {:ok, activity} <- insert(data, local, false),
437 stream_out_participations(object, user),
438 _ <- decrease_replies_count_if_reply(object),
439 # Changing note count prior to enqueuing federation task in order to avoid
440 # race conditions on updating user.info
441 {:ok, _actor} <- decrease_note_count_if_public(user, object),
442 :ok <- maybe_federate(activity) do
443 {:ok, activity}
444 end
445 end
446
447 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
448 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
449 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
450
451 if unfollow_blocked do
452 follow_activity = fetch_latest_follow(blocker, blocked)
453 if follow_activity, do: unfollow(blocker, blocked, nil, local)
454 end
455
456 with true <- outgoing_blocks,
457 block_data <- make_block_data(blocker, blocked, activity_id),
458 {:ok, activity} <- insert(block_data, local),
459 :ok <- maybe_federate(activity) do
460 {:ok, activity}
461 else
462 _e -> {:ok, nil}
463 end
464 end
465
466 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
467 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
468 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
469 {:ok, activity} <- insert(unblock_data, local),
470 :ok <- maybe_federate(activity) do
471 {:ok, activity}
472 end
473 end
474
475 def flag(
476 %{
477 actor: actor,
478 context: context,
479 account: account,
480 statuses: statuses,
481 content: content
482 } = params
483 ) do
484 # only accept false as false value
485 local = !(params[:local] == false)
486 forward = !(params[:forward] == false)
487
488 additional = params[:additional] || %{}
489
490 params = %{
491 actor: actor,
492 context: context,
493 account: account,
494 statuses: statuses,
495 content: content
496 }
497
498 additional =
499 if forward do
500 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
501 else
502 Map.merge(additional, %{"to" => [], "cc" => []})
503 end
504
505 with flag_data <- make_flag_data(params, additional),
506 {:ok, activity} <- insert(flag_data, local),
507 :ok <- maybe_federate(activity) do
508 Enum.each(User.all_superusers(), fn superuser ->
509 superuser
510 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
511 |> Pleroma.Emails.Mailer.deliver_async()
512 end)
513
514 {:ok, activity}
515 end
516 end
517
518 defp fetch_activities_for_context_query(context, opts) do
519 public = [Pleroma.Constants.as_public()]
520
521 recipients =
522 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
523
524 from(activity in Activity)
525 |> maybe_preload_objects(opts)
526 |> restrict_blocked(opts)
527 |> restrict_recipients(recipients, opts["user"])
528 |> where(
529 [activity],
530 fragment(
531 "?->>'type' = ? and ?->>'context' = ?",
532 activity.data,
533 "Create",
534 activity.data,
535 ^context
536 )
537 )
538 |> exclude_poll_votes(opts)
539 |> order_by([activity], desc: activity.id)
540 end
541
542 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
543 def fetch_activities_for_context(context, opts \\ %{}) do
544 context
545 |> fetch_activities_for_context_query(opts)
546 |> Repo.all()
547 end
548
549 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
550 Pleroma.FlakeId.t() | nil
551 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
552 context
553 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
554 |> limit(1)
555 |> select([a], a.id)
556 |> Repo.one()
557 end
558
559 def fetch_public_activities(opts \\ %{}) do
560 q = fetch_activities_query([Pleroma.Constants.as_public()], opts)
561
562 q
563 |> restrict_unlisted()
564 |> Pagination.fetch_paginated(opts)
565 |> Enum.reverse()
566 end
567
568 @valid_visibilities ~w[direct unlisted public private]
569
570 defp restrict_visibility(query, %{visibility: visibility})
571 when is_list(visibility) do
572 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
573 query =
574 from(
575 a in query,
576 where:
577 fragment(
578 "activity_visibility(?, ?, ?) = ANY (?)",
579 a.actor,
580 a.recipients,
581 a.data,
582 ^visibility
583 )
584 )
585
586 query
587 else
588 Logger.error("Could not restrict visibility to #{visibility}")
589 end
590 end
591
592 defp restrict_visibility(query, %{visibility: visibility})
593 when visibility in @valid_visibilities do
594 from(
595 a in query,
596 where:
597 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
598 )
599 end
600
601 defp restrict_visibility(_query, %{visibility: visibility})
602 when visibility not in @valid_visibilities do
603 Logger.error("Could not restrict visibility to #{visibility}")
604 end
605
606 defp restrict_visibility(query, _visibility), do: query
607
608 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
609 do: query
610
611 defp restrict_thread_visibility(
612 query,
613 %{"user" => %User{info: %{skip_thread_containment: true}}},
614 _
615 ),
616 do: query
617
618 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
619 from(
620 a in query,
621 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
622 )
623 end
624
625 defp restrict_thread_visibility(query, _, _), do: query
626
627 def fetch_user_activities(user, reading_user, params \\ %{}) do
628 params =
629 params
630 |> Map.put("type", ["Create", "Announce"])
631 |> Map.put("actor_id", user.ap_id)
632 |> Map.put("whole_db", true)
633 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
634
635 recipients =
636 user_activities_recipients(%{
637 "godmode" => params["godmode"],
638 "reading_user" => reading_user
639 })
640
641 fetch_activities(recipients, params)
642 |> Enum.reverse()
643 end
644
645 defp user_activities_recipients(%{"godmode" => true}) do
646 []
647 end
648
649 defp user_activities_recipients(%{"reading_user" => reading_user}) do
650 if reading_user do
651 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
652 else
653 [Pleroma.Constants.as_public()]
654 end
655 end
656
657 defp restrict_since(query, %{"since_id" => ""}), do: query
658
659 defp restrict_since(query, %{"since_id" => since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
661 end
662
663 defp restrict_since(query, _), do: query
664
665 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
666 raise "Can't use the child object without preloading!"
667 end
668
669 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
670 when is_list(tag_reject) and tag_reject != [] do
671 from(
672 [_activity, object] in query,
673 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
674 )
675 end
676
677 defp restrict_tag_reject(query, _), do: query
678
679 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
680 raise "Can't use the child object without preloading!"
681 end
682
683 defp restrict_tag_all(query, %{"tag_all" => tag_all})
684 when is_list(tag_all) and tag_all != [] do
685 from(
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
688 )
689 end
690
691 defp restrict_tag_all(query, _), do: query
692
693 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
694 raise "Can't use the child object without preloading!"
695 end
696
697 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
698 from(
699 [_activity, object] in query,
700 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
701 )
702 end
703
704 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
705 from(
706 [_activity, object] in query,
707 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
708 )
709 end
710
711 defp restrict_tag(query, _), do: query
712
713 defp restrict_recipients(query, [], _user), do: query
714
715 defp restrict_recipients(query, recipients, nil) do
716 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 end
718
719 defp restrict_recipients(query, recipients, user) do
720 from(
721 activity in query,
722 where: fragment("? && ?", ^recipients, activity.recipients),
723 or_where: activity.actor == ^user.ap_id
724 )
725 end
726
727 defp restrict_local(query, %{"local_only" => true}) do
728 from(activity in query, where: activity.local == true)
729 end
730
731 defp restrict_local(query, _), do: query
732
733 defp restrict_actor(query, %{"actor_id" => actor_id}) do
734 from(activity in query, where: activity.actor == ^actor_id)
735 end
736
737 defp restrict_actor(query, _), do: query
738
739 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
740 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 end
742
743 defp restrict_type(query, %{"type" => type}) do
744 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 end
746
747 defp restrict_type(query, _), do: query
748
749 defp restrict_state(query, %{"state" => state}) do
750 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 end
752
753 defp restrict_state(query, _), do: query
754
755 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
756 from(
757 [_activity, object] in query,
758 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
759 )
760 end
761
762 defp restrict_favorited_by(query, _), do: query
763
764 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
765 raise "Can't use the child object without preloading!"
766 end
767
768 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
769 from(
770 [_activity, object] in query,
771 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
772 )
773 end
774
775 defp restrict_media(query, _), do: query
776
777 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
778 from(
779 activity in query,
780 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
781 )
782 end
783
784 defp restrict_replies(query, _), do: query
785
786 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
787 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
788 end
789
790 defp restrict_reblogs(query, _), do: query
791
792 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
793
794 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
795 mutes = info.mutes
796
797 from(
798 activity in query,
799 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
800 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
801 )
802 end
803
804 defp restrict_muted(query, _), do: query
805
806 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
807 blocks = info.blocks || []
808 domain_blocks = info.domain_blocks || []
809
810 query =
811 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
812
813 from(
814 [activity, object: o] in query,
815 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
816 where: fragment("not (? && ?)", activity.recipients, ^blocks),
817 where:
818 fragment(
819 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
820 activity.data,
821 activity.data,
822 ^blocks
823 ),
824 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
825 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
826 )
827 end
828
829 defp restrict_blocked(query, _), do: query
830
831 defp restrict_unlisted(query) do
832 from(
833 activity in query,
834 where:
835 fragment(
836 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
837 activity.data,
838 ^[Pleroma.Constants.as_public()]
839 )
840 )
841 end
842
843 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
844 from(activity in query, where: activity.id in ^ids)
845 end
846
847 defp restrict_pinned(query, _), do: query
848
849 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
850 muted_reblogs = info.muted_reblogs || []
851
852 from(
853 activity in query,
854 where:
855 fragment(
856 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
857 activity.data,
858 activity.actor,
859 ^muted_reblogs
860 )
861 )
862 end
863
864 defp restrict_muted_reblogs(query, _), do: query
865
866 defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
867
868 defp exclude_poll_votes(query, _) do
869 if has_named_binding?(query, :object) do
870 from([activity, object: o] in query,
871 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
872 )
873 else
874 query
875 end
876 end
877
878 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
879
880 defp maybe_preload_objects(query, _) do
881 query
882 |> Activity.with_preloaded_object()
883 end
884
885 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
886
887 defp maybe_preload_bookmarks(query, opts) do
888 query
889 |> Activity.with_preloaded_bookmark(opts["user"])
890 end
891
892 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
893
894 defp maybe_set_thread_muted_field(query, opts) do
895 query
896 |> Activity.with_set_thread_muted_field(opts["user"])
897 end
898
899 defp maybe_order(query, %{order: :desc}) do
900 query
901 |> order_by(desc: :id)
902 end
903
904 defp maybe_order(query, %{order: :asc}) do
905 query
906 |> order_by(asc: :id)
907 end
908
909 defp maybe_order(query, _), do: query
910
911 def fetch_activities_query(recipients, opts \\ %{}) do
912 config = %{
913 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
914 }
915
916 Activity
917 |> maybe_preload_objects(opts)
918 |> maybe_preload_bookmarks(opts)
919 |> maybe_set_thread_muted_field(opts)
920 |> maybe_order(opts)
921 |> restrict_recipients(recipients, opts["user"])
922 |> restrict_tag(opts)
923 |> restrict_tag_reject(opts)
924 |> restrict_tag_all(opts)
925 |> restrict_since(opts)
926 |> restrict_local(opts)
927 |> restrict_actor(opts)
928 |> restrict_type(opts)
929 |> restrict_state(opts)
930 |> restrict_favorited_by(opts)
931 |> restrict_blocked(opts)
932 |> restrict_muted(opts)
933 |> restrict_media(opts)
934 |> restrict_visibility(opts)
935 |> restrict_thread_visibility(opts, config)
936 |> restrict_replies(opts)
937 |> restrict_reblogs(opts)
938 |> restrict_pinned(opts)
939 |> restrict_muted_reblogs(opts)
940 |> Activity.restrict_deactivated_users()
941 |> exclude_poll_votes(opts)
942 end
943
944 def fetch_activities(recipients, opts \\ %{}) do
945 list_memberships = Pleroma.List.memberships(opts["user"])
946
947 fetch_activities_query(recipients ++ list_memberships, opts)
948 |> Pagination.fetch_paginated(opts)
949 |> Enum.reverse()
950 |> maybe_update_cc(list_memberships, opts["user"])
951 end
952
953 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
954 when is_list(list_memberships) and length(list_memberships) > 0 do
955 Enum.map(activities, fn
956 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
957 if Enum.any?(bcc, &(&1 in list_memberships)) do
958 update_in(activity.data["cc"], &[user_ap_id | &1])
959 else
960 activity
961 end
962
963 activity ->
964 activity
965 end)
966 end
967
968 defp maybe_update_cc(activities, _, _), do: activities
969
970 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
971 from(activity in query,
972 where:
973 fragment("? && ?", activity.recipients, ^recipients) or
974 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
975 ^Pleroma.Constants.as_public() in activity.recipients)
976 )
977 end
978
979 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
980 fetch_activities_query([], opts)
981 |> fetch_activities_bounded_query(recipients, recipients_with_public)
982 |> Pagination.fetch_paginated(opts)
983 |> Enum.reverse()
984 end
985
986 def upload(file, opts \\ []) do
987 with {:ok, data} <- Upload.store(file, opts) do
988 obj_data =
989 if opts[:actor] do
990 Map.put(data, "actor", opts[:actor])
991 else
992 data
993 end
994
995 Repo.insert(%Object{data: obj_data})
996 end
997 end
998
999 defp object_to_user_data(data) do
1000 avatar =
1001 data["icon"]["url"] &&
1002 %{
1003 "type" => "Image",
1004 "url" => [%{"href" => data["icon"]["url"]}]
1005 }
1006
1007 banner =
1008 data["image"]["url"] &&
1009 %{
1010 "type" => "Image",
1011 "url" => [%{"href" => data["image"]["url"]}]
1012 }
1013
1014 locked = data["manuallyApprovesFollowers"] || false
1015 data = Transmogrifier.maybe_fix_user_object(data)
1016
1017 user_data = %{
1018 ap_id: data["id"],
1019 info: %{
1020 ap_enabled: true,
1021 source_data: data,
1022 banner: banner,
1023 locked: locked
1024 },
1025 avatar: avatar,
1026 name: data["name"],
1027 follower_address: data["followers"],
1028 following_address: data["following"],
1029 bio: data["summary"]
1030 }
1031
1032 # nickname can be nil because of virtual actors
1033 user_data =
1034 if data["preferredUsername"] do
1035 Map.put(
1036 user_data,
1037 :nickname,
1038 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1039 )
1040 else
1041 Map.put(user_data, :nickname, nil)
1042 end
1043
1044 {:ok, user_data}
1045 end
1046
1047 def fetch_follow_information_for_user(user) do
1048 with {:ok, following_data} <-
1049 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1050 following_count when is_integer(following_count) <- following_data["totalItems"],
1051 {:ok, hide_follows} <- collection_private(following_data),
1052 {:ok, followers_data} <-
1053 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1054 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1055 {:ok, hide_followers} <- collection_private(followers_data) do
1056 {:ok,
1057 %{
1058 hide_follows: hide_follows,
1059 follower_count: followers_count,
1060 following_count: following_count,
1061 hide_followers: hide_followers
1062 }}
1063 else
1064 {:error, _} = e ->
1065 e
1066
1067 e ->
1068 {:error, e}
1069 end
1070 end
1071
1072 defp maybe_update_follow_information(data) do
1073 with {:enabled, true} <-
1074 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1075 {:ok, info} <- fetch_follow_information_for_user(data) do
1076 info = Map.merge(data.info, info)
1077 Map.put(data, :info, info)
1078 else
1079 {:enabled, false} ->
1080 data
1081
1082 e ->
1083 Logger.error(
1084 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1085 )
1086
1087 data
1088 end
1089 end
1090
1091 defp collection_private(data) do
1092 if is_map(data["first"]) and
1093 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1094 {:ok, false}
1095 else
1096 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1097 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1098 {:ok, false}
1099 else
1100 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1101 {:ok, true}
1102
1103 {:error, _} = e ->
1104 e
1105
1106 e ->
1107 {:error, e}
1108 end
1109 end
1110 end
1111
1112 def user_data_from_user_object(data) do
1113 with {:ok, data} <- MRF.filter(data),
1114 {:ok, data} <- object_to_user_data(data) do
1115 {:ok, data}
1116 else
1117 e -> {:error, e}
1118 end
1119 end
1120
1121 def fetch_and_prepare_user_from_ap_id(ap_id) do
1122 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1123 {:ok, data} <- user_data_from_user_object(data),
1124 data <- maybe_update_follow_information(data) do
1125 {:ok, data}
1126 else
1127 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1128 end
1129 end
1130
1131 def make_user_from_ap_id(ap_id) do
1132 if _user = User.get_cached_by_ap_id(ap_id) do
1133 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1134 else
1135 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1136 User.insert_or_update_user(data)
1137 else
1138 e -> {:error, e}
1139 end
1140 end
1141 end
1142
1143 def make_user_from_nickname(nickname) do
1144 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1145 make_user_from_ap_id(ap_id)
1146 else
1147 _e -> {:error, "No AP id in WebFinger"}
1148 end
1149 end
1150
1151 # filter out broken threads
1152 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1153 entire_thread_visible_for_user?(activity, user)
1154 end
1155
1156 # do post-processing on a specific activity
1157 def contain_activity(%Activity{} = activity, %User{} = user) do
1158 contain_broken_threads(activity, user)
1159 end
1160
1161 def fetch_direct_messages_query do
1162 Activity
1163 |> restrict_type(%{"type" => "Create"})
1164 |> restrict_visibility(%{visibility: "direct"})
1165 |> order_by([activity], asc: activity.id)
1166 end
1167 end