Merge branch 'feature/move-activity' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
# Computes the recipients stored alongside an inserted activity.
#
# Every clause returns `{recipients, to, cc}`, where `to` and `cc` are the activity's own
# addressing fields (defaulting to `[]`) and `recipients` is derived from "to", "cc" and "bcc".

# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # No user is known under this address: keep the address untouched.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is added to the (de-duplicated) recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. Defaulting to `[]` and wrapping it, as was done
  # before, injected a stray `[]` element into the recipient list.
  actor_recipients =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor_recipients] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Any other activity type: plain concatenation of "to", "cc" and "bcc".
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
68
# Tells whether an activity by `actor` (an AP id, or `nil`) may be inserted.
#
# Returns `true` when no actor is given, or when the cached user for that AP id has
# `deactivated: false`. Returns `false` otherwise, including when no user is known for the
# AP id. Previously that case called `.deactivated` on `nil` and raised.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _unknown_or_deactivated -> false
  end
end
81
# Checks the length of an embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Activities without a non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity_data), do: true
88
# Bumps the actor's note count when `object` is public; otherwise returns `{:ok, actor}`
# without touching anything.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
92
# Lowers the actor's note count when `object` is public; otherwise returns `{:ok, actor}`
# without touching anything.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
96
# For a Create whose embedded object carries "inReplyTo", increments the replies counter of
# the replied-to object, but only when the new object is public (otherwise returns `nil`).
# Any other input yields `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_other), do: :noop
107
# For an object carrying "inReplyTo", decrements the replies counter of the replied-to
# object, but only when the object is public (otherwise returns `nil`).
# Any other input yields `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_other), do: :noop
117
# For a Create whose embedded object has both "inReplyTo" and "name", increments the vote
# count for option `name` on the replied-to object. Any other input yields `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_other), do: :noop
126
# Inserts an activity described by `map` and runs the follow-up side effects.
#
# Parameters:
#   * `map` - the activity data
#   * `local` - stored as the activity's `local` flag (default `true`)
#   * `fake` - when `true`, nothing is persisted; an in-memory activity with the id
#     "pleroma:fakeid" is returned instead (default `false`)
#   * `bypass_actor_check` - skips the "actor is active" check (default `false`)
#
# Returns `{:ok, activity}` for a newly inserted, an already existing, or a fake activity.
# Any step that fails is returned wrapped as `{:error, failed_value}`.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, inserted} =
      %Activity{data: map, local: local, actor: map["actor"], recipients: recipients}
      |> Repo.insert()

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(map["actor"])
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # `Activity.normalize/1` found an existing activity: hand it back as is.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory only.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
182
# Creates or bumps the conversation for `activity` and marks it as read for the user cached
# under `actor`. Returns `{:ok, conversation}`, or the first non-matching intermediate
# result when one of the lookups fails.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
       %User{} = user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(user, conversation)
    {:ok, conversation}
  end
end
190
# Returns the freshly (force-)preloaded participations of a conversation given as
# `{:ok, conversation}`; any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
198
# Preloads the user of each participation and pushes them to the "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
206
# Streams the participations of the conversation `object` belongs to (looked up through its
# "context"), provided a latest activity id can still be fetched for that context with
# `user` as both the viewing and the blocking user.
# Returns `:noop` for arguments without a context, and the lookup result as is when no
# conversation is found.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    not_found ->
      not_found
  end
end

def stream_out_participations(_, _), do: :noop
222
# Pushes Create, Announce and Delete activities to the streamer topics computed for them.
# All other activities are ignored with `:noop`.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
233
# Builds and inserts a Create activity, then updates reply, poll-vote and note counters and
# federates it.
#
# `params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`, `:local`
# and `:published` are optional. With `fake` set, the fake activity from `insert/3` is
# returned before any counter is touched. In the `:benchmark` env the function returns
# right after the reply/vote counters, skipping the note count and federation.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, quick_activity} ->
      {:ok, quick_activity}

    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:error, message} ->
      {:error, message}
  end
