ActivityPub: For user timelines, respects blocks.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
33 # For Announce activities, we filter the recipients based on following status for any actors
34 # that match actual users. See issue #164 for more information about why this is necessary.
35 defp get_recipients(%{"type" => "Announce"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = User.get_cached_by_ap_id(data["actor"])
40
41 recipients =
42 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
43 case User.get_cached_by_ap_id(recipient) do
44 nil -> true
45 user -> User.following?(user, actor)
46 end
47 end)
48
49 {recipients, to, cc}
50 end
51
52 defp get_recipients(%{"type" => "Create"} = data) do
53 to = Map.get(data, "to", [])
54 cc = Map.get(data, "cc", [])
55 bcc = Map.get(data, "bcc", [])
56 actor = Map.get(data, "actor", [])
57 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
58 {recipients, to, cc}
59 end
60
61 defp get_recipients(data) do
62 to = Map.get(data, "to", [])
63 cc = Map.get(data, "cc", [])
64 bcc = Map.get(data, "bcc", [])
65 recipients = Enum.concat([to, cc, bcc])
66 {recipients, to, cc}
67 end
68
69 defp check_actor_is_active(actor) do
70 if not is_nil(actor) do
71 with user <- User.get_cached_by_ap_id(actor),
72 false <- user.deactivated do
73 true
74 else
75 _e -> false
76 end
77 else
78 true
79 end
80 end
81
82 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
83 limit = Config.get([:instance, :remote_limit])
84 String.length(content) <= limit
85 end
86
87 defp check_remote_limit(_), do: true
88
89 def increase_note_count_if_public(actor, object) do
90 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
91 end
92
93 def decrease_note_count_if_public(actor, object) do
94 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
95 end
96
97 def increase_replies_count_if_reply(%{
98 "object" => %{"inReplyTo" => reply_ap_id} = object,
99 "type" => "Create"
100 }) do
101 if is_public?(object) do
102 Object.increase_replies_count(reply_ap_id)
103 end
104 end
105
106 def increase_replies_count_if_reply(_create_data), do: :noop
107
108 def decrease_replies_count_if_reply(%Object{
109 data: %{"inReplyTo" => reply_ap_id} = object
110 }) do
111 if is_public?(object) do
112 Object.decrease_replies_count(reply_ap_id)
113 end
114 end
115
116 def decrease_replies_count_if_reply(_object), do: :noop
117
118 def increase_poll_votes_if_vote(%{
119 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
120 "type" => "Create"
121 }) do
122 Object.increase_vote_count(reply_ap_id, name)
123 end
124
125 def increase_poll_votes_if_vote(_create_data), do: :noop
126
127 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
128 with nil <- Activity.normalize(map),
129 map <- lazy_put_activity_defaults(map, fake),
130 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
131 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
132 {:ok, map} <- MRF.filter(map),
133 {recipients, _, _} = get_recipients(map),
134 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
135 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
136 {:ok, map, object} <- insert_full_object(map) do
137 {:ok, activity} =
138 Repo.insert(%Activity{
139 data: map,
140 local: local,
141 actor: map["actor"],
142 recipients: recipients
143 })
144
145 # Splice in the child object if we have one.
146 activity =
147 if not is_nil(object) do
148 Map.put(activity, :object, object)
149 else
150 activity
151 end
152
153 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
154
155 Notification.create_notifications(activity)
156
157 conversation = create_or_bump_conversation(activity, map["actor"])
158 participations = get_participations(conversation)
159 stream_out(activity)
160 stream_out_participations(participations)
161 {:ok, activity}
162 else
163 %Activity{} = activity ->
164 {:ok, activity}
165
166 {:fake, true, map, recipients} ->
167 activity = %Activity{
168 data: map,
169 local: local,
170 actor: map["actor"],
171 recipients: recipients,
172 id: "pleroma:fakeid"
173 }
174
175 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
176 {:ok, activity}
177
178 error ->
179 {:error, error}
180 end
181 end
182
183 defp create_or_bump_conversation(activity, actor) do
184 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
185 %User{} = user <- User.get_cached_by_ap_id(actor),
186 Participation.mark_as_read(user, conversation) do
187 {:ok, conversation}
188 end
189 end
190
191 defp get_participations({:ok, conversation}) do
192 conversation
193 |> Repo.preload(:participations, force: true)
194 |> Map.get(:participations)
195 end
196
197 defp get_participations(_), do: []
198
199 def stream_out_participations(participations) do
200 participations =
201 participations
202 |> Repo.preload(:user)
203
204 Streamer.stream("participation", participations)
205 end
206
207 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
208 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
209 conversation = Repo.preload(conversation, :participations),
210 last_activity_id =
211 fetch_latest_activity_id_for_context(conversation.ap_id, %{
212 "user" => user,
213 "blocking_user" => user
214 }) do
215 if last_activity_id do
216 stream_out_participations(conversation.participations)
217 end
218 end
219 end
220
221 def stream_out_participations(_, _), do: :noop
222
223 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
224 when data_type in ["Create", "Announce", "Delete"] do
225 activity
226 |> Topics.get_activity_topics()
227 |> Streamer.stream(activity)
228 end
229
230 def stream_out(_activity) do
231 :noop
232 end
233
234 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
235 additional = params[:additional] || %{}
236 # only accept false as false value
237 local = !(params[:local] == false)
238 published = params[:published]
239 quick_insert? = Pleroma.Config.get([:env]) == :benchmark
240
241 with create_data <-
242 make_create_data(
243 %{to: to, actor: actor, published: published, context: context, object: object},
244 additional
245 ),
246 {:ok, activity} <- insert(create_data, local, fake),
247 {:fake, false, activity} <- {:fake, fake, activity},
248 _ <- increase_replies_count_if_reply(create_data),
249 _ <- increase_poll_votes_if_vote(create_data),
250 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
251 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
252 :ok <- maybe_federate(activity) do
253 {:ok, activity}
254 else
255 {:quick_insert, true, activity} ->
256 {:ok, activity}
257
258 {:fake, true, activity} ->
259 {:ok, activity}
260
261 {:error, message} ->
262 {:error, message}
263 end
264 end
265
266 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
267 additional = params[:additional] || %{}
268 # only accept false as false value
269 local = !(params[:local] == false)
270 published = params[:published]
271
272 with listen_data <-
273 make_listen_data(
274 %{to: to, actor: actor, published: published, context: context, object: object},
275 additional
276 ),
277 {:ok, activity} <- insert(listen_data, local),
278 :ok <- maybe_federate(activity) do
279 {:ok, activity}
280 else
281 {:error, message} ->
282 {:error, message}
283 end
284 end
285
286 def accept(params) do
287 accept_or_reject("Accept", params)
288 end
289
290 def reject(params) do
291 accept_or_reject("Reject", params)
292 end
293
294 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
295 local = Map.get(params, :local, true)
296 activity_id = Map.get(params, :activity_id, nil)
297
298 with data <-
299 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
300 |> Utils.maybe_put("id", activity_id),
301 {:ok, activity} <- insert(data, local),
302 :ok <- maybe_federate(activity) do
303 {:ok, activity}
304 end
305 end
306
307 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
308 local = !(params[:local] == false)
309 activity_id = params[:activity_id]
310
311 with data <- %{
312 "to" => to,
313 "cc" => cc,
314 "type" => "Update",
315 "actor" => actor,
316 "object" => object
317 },
318 data <- Utils.maybe_put(data, "id", activity_id),
319 {:ok, activity} <- insert(data, local),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 def react_with_emoji(user, object, emoji, options \\ []) do
326 with local <- Keyword.get(options, :local, true),
327 activity_id <- Keyword.get(options, :activity_id, nil),
328 Pleroma.Emoji.is_unicode_emoji?(emoji),
329 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
330 {:ok, activity} <- insert(reaction_data, local),
331 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
332 :ok <- maybe_federate(activity) do
333 {:ok, activity, object}
334 end
335 end
336
337 def unreact_with_emoji(user, reaction_id, options \\ []) do
338 with local <- Keyword.get(options, :local, true),
339 activity_id <- Keyword.get(options, :activity_id, nil),
340 user_ap_id <- user.ap_id,
341 %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
342 object <- Object.normalize(reaction_activity),
343 unreact_data <- make_undo_data(user, reaction_activity, activity_id),
344 {:ok, activity} <- insert(unreact_data, local),
345 {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
346 :ok <- maybe_federate(activity) do
347 {:ok, activity, object}
348 end
349 end
350
351 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
352 def like(
353 %User{ap_id: ap_id} = user,
354 %Object{data: %{"id" => _}} = object,
355 activity_id \\ nil,
356 local \\ true
357 ) do
358 with nil <- get_existing_like(ap_id, object),
359 like_data <- make_like_data(user, object, activity_id),
360 {:ok, activity} <- insert(like_data, local),
361 {:ok, object} <- add_like_to_object(activity, object),
362 :ok <- maybe_federate(activity) do
363 {:ok, activity, object}
364 else
365 %Activity{} = activity -> {:ok, activity, object}
366 error -> {:error, error}
367 end
368 end
369
370 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
371 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
372 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
373 {:ok, unlike_activity} <- insert(unlike_data, local),
374 {:ok, _activity} <- Repo.delete(like_activity),
375 {:ok, object} <- remove_like_from_object(like_activity, object),
376 :ok <- maybe_federate(unlike_activity) do
377 {:ok, unlike_activity, like_activity, object}
378 else
379 _e -> {:ok, object}
380 end
381 end
382
383 def announce(
384 %User{ap_id: _} = user,
385 %Object{data: %{"id" => _}} = object,
386 activity_id \\ nil,
387 local \\ true,
388 public \\ true
389 ) do
390 with true <- is_announceable?(object, user, public),
391 announce_data <- make_announce_data(user, object, activity_id, public),
392 {:ok, activity} <- insert(announce_data, local),
393 {:ok, object} <- add_announce_to_object(activity, object),
394 :ok <- maybe_federate(activity) do
395 {:ok, activity, object}
396 else
397 error -> {:error, error}
398 end
399 end
400
401 def unannounce(
402 %User{} = actor,
403 %Object{} = object,
404 activity_id \\ nil,
405 local \\ true
406 ) do
407 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
408 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
409 {:ok, unannounce_activity} <- insert(unannounce_data, local),
410 :ok <- maybe_federate(unannounce_activity),
411 {:ok, _activity} <- Repo.delete(announce_activity),
412 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
413 {:ok, unannounce_activity, object}
414 else
415 _e -> {:ok, object}
416 end
417 end
418
419 def follow(follower, followed, activity_id \\ nil, local \\ true) do
420 with data <- make_follow_data(follower, followed, activity_id),
421 {:ok, activity} <- insert(data, local),
422 :ok <- maybe_federate(activity),
423 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
424 {:ok, activity}
425 end
426 end
427
428 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
429 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
430 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
431 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
432 {:ok, activity} <- insert(unfollow_data, local),
433 :ok <- maybe_federate(activity) do
434 {:ok, activity}
435 end
436 end
437
438 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
439 with data <- %{
440 "to" => [follower_address],
441 "type" => "Delete",
442 "actor" => ap_id,
443 "object" => %{"type" => "Person", "id" => ap_id}
444 },
445 {:ok, activity} <- insert(data, true, true, true),
446 :ok <- maybe_federate(activity) do
447 {:ok, user}
448 end
449 end
450
451 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
452 local = Keyword.get(options, :local, true)
453 activity_id = Keyword.get(options, :activity_id, nil)
454 actor = Keyword.get(options, :actor, actor)
455
456 user = User.get_cached_by_ap_id(actor)
457 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
458
459 with {:ok, object, activity} <- Object.delete(object),
460 data <-
461 %{
462 "type" => "Delete",
463 "actor" => actor,
464 "object" => id,
465 "to" => to,
466 "deleted_activity_id" => activity && activity.id
467 }
468 |> maybe_put("id", activity_id),
469 {:ok, activity} <- insert(data, local, false),
470 stream_out_participations(object, user),
471 _ <- decrease_replies_count_if_reply(object),
472 {:ok, _actor} <- decrease_note_count_if_public(user, object),
473 :ok <- maybe_federate(activity) do
474 {:ok, activity}
475 end
476 end
477
478 @spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
479 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
480 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
481 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
482
483 if unfollow_blocked do
484 follow_activity = fetch_latest_follow(blocker, blocked)
485 if follow_activity, do: unfollow(blocker, blocked, nil, local)
486 end
487
488 with true <- outgoing_blocks,
489 block_data <- make_block_data(blocker, blocked, activity_id),
490 {:ok, activity} <- insert(block_data, local),
491 :ok <- maybe_federate(activity) do
492 {:ok, activity}
493 else
494 _e -> {:ok, nil}
495 end
496 end
497
498 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
499 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
500 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
501 {:ok, activity} <- insert(unblock_data, local),
502 :ok <- maybe_federate(activity) do
503 {:ok, activity}
504 end
505 end
506
507 @spec flag(map()) :: {:ok, Activity.t()} | any
508 def flag(
509 %{
510 actor: actor,
511 context: _context,
512 account: account,
513 statuses: statuses,
514 content: content
515 } = params
516 ) do
517 # only accept false as false value
518 local = !(params[:local] == false)
519 forward = !(params[:forward] == false)
520
521 additional = params[:additional] || %{}
522
523 additional =
524 if forward do
525 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
526 else
527 Map.merge(additional, %{"to" => [], "cc" => []})
528 end
529
530 with flag_data <- make_flag_data(params, additional),
531 {:ok, activity} <- insert(flag_data, local),
532 {:ok, stripped_activity} <- strip_report_status_data(activity),
533 :ok <- maybe_federate(stripped_activity) do
534 Enum.each(User.all_superusers(), fn superuser ->
535 superuser
536 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
537 |> Pleroma.Emails.Mailer.deliver_async()
538 end)
539
540 {:ok, activity}
541 end
542 end
543
544 def move(%User{} = origin, %User{} = target, local \\ true) do
545 params = %{
546 "type" => "Move",
547 "actor" => origin.ap_id,
548 "object" => origin.ap_id,
549 "target" => target.ap_id
550 }
551
552 with true <- origin.ap_id in target.also_known_as,
553 {:ok, activity} <- insert(params, local) do
554 maybe_federate(activity)
555
556 BackgroundWorker.enqueue("move_following", %{
557 "origin_id" => origin.id,
558 "target_id" => target.id
559 })
560
561 {:ok, activity}
562 else
563 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
564 err -> err
565 end
566 end
567
568 defp fetch_activities_for_context_query(context, opts) do
569 public = [Pleroma.Constants.as_public()]
570
571 recipients =
572 if opts["user"],
573 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
574 else: public
575
576 from(activity in Activity)
577 |> maybe_preload_objects(opts)
578 |> maybe_preload_bookmarks(opts)
579 |> maybe_set_thread_muted_field(opts)
580 |> restrict_blocked(opts)
581 |> restrict_recipients(recipients, opts["user"])
582 |> where(
583 [activity],
584 fragment(
585 "?->>'type' = ? and ?->>'context' = ?",
586 activity.data,
587 "Create",
588 activity.data,
589 ^context
590 )
591 )
592 |> exclude_poll_votes(opts)
593 |> exclude_id(opts)
594 |> order_by([activity], desc: activity.id)
595 end
596
597 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
598 def fetch_activities_for_context(context, opts \\ %{}) do
599 context
600 |> fetch_activities_for_context_query(opts)
601 |> Repo.all()
602 end
603
604 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
605 FlakeId.Ecto.CompatType.t() | nil
606 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
607 context
608 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
609 |> limit(1)
610 |> select([a], a.id)
611 |> Repo.one()
612 end
613
614 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
615 opts = Map.drop(opts, ["user"])
616
617 [Pleroma.Constants.as_public()]
618 |> fetch_activities_query(opts)
619 |> restrict_unlisted()
620 |> Pagination.fetch_paginated(opts, pagination)
621 end
622
623 @valid_visibilities ~w[direct unlisted public private]
624
625 defp restrict_visibility(query, %{visibility: visibility})
626 when is_list(visibility) do
627 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
628 query =
629 from(
630 a in query,
631 where:
632 fragment(
633 "activity_visibility(?, ?, ?) = ANY (?)",
634 a.actor,
635 a.recipients,
636 a.data,
637 ^visibility
638 )
639 )
640
641 query
642 else
643 Logger.error("Could not restrict visibility to #{visibility}")
644 end
645 end
646
647 defp restrict_visibility(query, %{visibility: visibility})
648 when visibility in @valid_visibilities do
649 from(
650 a in query,
651 where:
652 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
653 )
654 end
655
656 defp restrict_visibility(_query, %{visibility: visibility})
657 when visibility not in @valid_visibilities do
658 Logger.error("Could not restrict visibility to #{visibility}")
659 end
660
661 defp restrict_visibility(query, _visibility), do: query
662
663 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
664 when is_list(visibility) do
665 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
666 from(
667 a in query,
668 where:
669 not fragment(
670 "activity_visibility(?, ?, ?) = ANY (?)",
671 a.actor,
672 a.recipients,
673 a.data,
674 ^visibility
675 )
676 )
677 else
678 Logger.error("Could not exclude visibility to #{visibility}")
679 query
680 end
681 end
682
683 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
684 when visibility in @valid_visibilities do
685 from(
686 a in query,
687 where:
688 not fragment(
689 "activity_visibility(?, ?, ?) = ?",
690 a.actor,
691 a.recipients,
692 a.data,
693 ^visibility
694 )
695 )
696 end
697
698 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
699 when visibility not in @valid_visibilities do
700 Logger.error("Could not exclude visibility to #{visibility}")
701 query
702 end
703
704 defp exclude_visibility(query, _visibility), do: query
705
706 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
707 do: query
708
709 defp restrict_thread_visibility(
710 query,
711 %{"user" => %User{skip_thread_containment: true}},
712 _
713 ),
714 do: query
715
716 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
717 from(
718 a in query,
719 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
720 )
721 end
722
723 defp restrict_thread_visibility(query, _, _), do: query
724
725 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
726 params =
727 params
728 |> Map.put("user", reading_user)
729 |> Map.put("actor_id", user.ap_id)
730 |> Map.put("whole_db", true)
731
732 recipients =
733 user_activities_recipients(%{
734 "godmode" => params["godmode"],
735 "reading_user" => reading_user
736 })
737
738 fetch_activities(recipients, params)
739 |> Enum.reverse()
740 end
741
742 def fetch_user_activities(user, reading_user, params \\ %{}) do
743 params =
744 params
745 |> Map.put("type", ["Create", "Announce"])
746 |> Map.put("user", reading_user)
747 |> Map.put("actor_id", user.ap_id)
748 |> Map.put("whole_db", true)
749 |> Map.put("pinned_activity_ids", user.pinned_activities)
750
751 params =
752 if User.blocks?(reading_user, user) do
753 params
754 else
755 params
756 |> Map.put("blocking_user", reading_user)
757 |> Map.put("muting_user", reading_user)
758 end
759
760 recipients =
761 user_activities_recipients(%{
762 "godmode" => params["godmode"],
763 "reading_user" => reading_user
764 })
765
766 fetch_activities(recipients, params)
767 |> Enum.reverse()
768 end
769
770 def fetch_instance_activities(params) do
771 params =
772 params
773 |> Map.put("type", ["Create", "Announce"])
774 |> Map.put("instance", params["instance"])
775 |> Map.put("whole_db", true)
776
777 fetch_activities([Pleroma.Constants.as_public()], params, :offset)
778 |> Enum.reverse()
779 end
780
781 defp user_activities_recipients(%{"godmode" => true}) do
782 []
783 end
784
785 defp user_activities_recipients(%{"reading_user" => reading_user}) do
786 if reading_user do
787 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
788 else
789 [Pleroma.Constants.as_public()]
790 end
791 end
792
793 defp restrict_since(query, %{"since_id" => ""}), do: query
794
795 defp restrict_since(query, %{"since_id" => since_id}) do
796 from(activity in query, where: activity.id > ^since_id)
797 end
798
799 defp restrict_since(query, _), do: query
800
801 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
802 raise "Can't use the child object without preloading!"
803 end
804
805 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
806 when is_list(tag_reject) and tag_reject != [] do
807 from(
808 [_activity, object] in query,
809 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
810 )
811 end
812
813 defp restrict_tag_reject(query, _), do: query
814
815 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
816 raise "Can't use the child object without preloading!"
817 end
818
819 defp restrict_tag_all(query, %{"tag_all" => tag_all})
820 when is_list(tag_all) and tag_all != [] do
821 from(
822 [_activity, object] in query,
823 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
824 )
825 end
826
827 defp restrict_tag_all(query, _), do: query
828
829 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
830 raise "Can't use the child object without preloading!"
831 end
832
833 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
834 from(
835 [_activity, object] in query,
836 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
837 )
838 end
839
840 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
841 from(
842 [_activity, object] in query,
843 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
844 )
845 end
846
847 defp restrict_tag(query, _), do: query
848
849 defp restrict_recipients(query, [], _user), do: query
850
851 defp restrict_recipients(query, recipients, nil) do
852 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
853 end
854
855 defp restrict_recipients(query, recipients, user) do
856 from(
857 activity in query,
858 where: fragment("? && ?", ^recipients, activity.recipients),
859 or_where: activity.actor == ^user.ap_id
860 )
861 end
862
863 defp restrict_local(query, %{"local_only" => true}) do
864 from(activity in query, where: activity.local == true)
865 end
866
867 defp restrict_local(query, _), do: query
868
869 defp restrict_actor(query, %{"actor_id" => actor_id}) do
870 from(activity in query, where: activity.actor == ^actor_id)
871 end
872
873 defp restrict_actor(query, _), do: query
874
875 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
876 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
877 end
878
879 defp restrict_type(query, %{"type" => type}) do
880 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
881 end
882
883 defp restrict_type(query, _), do: query
884
885 defp restrict_state(query, %{"state" => state}) do
886 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
887 end
888
889 defp restrict_state(query, _), do: query
890
891 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
892 from(
893 [_activity, object] in query,
894 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
895 )
896 end
897
898 defp restrict_favorited_by(query, _), do: query
899
900 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
901 raise "Can't use the child object without preloading!"
902 end
903
904 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
905 from(
906 [_activity, object] in query,
907 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
908 )
909 end
910
911 defp restrict_media(query, _), do: query
912
913 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
914 from(
915 [_activity, object] in query,
916 where: fragment("?->>'inReplyTo' is null", object.data)
917 )
918 end
919
920 defp restrict_replies(query, _), do: query
921
922 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
923 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
924 end
925
926 defp restrict_reblogs(query, _), do: query
927
928 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
929
930 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
931 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
932
933 query =
934 from([activity] in query,
935 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
936 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
937 )
938
939 unless opts["skip_preload"] do
940 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
941 else
942 query
943 end
944 end
945
946 defp restrict_muted(query, _), do: query
947
948 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
949 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
950 domain_blocks = user.domain_blocks || []
951
952 query =
953 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
954
955 from(
956 [activity, object: o] in query,
957 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
958 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
959 where:
960 fragment(
961 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
962 activity.data,
963 activity.data,
964 ^blocked_ap_ids
965 ),
966 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
967 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
968 )
969 end
970
971 defp restrict_blocked(query, _), do: query
972
973 defp restrict_unlisted(query) do
974 from(
975 activity in query,
976 where:
977 fragment(
978 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
979 activity.data,
980 ^[Pleroma.Constants.as_public()]
981 )
982 )
983 end
984
985 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
986 from(activity in query, where: activity.id in ^ids)
987 end
988
989 defp restrict_pinned(query, _), do: query
990
991 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
992 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
993
994 from(
995 activity in query,
996 where:
997 fragment(
998 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
999 activity.data,
1000 activity.actor,
1001 ^muted_reblogs
1002 )
1003 )
1004 end
1005
1006 defp restrict_muted_reblogs(query, _), do: query
1007
1008 defp restrict_instance(query, %{"instance" => instance}) do
1009 users =
1010 from(
1011 u in User,
1012 select: u.ap_id,
1013 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1014 )
1015 |> Repo.all()
1016
1017 from(activity in query, where: activity.actor in ^users)
1018 end
1019
1020 defp restrict_instance(query, _), do: query
1021
1022 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1023
1024 defp exclude_poll_votes(query, _) do
1025 if has_named_binding?(query, :object) do
1026 from([activity, object: o] in query,
1027 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1028 )
1029 else
1030 query
1031 end
1032 end
1033
1034 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
1035 from(activity in query, where: activity.id != ^id)
1036 end
1037
1038 defp exclude_id(query, _), do: query
1039
1040 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
1041
1042 defp maybe_preload_objects(query, _) do
1043 query
1044 |> Activity.with_preloaded_object()
1045 end
1046
1047 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
1048
1049 defp maybe_preload_bookmarks(query, opts) do
1050 query
1051 |> Activity.with_preloaded_bookmark(opts["user"])
1052 end
1053
1054 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
1055
1056 defp maybe_set_thread_muted_field(query, opts) do
1057 query
1058 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
1059 end
1060
1061 defp maybe_order(query, %{order: :desc}) do
1062 query
1063 |> order_by(desc: :id)
1064 end
1065
1066 defp maybe_order(query, %{order: :asc}) do
1067 query
1068 |> order_by(asc: :id)
1069 end
1070
1071 defp maybe_order(query, _), do: query
1072
1073 defp fetch_activities_query_ap_ids_ops(opts) do
1074 source_user = opts["muting_user"]
1075 ap_id_relations = if source_user, do: [:mute, :reblog_mute], else: []
1076
1077 ap_id_relations =
1078 ap_id_relations ++
1079 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1080 [:block]
1081 else
1082 []
1083 end
1084
1085 preloaded_ap_ids = User.outgoing_relations_ap_ids(source_user, ap_id_relations)
1086
1087 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1088 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1089
1090 restrict_muted_reblogs_opts =
1091 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1092
1093 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1094 end
1095
1096 def fetch_activities_query(recipients, opts \\ %{}) do
1097 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1098 fetch_activities_query_ap_ids_ops(opts)
1099
1100 config = %{
1101 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1102 }
1103
1104 Activity
1105 |> maybe_preload_objects(opts)
1106 |> maybe_preload_bookmarks(opts)
1107 |> maybe_set_thread_muted_field(opts)
1108 |> maybe_order(opts)
1109 |> restrict_recipients(recipients, opts["user"])
1110 |> restrict_tag(opts)
1111 |> restrict_tag_reject(opts)
1112 |> restrict_tag_all(opts)
1113 |> restrict_since(opts)
1114 |> restrict_local(opts)
1115 |> restrict_actor(opts)
1116 |> restrict_type(opts)
1117 |> restrict_state(opts)
1118 |> restrict_favorited_by(opts)
1119 |> restrict_blocked(restrict_blocked_opts)
1120 |> restrict_muted(restrict_muted_opts)
1121 |> restrict_media(opts)
1122 |> restrict_visibility(opts)
1123 |> restrict_thread_visibility(opts, config)
1124 |> restrict_replies(opts)
1125 |> restrict_reblogs(opts)
1126 |> restrict_pinned(opts)
1127 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1128 |> restrict_instance(opts)
1129 |> Activity.restrict_deactivated_users()
1130 |> exclude_poll_votes(opts)
1131 |> exclude_visibility(opts)
1132 end
1133
1134 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1135 list_memberships = Pleroma.List.memberships(opts["user"])
1136
1137 fetch_activities_query(recipients ++ list_memberships, opts)
1138 |> Pagination.fetch_paginated(opts, pagination)
1139 |> Enum.reverse()
1140 |> maybe_update_cc(list_memberships, opts["user"])
1141 end
1142
1143 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1144 when is_list(list_memberships) and length(list_memberships) > 0 do
1145 Enum.map(activities, fn
1146 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1147 if Enum.any?(bcc, &(&1 in list_memberships)) do
1148 update_in(activity.data["cc"], &[user_ap_id | &1])
1149 else
1150 activity
1151 end
1152
1153 activity ->
1154 activity
1155 end)
1156 end
1157
1158 defp maybe_update_cc(activities, _, _), do: activities
1159
1160 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1161 from(activity in query,
1162 where:
1163 fragment("? && ?", activity.recipients, ^recipients) or
1164 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1165 ^Pleroma.Constants.as_public() in activity.recipients)
1166 )
1167 end
1168
1169 def fetch_activities_bounded(
1170 recipients,
1171 recipients_with_public,
1172 opts \\ %{},
1173 pagination \\ :keyset
1174 ) do
1175 fetch_activities_query([], opts)
1176 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1177 |> Pagination.fetch_paginated(opts, pagination)
1178 |> Enum.reverse()
1179 end
1180
1181 def upload(file, opts \\ []) do
1182 with {:ok, data} <- Upload.store(file, opts) do
1183 obj_data =
1184 if opts[:actor] do
1185 Map.put(data, "actor", opts[:actor])
1186 else
1187 data
1188 end
1189
1190 Repo.insert(%Object{data: obj_data})
1191 end
1192 end
1193
1194 defp object_to_user_data(data) do
1195 avatar =
1196 data["icon"]["url"] &&
1197 %{
1198 "type" => "Image",
1199 "url" => [%{"href" => data["icon"]["url"]}]
1200 }
1201
1202 banner =
1203 data["image"]["url"] &&
1204 %{
1205 "type" => "Image",
1206 "url" => [%{"href" => data["image"]["url"]}]
1207 }
1208
1209 fields =
1210 data
1211 |> Map.get("attachment", [])
1212 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1213 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1214
1215 locked = data["manuallyApprovesFollowers"] || false
1216 data = Transmogrifier.maybe_fix_user_object(data)
1217 discoverable = data["discoverable"] || false
1218 invisible = data["invisible"] || false
1219
1220 user_data = %{
1221 ap_id: data["id"],
1222 ap_enabled: true,
1223 source_data: data,
1224 banner: banner,
1225 fields: fields,
1226 locked: locked,
1227 discoverable: discoverable,
1228 invisible: invisible,
1229 avatar: avatar,
1230 name: data["name"],
1231 follower_address: data["followers"],
1232 following_address: data["following"],
1233 bio: data["summary"],
1234 also_known_as: Map.get(data, "alsoKnownAs", [])
1235 }
1236
1237 # nickname can be nil because of virtual actors
1238 user_data =
1239 if data["preferredUsername"] do
1240 Map.put(
1241 user_data,
1242 :nickname,
1243 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1244 )
1245 else
1246 Map.put(user_data, :nickname, nil)
1247 end
1248
1249 {:ok, user_data}
1250 end
1251
1252 def fetch_follow_information_for_user(user) do
1253 with {:ok, following_data} <-
1254 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1255 following_count when is_integer(following_count) <- following_data["totalItems"],
1256 {:ok, hide_follows} <- collection_private(following_data),
1257 {:ok, followers_data} <-
1258 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1259 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1260 {:ok, hide_followers} <- collection_private(followers_data) do
1261 {:ok,
1262 %{
1263 hide_follows: hide_follows,
1264 follower_count: followers_count,
1265 following_count: following_count,
1266 hide_followers: hide_followers
1267 }}
1268 else
1269 {:error, _} = e ->
1270 e
1271
1272 e ->
1273 {:error, e}
1274 end
1275 end
1276
1277 defp maybe_update_follow_information(data) do
1278 with {:enabled, true} <-
1279 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1280 {:ok, info} <- fetch_follow_information_for_user(data) do
1281 info = Map.merge(data[:info] || %{}, info)
1282 Map.put(data, :info, info)
1283 else
1284 {:enabled, false} ->
1285 data
1286
1287 e ->
1288 Logger.error(
1289 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1290 )
1291
1292 data
1293 end
1294 end
1295
1296 defp collection_private(%{"first" => first}) do
1297 if is_map(first) and
1298 first["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1299 {:ok, false}
1300 else
1301 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1302 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1303 {:ok, false}
1304 else
1305 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1306 {:ok, true}
1307
1308 {:error, _} = e ->
1309 e
1310
1311 e ->
1312 {:error, e}
1313 end
1314 end
1315 end
1316
1317 defp collection_private(_data), do: {:ok, true}
1318
1319 def user_data_from_user_object(data) do
1320 with {:ok, data} <- MRF.filter(data),
1321 {:ok, data} <- object_to_user_data(data) do
1322 {:ok, data}
1323 else
1324 e -> {:error, e}
1325 end
1326 end
1327
1328 def fetch_and_prepare_user_from_ap_id(ap_id) do
1329 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1330 {:ok, data} <- user_data_from_user_object(data),
1331 data <- maybe_update_follow_information(data) do
1332 {:ok, data}
1333 else
1334 e ->
1335 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1336 {:error, e}
1337 end
1338 end
1339
1340 def make_user_from_ap_id(ap_id) do
1341 if _user = User.get_cached_by_ap_id(ap_id) do
1342 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1343 else
1344 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1345 User.insert_or_update_user(data)
1346 else
1347 e -> {:error, e}
1348 end
1349 end
1350 end
1351
1352 def make_user_from_nickname(nickname) do
1353 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1354 make_user_from_ap_id(ap_id)
1355 else
1356 _e -> {:error, "No AP id in WebFinger"}
1357 end
1358 end
1359
1360 # filter out broken threads
1361 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1362 entire_thread_visible_for_user?(activity, user)
1363 end
1364
1365 # do post-processing on a specific activity
1366 def contain_activity(%Activity{} = activity, %User{} = user) do
1367 contain_broken_threads(activity, user)
1368 end
1369
1370 def fetch_direct_messages_query do
1371 Activity
1372 |> restrict_type(%{"type" => "Create"})
1373 |> restrict_visibility(%{visibility: "direct"})
1374 |> order_by([activity], asc: activity.id)
1375 end
1376 end