Merge branch 'feature/digest-email' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26 require Pleroma.Constants
27
# Builds the `{recipients, to, cc}` tuple for an activity map.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users: an address that resolves to a cached user is kept only when that
# user follows the announcing actor, while addresses that do not resolve to a user are always
# kept. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn address ->
      known_user = User.get_cached_by_ap_id(address)
      is_nil(known_user) or User.following?(known_user, announcer)
    end)

  {recipients, to, cc}
end

# Create activities are additionally delivered to their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Every other activity type: plain concatenation of "to", "cc" and "bcc".
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
63
# Decides whether an activity from `actor` (an AP id, or `nil`) may be inserted.
#
# Returns `:ok` when no actor is given or when the actor resolves to a cached
# user whose `info.deactivated` flag is exactly `false`; returns `:reject`
# otherwise.
#
# An actor that does not resolve to any cached user used to crash here
# (`nil.info`); it is now rejected like a deactivated one.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> :ok
    _unknown_or_deactivated -> :reject
  end
end
76
# Returns whether the embedded object's "content" fits within the configured
# `[:instance, :remote_limit]`. Activities without a non-nil "content" in an
# embedded object always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
83
# Bumps the actor's note count via `User.increase_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
87
# Lowers the actor's note count via `User.decrease_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
91
# For a Create whose embedded object has an "inReplyTo", increments the replies
# counter of the replied-to object, but only when the new object is public
# (returns `nil` when it is not). Anything else is a `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
102
# For an object that has an "inReplyTo", decrements the replies counter of the
# replied-to object, but only when the object's data is public (returns `nil`
# when it is not). Anything else is a `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = object}) do
  if is_public?(object), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
112
# A Create whose embedded object carries both "inReplyTo" and "name" is counted
# as a vote for the option `name` on the replied-to object. Anything else is a
# `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }),
    do: Object.increase_vote_count(poll_ap_id, option_name)

def increase_poll_votes_if_vote(_create_data), do: :noop
121
# Validates and stores an activity map.
#
#   * `local` - stored in the activity's `local` field.
#   * `fake`  - when true nothing is written; an in-memory activity with the id
#     "pleroma:fakeid" is returned after the actor, remote-limit and MRF checks.
#
# Returns `{:ok, activity}` (also when the activity already exists) or
# `{:error, reason}` where `reason` is whatever check failed.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       :ok <- Containment.contain_child(data),
       {:ok, data, object} <- insert_full_object(data) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> stored
        _ -> Map.put(stored, :object, object)
      end

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
    Notification.create_notifications(activity)

    # The conversation is created/bumped before anything is streamed out.
    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)

    {:ok, activity}
  else
    # The activity is already known: hand back the stored one.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
180
# Pulls the participations out of an `{:ok, %{participations: ...}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
183
# Preloads the `:user` of every participation and pushes each one to the
# "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
193
# Streams out the participations of the conversation identified by the
# object's "context", provided the conversation exists and `user` can still
# see at least one activity in it. When no conversation is found, the lookup
# result is returned as is. Objects without a "context" are a `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      latest_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          "user" => user,
          "blocking_user" => user
        })

      if latest_id, do: stream_out_participations(conversation.participations)

    not_a_conversation ->
      not_a_conversation
  end
end

def stream_out_participations(_, _), do: :noop
209
# Pushes a Create/Announce/Delete activity to the streamer topics it belongs to.
#
# Every such activity goes to "user" and "list". Public ones also go to
# "public" (plus "public:local" when local); public Creates additionally go to
# one "hashtag:<tag>" topic per string tag and, unless "attachment" is `[]`, to
# the media topics. Non-public activities with "direct" visibility go to
# "direct".
def stream_out(activity) do
  alias Pleroma.Web.Streamer

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Streamer.stream("user", activity)
      Streamer.stream("list", activity)

      case get_visibility(activity) do
        "public" ->
          Streamer.stream("public", activity)

          if activity.local, do: Streamer.stream("public:local", activity)

          if activity.data["type"] == "Create" do
            object.data
            |> Map.get("tag", [])
            |> Enum.each(fn tag ->
              # Non-string tags are skipped.
              if is_bitstring(tag), do: Streamer.stream("hashtag:" <> tag, activity)
            end)

            if object.data["attachment"] != [] do
              Streamer.stream("public:media", activity)

              if activity.local, do: Streamer.stream("public:local:media", activity)
            end
          end

        "direct" ->
          Streamer.stream("direct", activity)

        _other_visibility ->
          nil
      end
    end
  end
end
246
# Builds and inserts a Create activity from `params`
# (`:to`, `:actor`, `:context`, `:object`, optional `:published`, `:additional`, `:local`).
#
# With `fake` set, the activity returned by `insert/3` is handed back right
# away and no counters are touched or federation attempted.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} -> {:ok, fake_activity}
    {:error, reason} -> {:error, reason}
  end