end
265
# Builds, inserts and federates a listen activity from `params` (`:to`, `:actor`,
# `:context`, `:object`, plus optional `:additional`, `:local`, `:published`).
# Returns `{:ok, activity}` or `{:error, message}`.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      {:error, message}
  end
end
285
# Inserts and federates an "Accept" activity; see `accept_or_reject/2` for `params`.
def accept(params), do: accept_or_reject("Accept", params)
289
# Inserts and federates a "Reject" activity; see `accept_or_reject/2` for `params`.
def reject(params), do: accept_or_reject("Reject", params)
293
# Inserts and federates an activity of the given `type` built from `params`.
#
# `params` must contain `:to`, `:actor` (a struct with `ap_id`) and `:object`; `:local`
# defaults to `true` and `:activity_id`, when present, becomes the activity "id".
# Returns `{:ok, activity}` or the first failing step's result.
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
306
# Inserts and federates an "Update" activity built from `params` (`:to`, `:cc`, `:actor`,
# `:object`, plus optional `:local` and `:activity_id`).
# Returns `{:ok, activity}` or the first failing step's result.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit `false` disables the local flag
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
# Adds an emoji reaction by `user` to `object`.
#
# Options: `:local` (default `true`) and `:activity_id` (default `nil`).
#
# Returns `{:ok, activity, object}` on success. When `emoji` is not a unicode emoji the
# function returns `false` and inserts nothing; other failures return the first failing
# step's result.
#
# The emoji check has to be matched against `true`: as a bare expression inside `with`
# its result was discarded, so arbitrary strings were accepted as reactions.
def react_with_emoji(user, object, emoji, options \\ []) do
  with local <- Keyword.get(options, :local, true),
       activity_id <- Keyword.get(options, :activity_id, nil),
       true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
       reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
       {:ok, activity} <- insert(reaction_data, local),
       {:ok, object} <- add_emoji_reaction_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  end
end
336
# Undoes the emoji reaction activity identified by `reaction_id`, provided its actor is
# `user`.
#
# Options: `:local` (default `true`) and `:activity_id` (default `nil`).
# Returns `{:ok, undo_activity, object}` or the first failing step's result.
def unreact_with_emoji(user, reaction_id, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  user_ap_id = user.ap_id

  with %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
       reacted_object = Object.normalize(reaction_activity),
       unreact_data = make_undo_data(user, reaction_activity, activity_id),
       {:ok, activity} <- insert(unreact_data, local),
       {:ok, updated_object} <-
         remove_emoji_reaction_from_object(reaction_activity, reacted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, updated_object}
  end
end
350
# Likes `object` on behalf of `user`.
#
# Returns `{:ok, like_activity, updated_object}` for a new like, `{:ok, existing_like,
# object}` when the user already liked the object, and `{:error, reason}` otherwise.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    failure -> {:error, failure}
  end
end
369
# Removes `actor`'s like from `object`.
#
# Returns `{:ok, unlike_activity, like_activity, updated_object}` on success. If there is
# no existing like, or any step fails, it returns `{:ok, object}` with the object as given.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted_like} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
382
# Announces `object` on behalf of `user`, if it is announceable for that user and the
# requested `public` setting.
#
# Returns `{:ok, announce_activity, updated_object}` or `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
400
# Undoes `actor`'s announce of `object`.
#
# Returns `{:ok, unannounce_activity, updated_object}` on success. If there is no existing
# announce, or any step fails, it returns `{:ok, object}` with the object as given.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted_announce} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
418
# Inserts and federates a follow from `follower` to `followed`, then stores the activity's
# "state" in the follow-state cache. Returns `{:ok, activity}` or the first failing step's
# result.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
427
# Marks the latest follow from `follower` to `followed` as "cancelled", then inserts and
# federates the matching unfollow activity. Returns `{:ok, activity}` or the first failing
# step's result (e.g. `nil` when there is no follow).
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
437
# Federates the deletion of `user`: a "Delete" of the user's "Person", addressed to the
# user's follower address. The activity goes through `insert/4` as a local, fake insert
# with the actor check bypassed. Returns `{:ok, user}` on success.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
450
# Deletes `object` and inserts/federates the corresponding "Delete" activity.
#
# Options: `:local` (default `true`), `:activity_id` (default `nil`) and `:actor` (defaults
# to the object's own "actor"). The Delete is addressed to the object's "to" and "cc", and
# records the id of the activity returned by `Object.delete/1` (if any) under
# "deleted_activity_id". Afterwards participations are streamed out and the reply and note
# counters are decreased. Returns `{:ok, delete_activity}` or the first failing step's
# result.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data =
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, delete_activity} <- insert(data, local, false),
       _ <- stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
