8b5bd83e27a35625cf2f320310fe2d42c8ecefee
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
33 # For Announce activities, we filter the recipients based on following status for any actors
34 # that match actual users. See issue #164 for more information about why this is necessary.
# Announce: keeps every addressed ap_id that does not resolve to a cached user,
# and keeps resolved users only when `User.following?/2` says they follow the
# announcing actor. Returns `{recipients, to, cc}`.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  deliverable? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(deliverable?)

  {recipients, to, cc}
end

# Create: the recipients are to + cc + bcc plus the actor itself, de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Any other activity type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
68
# Returns `true` when no actor is given, or when the actor resolves to a cached
# user whose `deactivated` flag is exactly `false`; returns `false` otherwise.
#
# An actor that does not resolve to a user yields `false` here. The previous
# implementation read `user.deactivated` on `nil` in that case, which raised
# instead of producing a boolean.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _ -> false
  end
end
81
# True when the embedded object's "content" is no longer (per `String.length/1`)
# than the `[:instance, :remote_limit]` setting. Activities without non-nil
# embedded content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  String.length(content) <= Config.get([:instance, :remote_limit])
end

defp check_remote_limit(_) do
  true
end
88
# Calls `User.increase_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
92
# Calls `User.decrease_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
96
# For a "Create" whose embedded object has an "inReplyTo" key: bumps the replies
# counter of the referenced object when the new object is public (returns nil
# when it is not public). Everything else is a `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data) do
  :noop
end
107
# For an object whose data has an "inReplyTo" key: decrements the replies
# counter of the referenced object when the object is public (returns nil when
# it is not public). Everything else is a `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = object_data}) do
  if is_public?(object_data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object) do
  :noop
end
117
# A "Create" whose embedded object carries both "inReplyTo" and "name" is
# counted via `Object.increase_vote_count/2` against the referenced object,
# using "name" as the option. Everything else is a `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data) do
  :noop
