Fix tests
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
# Resolves the recipients of an activity map. Every clause returns `{recipients, to, cc}`,
# where `to`, `cc` and `bcc` are read from the activity with `[]` as the default.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users: an address that does not resolve to a cached user is kept as-is,
# an address that does is kept only if `User.following?/2` says so for the announcing actor.
# See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn ap_id ->
      known_user = User.get_cached_by_ap_id(ap_id)
      is_nil(known_user) or User.following?(known_user, announcer)
    end)

  {recipients, to, cc}
end

# Create: the actor is appended to the addressed parties and duplicates are dropped.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Any other activity: recipients are simply to ++ cc ++ bcc, in that order.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
68
# Tells whether an activity's actor may insert activities.
#
# Returns `true` when no actor is given (nothing to check) or when the actor resolves to a
# cached user whose `deactivated` field is exactly `false`. Returns `false` otherwise,
# including when the actor does not resolve to any cached user. The previous implementation
# bound the lookup result with a bare `user <-`, which always matches, so an unknown actor
# raised on `user.deactivated` instead of reaching the `false` fallback.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    # A map pattern also matches structs, so this covers the user struct.
    %{deactivated: false} -> true
    _unknown_or_deactivated -> false
  end
end
81
# Checks the embedded object's "content" against the configured `[:instance, :remote_limit]`.
# Activities without an embedded object, or whose content is nil, always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  not (String.length(content) > max_length)
end

defp check_remote_limit(_), do: true
88
# Bumps the actor's note count via `User.increase_note_count/1` when `is_public?/1` holds
# for `object`; otherwise returns `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if !is_public?(object) do
    {:ok, actor}
  else
    User.increase_note_count(actor)
  end
end
92
# Lowers the actor's note count via `User.decrease_note_count/1` when `is_public?/1` holds
# for `object`; otherwise returns `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if !is_public?(object) do
    {:ok, actor}
  else
    User.decrease_note_count(actor)
  end
end
96
# For a "Create" activity whose embedded object carries "inReplyTo", increases the replies
# count of the replied-to object when the embedded object is public (nil is returned when it
# is not). Any other input yields `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  cond do
    is_public?(reply) -> Object.increase_replies_count(parent_ap_id)
    true -> nil
  end
end

def increase_replies_count_if_reply(_create_data), do: :noop
107
# For an object whose data carries "inReplyTo", decreases the replies count of the replied-to
# object when the object data is public (nil is returned when it is not). Any other input
# yields `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  cond do
    is_public?(reply) -> Object.decrease_replies_count(parent_ap_id)
    true -> nil
  end
end

def decrease_replies_count_if_reply(_object), do: :noop
117
# A "Create" whose embedded object has both "inReplyTo" and "name" is treated as a poll vote:
# the vote count for option `name` on the replied-to object is increased. Anything else
# yields `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
126
# Inserts an activity built from `map`.
#
#   * `local`              - stored in the activity's `local` field
#   * `fake`               - when true nothing is persisted; an in-memory activity with the id
#                            "pleroma:fakeid" is returned instead
#   * `bypass_actor_check` - when truthy the actor-is-active check is skipped
#
# Returns `{:ok, activity}` on success or when `Activity.normalize/1` already finds an
# activity for `map`. Any other failed step is returned wrapped as `{:error, failed_value}`.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       # Fake inserts leave the happy path here and are finished in the `else` branch.
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> inserted
        _ -> Map.put(inserted, :object, object)
      end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(data["actor"])
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
182
# Creates or bumps the conversation for `activity` and, when `actor` resolves to a cached
# user, marks that user's participation as read. Returns `{:ok, conversation}`; if either
# lookup does not produce the expected shape, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    failure ->
      failure
  end
end
190
# Given `{:ok, conversation}`, force-reloads the conversation's participations and returns
# them. Any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
198
# Preloads `:user` on the given participations and pushes them to the "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
206
# Streams out the participations of the conversation identified by the object's "context",
# but only when `fetch_latest_activity_id_for_context/2` still finds an activity for `user`
# (queried with `user` as both "user" and "blocking_user"). Returns nil when it finds none,
# and whatever `Conversation.get_for_ap_id/1` returned when that is not a conversation.
# Objects without a "context" yield `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    not_a_conversation ->
      not_a_conversation
  end
end