end
275
# Inserts and federates an Accept activity from `actor` for `object`.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "type" => "Accept",
    "actor" => actor.ap_id,
    "to" => to,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
286
# Inserts and federates a Reject activity from `actor` for `object`.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "type" => "Reject",
    "actor" => actor.ap_id,
    "to" => to,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
# Inserts and federates an Update activity. Note that `actor` is stored as
# given here (it is not dereferenced to an `ap_id`).
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "type" => "Update",
    "actor" => actor,
    "to" => to,
    "cc" => cc,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
314
# Likes `object` on behalf of `user`.
#
# Returns `{:ok, activity, object}`; when the user already liked the object the
# existing Like activity is returned together with the object as passed in.
# Any other failure comes back as `{:error, reason}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- insert(make_like_data(user, object, activity_id), local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    failure -> {:error, failure}
  end
end
333
# Undoes the actor's Like of `object`.
#
# On success returns `{:ok, unlike_activity, like_activity, object}`. If there
# is no existing Like, or any later step does not succeed, the result is
# `{:ok, object}` with the object as passed in.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         insert(make_unlike_data(actor, like_activity, activity_id), local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
351
# Announces `object` on behalf of `user`. Only objects for which `is_public?/1`
# returns `true` can be announced; every failure is wrapped as `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, activity} <- insert(make_announce_data(user, object, activity_id, public), local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
369
# Undoes the actor's Announce of `object`.
#
# On success returns `{:ok, unannounce_activity, object}`. If there is no
# existing Announce, or any later step does not succeed, the result is
# `{:ok, object}` with the object as passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         insert(make_unannounce_data(actor, announce_activity, activity_id), local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
387
# Inserts and federates a Follow activity from `follower` to `followed`.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
395
# Marks the latest Follow from `follower` to `followed` as "cancelled", then
# inserts and federates the matching unfollow activity. When no Follow exists,
# the lookup result is returned as is.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         insert(make_unfollow_data(follower, followed, cancelled_follow, activity_id), local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
405
# Announces the deletion of a user to their follower address.
#
# The Delete activity is built through `insert/3` with `fake` set to true and
# then passed to `maybe_federate/1`. Returns `{:ok, user}`.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "type" => "Delete",
    "actor" => ap_id,
    "to" => [follower_address],
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
418
# Deletes an object and publishes the corresponding Delete activity, addressed
# to the object's former "to" and "cc".
#
# After the Delete is inserted, conversation participations are streamed out
# and the reply / note counters are decreased before federation is attempted.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)

  former_to = Map.get(object.data, "to") || []
  former_cc = Map.get(object.data, "cc") || []
  to = former_to ++ former_cc

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data <- %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => to,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
441
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a Follow exists, the
# blocker unfollows first. A Block activity is only created when
# `[:activitypub, :outgoing_blocks]` is exactly `true`; in every other case
# (including failures) the result is `{:ok, nil}`.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- insert(make_block_data(blocker, blocked, activity_id), local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _failure -> {:ok, nil}
  end