end
126
# Validates and persists an activity map.
#
# Steps, in order: bail out with the existing activity if `Activity.normalize/1`
# finds one; fill in defaults; check the actor is active (unless
# `bypass_actor_check`); enforce the remote content limit; run MRF; compute
# recipients; stop early for fake inserts; run the containment check; insert the
# child object; insert the activity row; then enqueue background work, create
# notifications, bump the conversation and stream everything out.
#
# Returns `{:ok, activity}` on success (also for an already existing or a fake
# activity); any other failed step is returned wrapped as `{:error, reason}`.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _, _} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: stored,
        local: local,
        actor: stored["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(stored["actor"])
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      # Fake inserts never touch the database; they get a placeholder id.
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
182
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; if either lookup does not produce
# the expected shape, that non-matching value is returned as-is.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
190
# Force-reloads and returns the participations of an `{:ok, conversation}`
# result; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_) do
  []
end
198
# Preloads the `:user` of each participation and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
206
# Looks up the conversation for the object's "context" and, if a latest
# activity id is still found for `user` in that context (queried with `user` as
# both "user" and "blocking_user"), streams the conversation's participations.
# Objects without a "context" are a `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _) do
  :noop
end
222
# Streams Create / Announce / Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; all other activities are a `:noop`.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
233
# Builds and inserts a "Create" activity, then updates reply / poll / note
# counters and federates it.
#
# `params[:local]` counts as local unless it is exactly `false`. Fake inserts
# and the `:benchmark` environment return right after the respective step
# without running the remaining ones. Returns `{:ok, activity}` or
# `{:error, message}`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, quick_activity} -> {:ok, quick_activity}
    {:fake, true, fake_activity} -> {:ok, fake_activity}
    {:error, message} -> {:error, message}
  end
end
265
# Builds listen data with `make_listen_data/2`, inserts it and federates it.
# `params[:local]` counts as local unless it is exactly `false`.
# Returns `{:ok, activity}` or `{:error, message}`.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} -> {:error, message}
  end
end
285
# Inserts and federates an "Accept" activity; see `accept_or_reject/2`.
def accept(params), do: accept_or_reject("Accept", params)
289
# Inserts and federates a "Reject" activity; see `accept_or_reject/2`.
def reject(params), do: accept_or_reject("Reject", params)
293
# Inserts and federates an activity of the given `type` addressed to `to`, with
# `actor.ap_id` as the actor. `:local` defaults to true; an optional
# `:activity_id` is passed to `Utils.maybe_put/3` as the "id".
# Returns `{:ok, activity}` or the first non-matching step result.
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
306
# Inserts and federates an "Update" activity. `params[:local]` counts as local
# unless it is exactly `false`; an optional `:activity_id` is passed to
# `Utils.maybe_put/3` as the "id".
# Returns `{:ok, activity}` or the first non-matching step result.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object}
    |> Utils.maybe_put("id", params[:activity_id])

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
325 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
# Likes `object` as `user` unless a like already exists.
#
# Returns `{:ok, activity, object}`: with the new activity and the updated
# object on success, or with the existing like activity and the untouched
# object when one was found. Any other failure becomes `{:error, reason}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- insert(make_like_data(user, object, activity_id), local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    error ->
      {:error, error}
  end
end
343
# Undoes an existing like: inserts the unlike activity, deletes the like
# activity, removes the like from the object and federates.
#
# Returns `{:ok, unlike_activity, like_activity, object}` on success. If any
# step does not match (including "no existing like"), returns `{:ok, object}`
# with the object as passed in.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
356
# Announces `object` as `user` when `is_announceable?/3` allows it: inserts the
# announce activity, records it on the object and federates.
# Returns `{:ok, activity, object}`; any failed step becomes `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    error -> {:error, error}
  end
end
374
# Undoes an existing announce: inserts and federates the unannounce activity,
# deletes the announce activity and removes it from the object.
#
# Returns `{:ok, unannounce_activity, object}` on success. If any step does not
# match (including "no existing announce"), returns `{:ok, object}` with the
# object as passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
392
# Inserts and federates a follow activity, then stores the activity's "state"
# through `User.set_follow_state_cache/3`.
# Returns `{:ok, activity}` or the first non-matching step result.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
401
# Marks the latest follow activity between the two users as "cancelled", then
# inserts and federates the unfollow activity.
# Returns `{:ok, activity}` or the first non-matching step result (e.g. nil
# when no follow activity is found).
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
411
# Announces the deletion of a user: builds a "Delete" activity for the user's
# own "Person" object, addressed to the follower collection. It is inserted as
# a local, fake activity with the actor check bypassed, then federated.
# Returns `{:ok, user}` or the first non-matching step result.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
424
# Deletes an object and publishes the matching "Delete" activity.
#
# Options: `:local` (default true), `:activity_id` (optional id for the Delete
# activity) and `:actor` (defaults to the object's own actor).
#
# The Delete activity is addressed to the object's "to" + "cc" and records the
# id of the activity returned by `Object.delete/1` (if any) under
# "deleted_activity_id". After insertion the participations are streamed out,
# the reply / note counters are decreased and the activity is federated.
# Returns `{:ok, activity}` or the first non-matching step result.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data =
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
451
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow activity exists,
# the blocker first unfollows the blocked user. A block activity is only
# inserted and federated when `[:activitypub, :outgoing_blocks]` is exactly
# `true`; in every other case (including failures) the result is `{:ok, nil}`.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _ -> {:ok, nil}
  end
end
471
# Inserts and federates an unblock activity for the latest block activity
# between the two users.
# Returns `{:ok, activity}` or the first non-matching step result (e.g. nil
# when no block activity is found).
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
480
# Files a report (flag) against `account`.
#
# `:local` and `:forward` count as true unless they are exactly `false`. The
# reported account's ap_id is put in "cc" only when forwarding. The inserted
# activity is stripped with `strip_report_status_data/1` before federation, and
# every superuser gets a report email. Returns `{:ok, activity}` (the
# unstripped activity) or the first non-matching step result.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward? = params[:forward] != false

  base_additional = params[:additional] || %{}
  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(base_additional, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       :ok <- maybe_federate(stripped_activity) do
    Enum.each(User.all_superusers(), fn superuser ->
      email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(email)
    end)

    {:ok, activity}
  end
end
517
# Builds the query for "Create" activities in `context`, newest first.
#
# The recipients are the public address plus, when `opts["user"]` is set, that
# user's ap_id and everything returned by `User.following/1`. The remaining
# filters (blocks, poll votes, excluded id, preloads) are driven by `opts`.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]

  recipients =
    case opts["user"] do
      no_user when no_user in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts["user"])

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
546
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
553
# Returns the id of the first row of the context query (ordered by descending
# id), or nil if there is none. Preloading is skipped by default, but a
# "skip_preload" key in `opts` takes precedence.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
563
# Fetches a page of activities addressed to the public address, with "user"
# removed from `opts` and `restrict_unlisted/1` applied.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public_recipients = [Pleroma.Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
end
572
573 @valid_visibilities ~w[direct unlisted public private]
574
# Restricts `query` to activities whose `activity_visibility(...)` matches the
# `:visibility` option, which may be a single visibility or a list of them.
#
# Every clause returns a query. An unknown visibility is logged and yields a
# query that matches nothing (a requested restriction that cannot be applied
# must not widen the result set). Previously these branches returned the value
# of `Logger.error/1`, which broke the pipeline this function is used in.
# The value is logged with `inspect/1` so arbitrary terms cannot make the log
# call itself raise.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    where(query, false)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Any other `:visibility` value is invalid at this point.
defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  where(query, false)
end

defp restrict_visibility(query, _visibility), do: query
612
# Removes activities whose `activity_visibility(...)` matches the
# "exclude_visibilities" option (a single visibility or a list of them).
# Unknown visibilities are logged and the query is returned untouched.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility) do
  query
end
655
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the reading
# user. The check is skipped when the config map or the user has
# `skip_thread_containment: true`, or when there is no user in the options.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}) do
  query
end

defp restrict_thread_visibility(
       query,
       %{"user" => %User{skip_thread_containment: true}},
       _config
     ) do
  query
end

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config) do
  query