def stream_out_participations(_, _), do: :noop
222
# Pushes Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`. Every other input yields `:noop`.
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w(Create Announce Delete) do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
233
# Builds and inserts a "Create" activity from `params` (`:to`, `:actor`, `:context` and
# `:object` are required; `:additional`, `:local` and `:published` are optional).
#
# After a successful insert the reply and poll-vote counters are updated, then the actor's
# note count, and finally the activity is handed to `maybe_federate/1`. Two situations stop
# right after their tagged step and still return `{:ok, activity}`: a `fake` insert, and
# running with `[:env]` configured as `:benchmark`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, message} ->
      {:error, message}
  end
end
265
# Builds a listen activity from `params` via `make_listen_data/2`, inserts it and hands it
# to `maybe_federate/1`. Returns `{:ok, activity}` or `{:error, message}`.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} -> {:error, message}
  end
end
285
# Inserts an "Accept" activity; see `accept_or_reject/2` for the expected params.
def accept(params), do: accept_or_reject("Accept", params)
289
# Inserts a "Reject" activity; see `accept_or_reject/2` for the expected params.
def reject(params), do: accept_or_reject("Reject", params)
293
# Inserts an activity of the given `type` addressed to `to`, from `actor.ap_id`, about
# `object`, then hands it to `maybe_federate/1`. `:local` defaults to true; an optional
# `:activity_id` is passed to `Utils.maybe_put/3` as the "id". A step that does not return
# the expected shape is returned unchanged.
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
306
# Inserts an "Update" activity for `object` with the given addressing and hands it to
# `maybe_federate/1`. `:local` counts as true unless it is exactly `false`; an optional
# `:activity_id` is passed to `Utils.maybe_put/3` as the "id".
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
# Likes `object` on behalf of `user`.
#
# When the user already has a like for the object, that existing activity is returned together
# with the object as passed in. Otherwise a like activity is inserted, added to the object and
# handed to `maybe_federate/1`; the result is `{:ok, activity, updated_object}`. Any other
# failed step is wrapped as `{:error, failed_value}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    error ->
      {:error, error}
  end
end
343
# Undoes the actor's like of `object`: inserts the unlike activity, deletes the like activity,
# removes the like from the object and hands the unlike to `maybe_federate/1`.
# Returns `{:ok, unlike_activity, like_activity, updated_object}`. When there is no existing
# like, or any step fails, `{:ok, object}` with the object as passed in is returned.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, unliked_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, unliked_object}
  else
    _failure -> {:ok, object}
  end
end
356
# Announces `object` on behalf of `user`, provided `is_announceable?/3` allows it.
# The announce activity is inserted, added to the object and handed to `maybe_federate/1`.
# Returns `{:ok, activity, updated_object}`; any failed step is wrapped as
# `{:error, failed_value}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
374
# Undoes the actor's announce of `object`: inserts the unannounce activity, hands it to
# `maybe_federate/1`, deletes the announce activity and removes the announce from the object.
# Returns `{:ok, unannounce_activity, updated_object}`. When there is no existing announce, or
# any step fails, `{:ok, object}` with the object as passed in is returned.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
392
# Inserts a follow activity from `follower` to `followed`, hands it to `maybe_federate/1` and
# stores the activity's "state" in the follow-state cache. Returns `{:ok, activity}`; a step
# that does not return the expected shape is returned unchanged.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
401
# Cancels the latest follow from `follower` to `followed`: sets that follow activity's state
# to "cancelled", inserts the unfollow activity and hands it to `maybe_federate/1`.
# Returns `{:ok, activity}`; a step that does not return the expected shape (including a
# missing follow activity) is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, unfollow_activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(unfollow_activity) do
    {:ok, unfollow_activity}
  end
end
411
# Announces the deletion of a user: a "Delete" activity for the user's "Person" object,
# addressed to the user's follower address, is passed to `insert/4` with `fake` and
# `bypass_actor_check` both true and then handed to `maybe_federate/1`.
# Returns `{:ok, user}`; a failed step is returned unchanged.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
424
# Deletes `object` and inserts the matching "Delete" activity.
#
# Options:
#   * `:local`       - passed to `insert/3` (default: true)
#   * `:activity_id` - optional "id" for the Delete activity
#   * `:actor`       - overrides the actor taken from the object's data
#
# The Delete is addressed to the object's "to" and "cc". After the insert, participations are
# streamed out, the replies and note counters are decreased and the activity is handed to
# `maybe_federate/1`. Returns `{:ok, activity}`; a failed step is returned unchanged.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data =
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => addressed,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, delete_activity} <- insert(data, local, false),
       _ = stream_out_participations(deleted_object, user),
       _ = decrease_replies_count_if_reply(deleted_object),
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
451
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow from blocker to blocked exists,
# the blocker unfollows first. A block activity is only inserted and federated when
# `[:activitypub, :outgoing_blocks]` is `true`; in every other case, including a failed step,
# `{:ok, nil}` is returned.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _failure -> {:ok, nil}
  end