477
# Blocks `blocked` on behalf of `blocker`.
#
# With `[:activitypub, :unfollow_blocked]` set, an existing follow of the blocked user is
# undone first. A Block activity is only inserted and federated when
# `[:activitypub, :outgoing_blocks]` is `true`; in every other case (including failures)
# the result is `{:ok, nil}`.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _failure -> {:ok, nil}
  end
end
497
# Undoes the latest block of `blocked` by `blocker`: inserts and federates the unblock
# activity. Returns `{:ok, activity}` or the first failing step's result (e.g. `nil` when
# no block exists).
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
506
# Files a report (Flag activity) and mails it to every superuser.
#
# `params` must contain `:actor`, `:context`, `:account`, `:statuses` and `:content`;
# `:local`, `:forward` and `:additional` are optional. The activity is always addressed
# with an empty "to"; "cc" holds the reported account's AP id unless `:forward` is `false`.
# The stripped variant of the activity is what gets federated, while the full activity is
# returned as `{:ok, activity}`.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
543
# Inserts a "Move" activity from `origin` to `target`, federates it, and enqueues the
# "move_following" background job.
#
# The move is refused with an `{:error, message}` tuple unless `origin.ap_id` is listed in
# `target.also_known_as`. An insert failure is returned unchanged.
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)
    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    failure -> failure
  end
end
567
# Builds the query for all Create activities of `context`, newest first.
#
# Visible recipients are the public address plus, when `opts["user"]` is set, that user's
# own AP id and everything returned by `User.following/1`. The usual preload, block,
# poll-vote and `exclude_id` options are applied on top.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  viewer = opts["user"]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  from(a in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, viewer)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
596
# Returns all Create activities of `context` visible under `opts`, newest first.
#
# `opts` must be a map with string keys (e.g. "user", "blocking_user", "exclude_id"): the
# query builder reads it with `opts["user"]`, which raises for keyword lists. The spec was
# narrowed from `keyword() | map()` accordingly.
@spec fetch_activities_for_context(String.t(), map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(opts)
  |> Repo.all()
end
603
# Returns the id of the newest Create activity of `context` visible under `opts`, or `nil`
# when there is none.
#
# Preloading is skipped by default ("skip_preload" => true), but a value given in `opts`
# wins. `opts` must be a map with string keys: it is passed to `Map.merge/2`, which raises
# for keyword lists, so the spec was narrowed from `keyword() | map()`.
@spec fetch_latest_activity_id_for_context(String.t(), map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
613
# Fetches a page of public, non-unlisted activities. Any "user" entry in `opts` is dropped
# before the query is built.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, "user")
  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
622
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose visibility (as computed by the
# `activity_visibility` SQL function) is the given `:visibility` option: either a single
# value or a list of values from `@valid_visibilities`.
#
# Invalid values are logged and the query is returned unchanged, matching
# `exclude_visibility/2`. The invalid paths used to return the result of `Logger.error/1`
# instead of a query, which broke every later step of the pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
662
# Removes from `query` the activities whose visibility (as computed by the
# `activity_visibility` SQL function) matches the "exclude_visibilities" option: either a
# single value or a list of values from `@valid_visibilities`. Invalid values are logged
# and leave the query unchanged.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _opts), do: query
705
# Applies the `thread_visibility` SQL check for the viewing user, unless thread containment
# is skipped either instance-wide (third argument) or for that user. Without a viewing user
# the query is returned untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: viewer_ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
724
# Fetches activities of any type authored by `user`, as seen by `reading_user`.
# `params["godmode"]` lifts the recipient restriction. The result of `fetch_activities/2`
# is reversed once more before being returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
741
# Fetches the Create and Announce activities authored by `user`, as seen by
# `reading_user`. The user's pinned activity ids are passed along for the "pinned" filter
# and `params["godmode"]` lifts the recipient restriction. The result of
# `fetch_activities/2` is reversed once more before being returned.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.pinned_activities
    })

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
760
# Fetches public Create and Announce activities for `params["instance"]` using offset
# pagination. The result of `fetch_activities/3` is reversed once more before being
# returned.
def fetch_instance_activities(params) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "instance" => params["instance"],
      "whole_db" => true
    })

  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