end
674
# Fetches activities (of any type) authored by `user`, as seen by
# `reading_user`, and reverses the list returned by `fetch_activities/2`.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
691
# Fetches the Create / Announce activities authored by `user`, as seen by
# `reading_user`, and reverses the list returned by `fetch_activities/2`.
# The user's pinned activity ids are passed along for the "pinned" filter.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
710
# Fetches public Create / Announce activities for `params["instance"]` using
# offset pagination, and reverses the list returned by `fetch_activities/3`.
def fetch_instance_activities(params) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "instance" => params["instance"],
      "whole_db" => true
    })

  [Pleroma.Constants.as_public()]
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
721
# Recipient list used when listing a user's activities: empty in "godmode"
# (so no recipient restriction is applied), otherwise the public address plus,
# for a logged-in reader, their own ap_id and `User.following/1`.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  case reading_user do
    no_reader when no_reader in [nil, false] ->
      [Pleroma.Constants.as_public()]

    reader ->
      [Pleroma.Constants.as_public(), reader.ap_id | User.following(reader)]
  end
end
733
# Keeps only activities with an id greater than "since_id"; an empty string or
# a missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}) do
  query
end

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _) do
  query
end
741
# Drops activities whose object carries any of the "tag_reject" tags.
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _) do
  query
end
755
# Keeps activities whose object carries all of the "tag_all" tags.
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _) do
  query
end
769
# Keeps activities whose object carries the given "tag": any of them for a
# list, exactly that one for a binary.
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _) do
  query
end
789
# Keeps activities whose recipients overlap `recipients`. With a user, the
# user's own activities are additionally admitted through an `or_where`.
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user) do
  query
end

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
803
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _) do
  query
end
809
# Keeps only activities whose actor equals "actor_id", when that option is set.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _) do
  query
end
815
# Filters on the activity "type": equality for a binary, `ANY(...)` membership
# for anything else.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _) do
  query
end
825
# Keeps only activities whose data "state" equals the "state" option.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _) do
  query