end
471
# Undoes the latest block from `blocker` to `blocked`: inserts the unblock activity and hands
# it to `maybe_federate/1`. Returns `{:ok, activity}`; a step that does not return the expected
# shape (including a missing block activity) is returned unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, unblock_activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(unblock_activity) do
    {:ok, unblock_activity}
  end
end
480
# Inserts a flag (report) activity built by `make_flag_data/2`.
#
# "to" is always empty; "cc" holds the reported account's ap_id unless `:forward` is exactly
# `false`, in which case it is empty too. Once the activity is inserted and handed to
# `maybe_federate/1`, a report email is queued for every superuser.
# Returns `{:ok, activity}`; a failed step is returned unchanged.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
516
# Inserts a "Move" activity from `origin` to `target`, hands it to `maybe_federate/1` and
# enqueues the "move_following" background job. The move is refused with an error tuple unless
# the origin's ap_id is listed in the target's `also_known_as`. An insert failure is returned
# unchanged.
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(params, local) do
      {:ok, activity} ->
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
540
# Builds the query for "Create" activities in `context`, newest first.
#
# Recipients are the public address plus, when `opts["user"]` is set, that user's ap_id and
# everything returned by `User.following/1` for them. The usual preload, blocking, poll-vote
# and `exclude_id` handling is driven by `opts`.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  reader = opts["user"]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  base =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, reader)

  base
  |> where(
    [a],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      a.data,
      "Create",
      a.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
569
# Runs the context query for `context` and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
576
# Returns the id of the first activity produced by the context query (which orders by id,
# descending), or nil when there is none. "skip_preload" defaults to true here unless `opts`
# sets it explicitly.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
586
# Fetches a page of activities addressed to the public address, with unlisted ones filtered
# out by `restrict_unlisted/1`. Any "user" entry in `opts` is dropped first. The paginated
# result is reversed before being returned.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")

  query =
    [Pleroma.Constants.as_public()]
    |> fetch_activities_query(opts)
    |> restrict_unlisted()

  query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
596
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(actor, recipients, data)` equals
# the requested `:visibility` (a single value) or is one of them (a list).
#
# An invalid visibility is logged and the query is returned unchanged, matching what
# `exclude_visibility/2` does. Previously these paths returned the result of `Logger.error/1`,
# which is not a queryable and broke the pipeline in `fetch_activities_query/2`. The value is
# logged through `inspect/1` so that an arbitrary term cannot make the interpolation raise.
#
# Options without a `:visibility` key leave the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
636
# Removes from `query` the activities whose `activity_visibility(actor, recipients, data)`
# equals the "exclude_visibilities" option (single value) or is one of them (list). Invalid
# values are logged and the query is returned unchanged; options without the key are a no-op.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
679
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the requesting user.
# The check is skipped when thread containment is disabled in the passed config, when the
# user has `skip_thread_containment` set, or when no user is present in the options.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
698
# Fetches activities authored by `user` as seen by `reading_user`, without restricting the
# activity type. Forces "user", "actor_id" and "whole_db" in `params`, and reverses the list
# returned by `fetch_activities/2`.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
715
# Fetches the Create and Announce activities authored by `user` as seen by `reading_user`.
# Forces "type", "user", "actor_id", "whole_db" and "pinned_activity_ids" in `params`, and
# reverses the list returned by `fetch_activities/2`.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
734
# Recipient list used when listing a user's activities: empty in "godmode" (which makes
# `restrict_recipients/3` a no-op), otherwise the public address followed, when a reading user
# is given, by their ap_id and everything returned by `User.following/1` for them.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
746
# Keeps only activities with an id greater than "since_id". An empty string or a missing key
# leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
754
# Drops activities whose object carries any of the "tag_reject" tags. Needs the joined object,
# so combining it with "skip_preload" raises. Only a non-empty list is acted upon.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
768
# Keeps only activities whose object carries every one of the "tag_all" tags. Needs the joined
# object, so combining it with "skip_preload" raises. Only a non-empty list is acted upon.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
782
# Keeps only activities whose object carries the given "tag": any of them for a list, exactly
# that one for a string. Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tags)
  )
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, _opts), do: query
802
# Keeps activities whose recipients overlap with `recipients`. With a user, activities whose
# actor is that user are added through an `or_where`. An empty recipient list disables the
# restriction altogether.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
816
# With "local_only" set to true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
822
# With an "actor_id" option, keeps only activities whose actor equals it.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
828
# Keeps only activities of the given "type": an exact match for a string, membership via
# `ANY` for any other value.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
838
# With a "state" option, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
844
# With a "favorited_by" ap_id, keeps only activities whose object lists that ap_id in "likes".
# Reads the second positional binding of the query as the object.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
853
# With "only_media" set to "true" or "1", keeps only activities whose object "attachment" is
# not equal to the empty list. Needs the joined object, so combining it with "skip_preload"
# raises.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
866
# With "exclude_replies" set to "true" or "1", keeps only activities whose object has no
# "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, _opts), do: query
875
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
881
# Hides activities from, or addressed ("to") to, anyone in the muting user's `mutes`.
# Unless "skip_preload" is set, rows where the `thread_mute` binding has a user_id are dropped
# as well. A truthy "with_muted" (true, "true" or "1") disables all of this.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  mutes = user.mutes

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts["skip_preload"] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
901
# Hides activities the blocking user should not see: those authored by or addressed to a
# blocked ap_id, Announces whose "to" contains a blocked ap_id, and those whose actor (or whose
# object's actor) has a host part listed in the user's domain blocks. The object is joined
# when the query does not have an `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user}) do
  blocks = user.blocks || []
  domain_blocks = user.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