771
# Recipient list used when reading a user's activities: empty in "godmode" (no recipient
# restriction), otherwise the public address plus, for a logged-in reader, the reader's
# own AP id and everything returned by `User.following/1`.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = [Pleroma.Constants.as_public()]

  if reading_user do
    public ++ [reading_user.ap_id | User.following(reading_user)]
  else
    public
  end
end
783
# Keeps only activities with an id greater than "since_id"; an empty or missing value
# leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [a], a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
791
# Drops activities whose object carries any of the tags in "tag_reject" (non-empty list).
# Needs the joined object, so it raises when combined with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tags, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected_tags})
     when is_list(rejected_tags) and rejected_tags != [] do
  where(query, [_a, obj], fragment("not (?)->'tag' \\?| (?)", obj.data, ^rejected_tags))
end

defp restrict_tag_reject(query, _opts), do: query
805
# Keeps activities whose object carries every tag in "tag_all" (non-empty list).
# Needs the joined object, so it raises when combined with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tags, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required_tags})
     when is_list(required_tags) and required_tags != [] do
  where(query, [_a, obj], fragment("(?)->'tag' \\?& (?)", obj.data, ^required_tags))
end

defp restrict_tag_all(query, _opts), do: query
819
# Keeps activities whose object carries the given "tag": any of them for a list, exactly
# that one for a string. Needs the joined object, so it raises when combined with
# "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_a, obj], fragment("(?)->'tag' \\?| (?)", obj.data, ^tags))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_a, obj], fragment("(?)->'tag' \\? (?)", obj.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
839
# Keeps activities whose recipients overlap with `recipients`; with a viewing `user`,
# activities authored by that user are matched as well (via `or_where`). An empty
# recipient list applies no restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [a], fragment("? && ?", ^recipients, a.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([a], fragment("? && ?", ^recipients, a.recipients))
  |> or_where([a], a.actor == ^user.ap_id)
end
853
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [a], a.local == true)
end

defp restrict_local(query, _opts), do: query
859
# Keeps only activities whose actor equals "actor_id", when that option is given.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [a], a.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
865
# Keeps only activities of the given "type": an exact match for a string, membership
# (`ANY`) for any other value, which is expected to be a list of types.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [a], fragment("?->>'type' = ?", a.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [a], fragment("?->>'type' = ANY(?)", a.data, ^types))
end

defp restrict_type(query, _opts), do: query
875
# Keeps only activities whose data "state" equals the "state" option, when given.
defp restrict_state(query, %{"state" => state}) do
  where(query, [a], fragment("?->>'state' = ?", a.data, ^state))
end

defp restrict_state(query, _opts), do: query
881
# Keeps only activities whose joined object lists the "favorited_by" AP id in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_a, obj], fragment("(?)->'likes' \\? (?)", obj.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
890
# With "only_media" set to "true" or "1", keeps only activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises when combined with
# "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(query, [_a, obj], fragment("not (?)->'attachment' = (?)", obj.data, ^[]))
end