end
460
# Undoes the latest Block from `blocker` against `blocked`. When no Block
# exists, the lookup result is returned as is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         insert(make_unblock_data(blocker, blocked, latest_block, activity_id), local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
469
# Files a report (Flag activity) and e-mails every superuser about it.
#
# `:forward` (anything but `false` counts as true) controls whether the
# reported account is put into "cc"; "to" is always empty.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the five report fields are handed on to `make_flag_data/2`.
  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  with {:ok, activity} <- insert(make_flag_data(flag_params, additional), local),
       :ok <- maybe_federate(activity) do
    Enum.each(User.all_superusers(), fn superuser ->
      report_email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
512
# Query for the Create activities of `context`, newest first.
#
# Visible recipients are the public address plus, when `opts["user"]` is set,
# that user's own ap_id and everything they follow.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  in_context =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)
    |> where(
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  in_context
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
536
# Loads every activity matched by `fetch_activities_for_context_query/2`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
543
# Returns only the id of the newest activity in `context` (or `nil`).
# Object preloading is skipped unless `opts` says otherwise.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  opts_with_default = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(opts_with_default)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
553
# Fetches one page of activities addressed to the public collection, leaving
# out unlisted ones, and reverses the page returned by the paginator.
def fetch_public_activities(opts \\ %{}) do
  [Pleroma.Constants.as_public()]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
562
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` matches the
# `:visibility` option, which may be a single visibility or a list of them.
# Options without a `:visibility` key leave the query untouched.
#
# An unsupported visibility is logged and yields a query that matches nothing.
# Previously the logger's return value was passed on in place of a query, which
# broke every later step of the pipeline.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    reject_invalid_visibility(query, visibility)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Any other value under `:visibility` is invalid.
defp restrict_visibility(query, %{visibility: visibility}) do
  reject_invalid_visibility(query, visibility)
end

defp restrict_visibility(query, _visibility), do: query

# Logs the offending value (via `inspect/1`, so values that cannot be
# interpolated do not raise) and fails closed.
defp reject_invalid_visibility(query, visibility) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  from(a in query, where: false)
end
602
# Applies the `thread_visibility(...)` database check for the requesting user.
# Skipped when containment is disabled in the passed config or in the user's
# own info, or when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
621
# Fetches the Create and Announce activities authored by `user` as seen by
# `reading_user` (which may be `nil`). `params["godmode"]` lifts the recipient
# restriction entirely.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
639
# Recipient list used when reading someone's activities: empty in godmode,
# otherwise the public address followed (when there is a reading user) by that
# user's ap_id and everything they follow.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | reading_user.following]
  else
    [public]
  end
end
651
# Keeps only activities with an id greater than "since_id"; an empty or
# missing "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
659
# Drops activities whose object carries any of the "tag_reject" tags.
# Needs the object binding, hence it cannot be combined with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
673
# Keeps only activities whose object carries every one of the "tag_all" tags.
# Needs the object binding, hence it cannot be combined with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
687
# Keeps only activities whose object carries the given "tag" (a single string)
# or at least one of them (a list). Needs the object binding, hence it cannot
# be combined with "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
707
# Keeps activities whose recipients overlap with `recipients`. With a user,
# activities authored by that user are included as well. An empty recipient
# list means no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
721
# With "local_only" set to true, keeps only local activities.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
727
# With an "actor_id", keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
733
# Keeps only activities of the given "type": a single type name, or any value
# usable with `= ANY(?)` (e.g. a list of names).
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
743
# With a "state", keeps only activities whose data has that "state".
defp restrict_state(query, %{"state" => state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
749
# With "favorited_by", keeps only activities whose object lists that ap_id in
# its "likes". Relies on the object being the second binding of the query.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
758
# With "only_media" set to "true" or "1", drops activities whose object has an
# empty "attachment" list. Needs the object binding, hence it cannot be
# combined with "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
771
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# embedded object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
780
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
786
# Hides activities authored by, or addressed ("to") to, anyone the
# "muting_user" has muted. Passing "with_muted" as true / "true" / "1"
# disables the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  # Default to an empty list like the block and muted-reblog filters do: a nil
  # parameter would make both conditions evaluate to NULL and hide every row.
  mutes = info.mutes || []

  from(
    activity in query,
    where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
    where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
  )
end

defp restrict_muted(query, _), do: query
800
# Hides activities that involve accounts or domains blocked by the
# "blocking_user": blocked authors, blocked recipients, Announces addressed to
# blocked accounts, and anything whose activity or object actor lives on a
# blocked domain. The object is joined in when the query has no `:object`
# binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_users = info.blocks || []
  blocked_domains = info.domain_blocks || []

  with_object =
    case has_named_binding?(query, :object) do
      true -> query
      false -> Activity.with_joined_object(query)
    end

  with_object
  |> where([activity, object: o], fragment("not (? = ANY(?))", activity.actor, ^blocked_users))
  |> where([activity, object: o], fragment("not (? && ?)", activity.recipients, ^blocked_users))
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_users
    )
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^blocked_domains)
  )
end

defp restrict_blocked(query, _opts), do: query
825
# Drops activities that carry the public address in "cc" (a missing "cc" is
# treated as empty).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Pleroma.Constants.as_public()]
    )
  )
end
837
# With "pinned" set to "true", keeps only the activities listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _opts), do: query
843
# Hides Announce activities authored by anyone in the "muting_user"'s
# `muted_reblogs` list (a nil list counts as empty).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  reblog_muted_actors = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
860
# Drops activities whose object is of type "Answer" unless
# "include_poll_votes" is "true". The filter can only be applied when the query
# has an `:object` binding; otherwise the query is returned untouched.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
872
# Preloads the object of each activity unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
879
# Preloads the bookmark of `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
886
# Sets the thread-muted field for `opts["user"]` unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts),
  do: Activity.with_set_thread_muted_field(query, opts["user"])
893
# Orders by id when the options carry `order: :desc` or `order: :asc`;
# otherwise the query is left unordered here.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
905
# Builds the main activity query for `recipients`, applying every preload,
# ordering and restriction step that `opts` asks for. Nothing is fetched here;
# callers paginate or run the returned query themselves.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Preloads and ordering first ...
  prepared =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # ... then the option-driven restrictions, in their original order ...
  restricted =
    prepared
    |> restrict_recipients(recipients, opts["user"])
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(opts)
    |> restrict_muted(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, containment_config)
    |> restrict_replies(opts)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(opts)

  # ... and finally the filters that always run.
  restricted
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
938
# Fetches one page of activities visible to `recipients` plus the list
# memberships of `opts["user"]`, reverses the page returned by the paginator
# and lets `maybe_update_cc/3` adjust "cc" for list-addressed activities.
def fetch_activities(recipients, opts \\ %{}) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)
    |> Enum.reverse()

  maybe_update_cc(page, list_memberships, user)
end
947
# For activities that were "bcc"-addressed to one of the given list
# memberships, prepends the reading user's ap_id to the activity's "cc".
#
# Nothing is changed when there are no list memberships or no user. An activity
# whose "cc" is missing (or is not a list) gets a proper list instead of an
# improper `[ap_id | nil]` one.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
964
# Keeps activities addressed to any of `recipients`, plus public activities
# addressed to any of `recipients_with_public`.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
973
# Fetches one page of activities using the bounded recipient rule of
# `fetch_activities_bounded_query/3` and reverses the page returned by the
# paginator.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
980
# Stores `file` through `Upload.store/2` and inserts an object for it. When
# `opts[:actor]` is given it is recorded under "actor" in the object data.
# A failed store is returned as is.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    object_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: object_data})
  end
end
993
# Converts a remote actor document into the attribute map used to create or
# update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner and the locked flag are read from the document as received;
# everything else is read after `Transmogrifier.maybe_fix_user_object/1`.
defp object_to_user_data(data) do
  icon_url = data["icon"]["url"]
  avatar = icon_url && %{"type" => "Image", "url" => [%{"href" => icon_url}]}

  image_url = data["image"]["url"]
  banner = image_url && %{"type" => "Image", "url" => [%{"href" => image_url}]}

  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    nickname: nickname,
    name: data["name"],
    bio: data["summary"],
    avatar: avatar,
    follower_address: data["followers"],
    following_address: data["following"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      locked: locked
    }
  }

  {:ok, user_data}