926
# Drops activities that carry the public address in their "cc".
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Pleroma.Constants.as_public()]
    )
  )
end
938
# With "pinned" set to "true", keeps only activities whose id is in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
944
# Drops Announce activities whose actor is in the muting user's `muted_reblogs`
# (treated as empty when unset).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user}) do
  muted_reblogs = user.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
961
# Drops activities whose object is of type "Answer", unless "include_poll_votes" is true.
# The filter is only applied when the query has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
973
# With a binary "exclude_id", drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
979
# Adds the object preload to the query unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
986
# Adds the bookmark preload for `opts["user"]` to the query unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
993
# Sets the thread-muted field for the muting user (falling back to `opts["user"]`) unless
# "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1000
# Orders by id according to the `:order` option (`:desc` or `:asc`); otherwise a no-op.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _opts), do: query
1012
# Builds the activity query for `recipients`, applying every restriction driven by `opts`.
# The order of the stages below is kept exactly as it was; `restrict_recipients/3` may add an
# `or_where`, so reordering the filters could change the resulting query.
def fetch_activities_query(recipients, opts \\ %{}) do
  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Preloads and ordering first.
  base =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # Then the filters.
  base
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1046
# Fetches a page of activities for `recipients` plus the list memberships of `opts["user"]`.
# The paginated result is reversed and then passed through `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  reader = opts["user"]
  list_memberships = Pleroma.List.memberships(reader)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, reader)
end
1055
# For a user with list memberships, prepends the user's ap_id to the "cc" of every activity
# whose non-empty "bcc" contains one of those memberships. All other activities, and all calls
# without a user or without memberships, pass through unchanged.
#
# The existing "cc" goes through `List.wrap/1`: with the key missing `update_in` hands the
# callback `nil`, and the previous `[user_ap_id | nil]` built an improper list. A "cc" that is
# a single non-list value is wrapped into a one-element list for the same reason.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1072
# Narrows `query` to activities whose `recipients` column either
#
#   * overlaps `recipients`, or
#   * overlaps `recipients_with_public` AND also contains the public
#     address (`Pleroma.Constants.as_public()`).
#
# NOTE(review): `&&` inside the fragments is presumably the PostgreSQL
# array-overlap operator; confirm against the column type.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1081
# Fetches one page of activities using the bounded recipient rules of
# `fetch_activities_bounded_query/3`.
#
# The base query is built with an empty recipient list, so the recipient
# filtering is supplied by the bounded query. The paginated result is returned
# reversed.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1093
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# resulting data.
#
# When `opts[:actor]` is truthy it is recorded under the "actor" key of the
# object data. Any result of `Upload.store/2` other than `{:ok, data}` is
# returned to the caller unchanged.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      object_data = if actor, do: Map.put(data, "actor", actor), else: data

      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1106