defp restrict_media(query, _opts), do: query
903
# With "exclude_replies" set to "true" or "1", keeps only activities whose joined object
# has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_a, obj], fragment("?->>'inReplyTo' is null", obj.data))
end

defp restrict_replies(query, _opts), do: query
912
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [a], fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
918
# Hides activities authored by, or addressed ("to") to, anyone in the muting user's
# `mutes`. Unless "skip_preload" is set, rows with a matching `thread_mute` binding are
# hidden as well. "with_muted" (true, "true" or "1") disables the whole filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  mutes = user.mutes

  without_muted_users =
    query
    |> where([a], fragment("not (? = ANY(?))", a.actor, ^mutes))
    |> where([a], fragment("not (?->'to' \\?| ?)", a.data, ^mutes))

  if opts["skip_preload"] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
938
# Hides activities the "blocking_user" should not see:
#   * authored by a blocked actor, or with a blocked actor among the recipients
#   * Announces addressed ("to") to a blocked actor
#   * authored by, or wrapping an object authored by, someone on a blocked domain
# The object join is added first when the query does not carry an `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user}) do
  blocks = user.blocks || []
  domain_blocks = user.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([a], fragment("not (? = ANY(?))", a.actor, ^blocks))
  |> where([a], fragment("not (? && ?)", a.recipients, ^blocks))
  |> where(
    [a],
    fragment("not (?->>'type' = 'Announce' and ?->'to' \\?| ?)", a.data, a.data, ^blocks)
  )
  |> where([a], fragment("not (split_part(?, '/', 3) = ANY(?))", a.actor, ^domain_blocks))
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
963
# Drops activities that carry the public address in "cc" (a missing "cc" counts as empty).
defp restrict_unlisted(query) do
  public = [Pleroma.Constants.as_public()]

  where(
    query,
    [a],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", a.data, ^public)
  )
end
975
# With "pinned" => "true", keeps only the activities listed in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [a], a.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
981
# Hides Announce activities authored by anyone in the muting user's `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user}) do
  muted_reblogs = user.muted_reblogs || []

  where(
    query,
    [a],
    fragment("not ( ?->>'type' = 'Announce' and ? = ANY(?))", a.data, a.actor, ^muted_reblogs)
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
998
# Keeps only activities authored by users whose nickname ends in "@<instance>". The
# matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  actor_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [a], a.actor in ^actor_ap_ids)
end

defp restrict_instance(query, _opts), do: query
1012
# Drops activities whose joined object is of type "Answer", unless "include_poll_votes"
# is true. Queries without an `:object` binding are left alone.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1024
# Drops the activity with the given "exclude_id" (binary ids only).
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [a], a.id != ^id)
end

defp exclude_id(query, _opts), do: query
1030
# Adds the object preload to the query unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1037
# Adds the bookmark preload for `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
1044
# Sets the thread-muted field for the muting user (falling back to `opts["user"]`) unless
# "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1051
# Orders by id according to the `:order` option (`:desc` or `:asc`); anything else leaves
# the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _opts), do: query
1063
@doc """
Builds the activity query for `recipients`, applying every preload, ordering
and restriction step driven by `opts`.

The `[:instance, :skip_thread_containment]` setting is read once and handed to
the thread-visibility restriction.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1098
@doc """
Fetches one page of activities for `recipients` plus the list memberships of
`opts["user"]`.

The page returned by `Pagination.fetch_paginated/3` is reversed and then
passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1107
# For each activity whose non-empty "bcc" contains at least one of the user's
# list memberships, prepends the user's ap_id to the activity's "cc".
#
# "cc" may be missing or not a list in stored data, so it is normalised with
# List.wrap/1 first; prepending onto nil would produce an improper list.
#
# With no list memberships or no %User{}, the activities are returned as-is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1124
@doc """
Narrows `query` to activities whose recipients overlap `recipients`, or whose
recipients overlap `recipients_with_public` and also include the public
address (`Pleroma.Constants.as_public/0`).
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1133
@doc """
Fetches one page of activities bounded by `recipients` and
`recipients_with_public` (see `fetch_activities_bounded_query/3`).