end
1041
# Fetches the user's remote following and follower collections and returns
# `{:ok, %{following_count, follower_count, hide_follows, hide_followers}}`.
#
# Both "totalItems" values must be integers. Any failing step is returned as
# an `{:error, reason}` tuple (existing error tuples are passed through).
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       following_count when is_integer(following_count) <- following["totalItems"],
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       followers_count when is_integer(followers_count) <- followers["totalItems"],
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      following_count: following_count,
      follower_count: followers_count,
      hide_follows: hide_follows,
      hide_followers: hide_followers
    }

    {:ok, follow_information}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1066
# When `[:instance, :external_user_synchronization]` is true, merges freshly
# fetched follow counters into `data.info`. With the setting at `false` the
# data is returned untouched; any other outcome is logged and the data is
# returned untouched as well.
defp maybe_update_follow_information(data) do
  log_failure = fn failure ->
    Logger.error(
      "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
    )

    data
  end

  case Pleroma.Config.get([:instance, :external_user_synchronization]) do
    true ->
      case fetch_follow_information_for_user(data) do
        {:ok, info} -> Map.put(data, :info, Map.merge(data.info, info))
        failure -> log_failure.(failure)
      end

    false ->
      data

    unexpected ->
      log_failure.({:enabled, unexpected})
  end
end
1085
# Works out whether a collection is hidden, looking at its "first" page.
#
# Returns `{:ok, false}` when "first" is (or can be fetched as) a
# CollectionPage / OrderedCollectionPage, `{:ok, true}` when fetching it
# answers 401 or 403, and an `{:error, reason}` tuple for everything else.
defp collection_private(data) do
  case data["first"] do
    # The page is embedded in the collection itself.
    %{"type" => type} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    first ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _} = error ->
          error

        other ->
          {:error, other}
      end
  end
end
1106
# Runs a remote actor document through MRF and converts it into user
# attributes. Returns `{:ok, user_data}`, or `{:error, reason}` wrapping
# whatever step did not succeed.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1115
# Fetches the remote actor at `ap_id` and turns it into user attributes,
# including follow information when that synchronization is enabled.
#
# Returns `{:ok, user_data}` on success. Failures are logged and returned as
# an `{:error, reason}` tuple (existing error tuples are passed through).
# Previously the logger's own return value leaked out, so callers received
# `:ok` for a failed fetch.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data),
       data <- maybe_update_follow_information(data) do
    {:ok, data}
  else
    {:error, _} = e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      e

    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
1125
# Returns a user for `ap_id`: an already cached user is upgraded through the
# Transmogrifier, an unknown one is fetched and inserted. A failed fetch is
# wrapped as `{:error, reason}`.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
1137
# Resolves `nickname` through WebFinger and builds the user from the ap_id it
# reports. Without a non-nil "ap_id" in the answer the result is
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
1145
# filter out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1150
# do post-processing on a specific activity; currently this is only the
# broken-thread check
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1155
# Query for every Create activity with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1162 end