end
831
# Keeps only activities whose object lists the given ap_id in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _) do
  query
end
840
# With "only_media" set to "true" or "1", keeps only activities whose object
# "attachment" differs from the empty list.
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _) do
  query
end
853
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _) do
  query
end
862
# With "exclude_reblogs" set to "true" or "1", drops "Announce" activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _) do
  query
end
868
# Hides activities authored by, or addressed ("to") to, anyone in the muting
# user's `mutes`. Unless "skip_preload" is set, rows with a thread-mute entry
# (`thread_mute.user_id` not null) are hidden as well.
#
# "with_muted" set to true / "true" / "1" disables the filter entirely.
#
# `mutes` falls back to an empty list when nil, matching how `blocks`,
# `domain_blocks` and `muted_reblogs` are handled by the sibling filters, so a
# nil value is never bound as the array parameter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  mutes = user.mutes || []

  query =
    from([activity] in query,
      where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
      where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
    )

  if opts["skip_preload"] do
    # Without preloading there is no thread_mute binding to filter on.
    query
  else
    from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
888
# Hides activities related to the blocking user's blocks:
#   * authored by a blocked actor,
#   * with a blocked actor among the recipients,
#   * "Announce" activities addressed ("to") to a blocked actor,
#   * whose actor, or whose object's actor, has a host part (third "/"-separated
#     segment of the ap_id) in the blocked domains.
# Joins the object first if the query has no `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user}) do
  blocks = user.blocks || []
  domain_blocks = user.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _) do
  query
end
913
# Drops activities that carry the public address in their "cc".
defp restrict_unlisted(query) do
  public = [Pleroma.Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public
    )
  )
end
925
# With "pinned" set to "true", keeps only the activities whose id is listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _) do
  query
end
931
# Drops "Announce" activities whose actor is in the muting user's
# `muted_reblogs` (treated as empty when nil).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user}) do
  muted_reblogs = user.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _) do
  query
end
948
# Keeps only activities whose actor is a user with a nickname ending in
# "@<instance>". The matching ap_ids are loaded with a separate query first.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  instance_actors =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_actors)
end

defp restrict_instance(query, _) do
  query
end
962
# Drops activities whose object type is "Answer", unless "include_poll_votes"
# is true. Only applied when the query already has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}) do
  query
end

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
974
# Drops the activity with the given binary "exclude_id", when that option is set.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _) do
  query
end
980
# Adds the object preload unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}) do
  query
end

defp maybe_preload_objects(query, _) do
  Activity.with_preloaded_object(query)
end
987
# Adds the bookmark preload for `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}) do
  query
end

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
994
# Sets the thread-muted field for the muting user (falling back to
# `opts["user"]`) unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}) do
  query
end

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["muting_user"] || opts["user"])
end
1001
# Orders by id when an explicit `:order` of `:desc` or `:asc` is given;
# otherwise leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1013
# Builds the main activity listing query for `recipients`, applying every
# filter selected through `opts`. Thread containment is additionally governed
# by the `[:instance, :skip_thread_containment]` setting.
def fetch_activities_query(recipients, opts \\ %{}) do
  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Preloads and ordering.
  base_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # Filters; the order below is kept as-is.
  base_query
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1048
# Fetches a page of activities for `recipients` plus the list memberships of
# `opts["user"]`, reverses the page and lets `maybe_update_cc/3` adjust "cc"
# for list-addressed activities.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  list_memberships = Pleroma.List.memberships(opts["user"])

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, opts["user"])
end
1057
# For a user with at least one list membership: every activity whose non-empty
# "bcc" contains one of those memberships gets the user's ap_id prepended to
# its "cc". All other activities, and all calls without memberships or user,
# pass through unchanged.
#
# "cc" is normalised with `List.wrap/1` before prepending. Without it, an
# activity that has no "cc" key ended up with the improper list
# `[user_ap_id | nil]`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1074
# Narrows `query` to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` while also containing the
# public address.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1083
# Fetches one page of activities using the bounded recipient condition from
# `fetch_activities_bounded_query/3`. The base query is built with an empty
# recipient list, and the paginated result is reversed before it is returned.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1095
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# resulting data. When `opts[:actor]` is truthy it is recorded under the
# "actor" key of the object data. A non-`{:ok, _}` result from the store step
# is returned as-is.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    obj_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: obj_data})
  end