The base query is built with an empty recipient list; the page returned by
`Pagination.fetch_paginated/3` is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1145
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is truthy it is added to the object data under "actor".
A non-`{:ok, _}` result from `Upload.store/2` is returned unchanged.
"""
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        absent when absent in [nil, false] -> data
        actor -> Map.put(data, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
1158
# Converts a remote actor document (a string-keyed map) into the attribute
# map used to create or update a user. Always returns {:ok, user_data}.
#
# The document comes from a remote server, so optional properties are read
# defensively:
#   * "icon" / "image" may be a map, a list, or absent
#     (see image_from_actor_property/1);
#   * "attachment" may be absent, null, a single map, or a list; entries that
#     are not "PropertyValue" maps are ignored instead of raising.
defp object_to_user_data(data) do
  avatar = image_from_actor_property(data["icon"])
  banner = image_from_actor_property(data["image"])

  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # "manuallyApprovesFollowers" is read before the Transmogrifier fix-up,
  # the remaining flags after it.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    ap_enabled: true,
    source_data: data,
    banner: banner,
    fields: fields,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    also_known_as: Map.get(data, "alsoKnownAs", []),
    nickname: nickname
  }

  {:ok, user_data}
end

# Builds the internal image representation from an actor's "icon"/"image"
# property. A map with a truthy "url" is converted; for a list the first
# element is used; anything else yields nil.
defp image_from_actor_property(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp image_from_actor_property([first | _rest]), do: image_from_actor_property(first)

defp image_from_actor_property(_other), do: nil
1216
@doc """
Fetches the user's remote following and follower collections and returns
`{:ok, map}` with `:following_count`, `:follower_count`, `:hide_follows` and
`:hide_followers`.

Counts come from each collection's "totalItems", which must be an integer.
Any step that fails yields `{:error, reason}`: an existing `{:error, _}` tuple
is passed through and any other value is wrapped.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       following_total when is_integer(following_total) <- following["totalItems"],
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       followers_total when is_integer(followers_total) <- followers["totalItems"],
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      following_count: following_total,
      follower_count: followers_total,
      hide_follows: follows_hidden,
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end
1241
# When [:instance, :external_user_synchronization] is true, fetches follow
# information for `data` and merges it into `data[:info]`. When the setting
# is false, `data` is returned untouched. Any other outcome is logged as an
# error and `data` is returned untouched.
defp maybe_update_follow_information(data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    merged_info = Map.merge(data[:info] || %{}, follow_info)
    Map.put(data, :info, merged_info)
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1260
# Decides whether a remote collection is private, returning {:ok, boolean}
# or {:error, reason}.
#
#   * "first" is an embedded (Ordered)CollectionPage -> {:ok, false}
#   * otherwise "first" is fetched: an (Ordered)CollectionPage -> {:ok, false};
#     an error carrying HTTP status 401 or 403 -> {:ok, true}; other errors
#     are passed through or wrapped in {:error, _}
#   * no "first" key at all -> {:ok, true}
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_collection), do: {:ok, true}
1283
@doc """
Runs the actor document through `MRF.filter/1` and converts the result into
user attributes.

Returns `{:ok, user_data}`; any non-matching intermediate result is wrapped
as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
1292
@doc """
Fetches the remote actor at `ap_id`, converts it into user attributes and
passes those through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}`. On failure the cause is logged and returned
wrapped as `{:error, cause}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1304
@doc """
Returns a user for `ap_id`.

If a user with that ap_id is already cached, it is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched
and passed to `User.insert_or_update_user/1`; a failed fetch is returned
wrapped as `{:error, failure}`.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} -> User.insert_or_update_user(user_data)
      failure -> {:error, failure}
    end
  end
end
1316
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
resulting ap_id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1324
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1329
# Post-processing hook for a single activity; currently it only applies
# contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1334
@doc """
Builds a query over activities restricted to type "Create" and visibility
"direct", ordered by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1341 end