# Converts a remote actor object (a string-keyed map) into the attribute map
# used to create or update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner, profile fields and `locked` are read from the object as
# received; the remaining attributes are read after the object has been passed
# through `Transmogrifier.maybe_fix_user_object/1`.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # "attachment" comes from a remote server, so it may be absent, null, a
  # single object, or a list holding entries without a "type". `List.wrap/1`
  # normalises the container and `match?/2` skips anything that is not a
  # PropertyValue map instead of raising.
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  locked = data["manuallyApprovesFollowers"] || false

  # From here on `data` is the fixed-up object.
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if username = data["preferredUsername"] do
      "#{username}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    ap_enabled: true,
    source_data: data,
    banner: banner,
    fields: fields,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    also_known_as: Map.get(data, "alsoKnownAs", []),
    nickname: nickname
  }

  {:ok, user_data}
end
1164
# Fetches the remote "following" and "followers" collections of `user` (in
# that order) and reports their sizes and whether each one is hidden.
#
# Returns `{:ok, map}` with the keys `:hide_follows`, `:follower_count`,
# `:following_count` and `:hide_followers`. An `{:error, _}` produced by any
# step is returned as is; any other unexpected value is wrapped in
# `{:error, value}`.
def fetch_follow_information_for_user(user) do
  with {:collection_info, following_count, hide_follows} <-
         follow_collection_count_and_privacy(user.following_address),
       {:collection_info, followers_count, hide_followers} <-
         follow_collection_count_and_privacy(user.follower_address) do
    {:ok,
     %{
       hide_follows: hide_follows,
       follower_count: followers_count,
       following_count: following_count,
       hide_followers: hide_followers
     }}
  else
    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

# Fetches one collection and extracts its integer "totalItems" together with
# the result of `collection_private/1`. The first value that does not have the
# expected shape is returned unchanged so the caller can classify it.
defp follow_collection_count_and_privacy(address) do
  with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
       count when is_integer(count) <- collection["totalItems"],
       {:ok, hidden} <- collection_private(collection) do
    {:collection_info, count, hidden}
  end
end
1189
# Merges freshly fetched follow information into `data[:info]` when the
# `[:instance, :external_user_synchronization]` setting is `true`.
#
# When the setting is `false`, `data` is returned untouched. Any other outcome
# (a setting that is neither `true` nor `false`, or a failed fetch) is logged
# as an error and `data` is still returned, so callers always get the map back.
defp maybe_update_follow_information(data) do
  outcome =
    case Config.get([:instance, :external_user_synchronization]) do
      true -> fetch_follow_information_for_user(data)
      setting -> {:enabled, setting}
    end

  case outcome do
    {:ok, info} ->
      Map.put(data, :info, Map.merge(data[:info] || %{}, info))

    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1208
# Decides whether a collection is private, based on its "first" page.
#
#   * "first" embedded as a (Ordered)CollectionPage map -> `{:ok, false}`
#   * otherwise "first" is fetched:
#       - it resolves to a (Ordered)CollectionPage     -> `{:ok, false}`
#       - the fetch fails with HTTP status 401 or 403   -> `{:ok, true}`
#       - any other `{:error, _}` is returned as is
#       - anything else is wrapped in `{:error, value}`
defp collection_private(data) do
  case data["first"] do
    %{"type" => type} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    first ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _} = error ->
          error

        unexpected ->
          {:error, unexpected}
      end
  end
end
1229
# Runs a remote actor object through `MRF.filter/1` and converts the filtered
# object into user attributes.
#
# Returns `{:ok, user_data}` on success. Whatever value a failing step
# produced is wrapped in `{:error, value}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = success <- object_to_user_data(filtered) do
    success
  else
    failure -> {:error, failure}
  end
end
1238
# Fetches the actor object at `ap_id`, converts it into user attributes and
# then passes those through `maybe_update_follow_information/1`.
#
# Returns `{:ok, user_data}`. If the fetch or the conversion fails, the
# failure is logged and returned wrapped in `{:error, failure}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1250
# Returns a user for `ap_id`.
#
# If the cache lookup finds a user, the work is handed to
# `Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched
# and prepared, then passed to `User.insert_or_update_user/1`; a failed fetch
# is returned wrapped in `{:error, failure}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} -> User.insert_or_update_user(data)
        failure -> {:error, failure}
      end

    _cached_user ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1262
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# reports. Any WebFinger result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1270
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1275
# Post-processing hook for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1280
# Builds the query for "Create" activities with "direct" visibility, ordered
# by ascending activity id. The query is built here, not executed.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1287 end