end
1108
# Converts a fetched actor object (string-keyed map) into the attribute map
# used to create or update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner, profile fields and the locked flag are read from the object
# as received; every other attribute is read after the object has gone through
# `Transmogrifier.maybe_fix_user_object/1`.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # "attachment" may be missing, null, a single object or a list, and its
  # entries are not guaranteed to carry a "type". Wrap it into a list and keep
  # only PropertyValue maps, rather than raising on anything else.
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    ap_enabled: true,
    source_data: data,
    banner: banner,
    fields: fields,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
1165
# Fetches the following and follower collections of `user` (in that order) and
# returns `{:ok, map}` with both counts and both "hide" flags.
#
# Each collection must be fetchable, expose an integer "totalItems" and pass
# `collection_private/1`. The first step that fails ends the chain: an
# `{:error, _}` tuple is returned unchanged, any other value is wrapped as
# `{:error, value}`.
def fetch_follow_information_for_user(user) do
  # Fetches a single collection and reduces it to its count and privacy flag;
  # a failing step's value is passed through untouched.
  summarize = fn address ->
    with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
         count when is_integer(count) <- collection["totalItems"],
         {:ok, hidden} <- collection_private(collection) do
      {:ok, count, hidden}
    end
  end

  with {:ok, following_count, hide_follows} <- summarize.(user.following_address),
       {:ok, followers_count, hide_followers} <- summarize.(user.follower_address) do
    info = %{
      hide_follows: hide_follows,
      follower_count: followers_count,
      following_count: following_count,
      hide_followers: hide_followers
    }

    {:ok, info}
  else
    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end
1190
# When `[:instance, :external_user_synchronization]` is `true`, fetches the
# follow information for `data` and merges it into `data[:info]`.
#
# Returns `data` unchanged when the setting is `false`. Any other outcome
# (including a failed fetch) is logged as an error and `data` is returned
# unchanged as well.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    existing_info = data[:info] || %{}
    Map.put(data, :info, Map.merge(existing_info, follow_info))
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1209
# Decides whether a collection's contents are hidden.
#
# Returns `{:ok, false}` when the collection's "first" entry is an embedded
# page, or when fetching "first" yields a page. Returns `{:ok, true}` when the
# fetch fails with HTTP status 401 or 403. Other `{:error, _}` results are
# returned unchanged, and anything else is wrapped as `{:error, value}`.
defp collection_private(data) do
  case data["first"] do
    %{"type" => type} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    first ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _} = error ->
          error

        other ->
          {:error, other}
      end
  end
end
1230
# Runs the actor object through `MRF.filter/1` and converts the filtered
# object into user data. Returns `{:ok, user_data}`; whatever value fails to
# match along the way is wrapped as `{:error, value}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = result <- object_to_user_data(filtered) do
    result
  else
    failure -> {:error, failure}
  end
end
1239
# Fetches the remote object at `ap_id`, converts it into user data and passes
# that through `maybe_update_follow_information/1`.
#
# Returns `{:ok, user_data}`. If fetching or converting fails, the failing
# value is logged and returned wrapped as `{:error, value}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1251
# Produces a user for `ap_id`.
#
# If a user with that AP id is already cached, it is upgraded through
# `Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the user data is
# fetched and handed to `User.insert_or_update_user/1`; a failed fetch is
# returned wrapped as `{:error, value}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} -> User.insert_or_update_user(data)
        failure -> {:error, failure}
      end

    _existing ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1263
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# reports. Any WebFinger result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1271
# Filter out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for this activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1276
# Post-processing for a specific activity; currently this only delegates to
# `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1281
# Builds a query for "Create" activities restricted to "direct" visibility,
# ordered by ascending activity id.
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{"type" => "Create"})
  direct = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct, [activity], asc: activity.id)
end
1288 end