Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into remake-remodel
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
# Builds the recipient list for an activity map.
#
# Every clause returns `{recipients, to, cc}`, where `to` and `cc` are the values stored
# under the "to"/"cc" keys of the activity (defaulting to `[]`).

# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # No cached user for this address: keep it as-is.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address the actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` turns a missing actor into `[]` so that no nested empty list
  # ends up inside the recipients.
  actor = List.wrap(data["actor"])

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Any other activity type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
68
# Returns `true` when no actor is given, or when the cached user for `actor`
# has `deactivated: false`. Returns `false` otherwise, including when no user
# is found for the given AP id (previously that case raised on `nil.deactivated`).
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _ -> false
  end
end
81
# Checks the embedded object's "content" length against the configured
# `[:instance, :remote_limit]`. Activities without a non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_), do: true
88
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
92
# Decreases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
96
# For a Create whose object carries an "inReplyTo" key, bumps the replies count of
# the replied-to object, but only if the new object is public (returns `nil` otherwise).
# Anything else is a no-op.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => reply_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
107
# For an object whose data carries an "inReplyTo" key, lowers the replies count of
# the replied-to object, but only if the object is public (returns `nil` otherwise).
# Anything else is a no-op.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => reply_ap_id} = object}) do
  if is_public?(object), do: Object.decrease_replies_count(reply_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
117
# A Create whose object has both "inReplyTo" and "name" is treated as a vote:
# the vote count for `name` on the replied-to object is increased.
# Anything else is a no-op.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => reply_ap_id, "name" => name}
    }) do
  Object.increase_vote_count(reply_ap_id, name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
126
# Inserts `object` as an `Activity` row and hands `meta` back to the caller.
#
# `meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise).
# Returns `{:ok, activity, meta}` on success; a failed `Repo.insert/1` result is
# returned as-is instead of raising a `MatchError`.
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
143
# Inserts an activity map into the database and triggers the follow-up work
# (background data fetch, notifications, conversation bump, streaming).
#
# Returns:
#   * `{:ok, activity}` for a freshly inserted activity, for an activity that
#     `Activity.normalize/1` already knows, or for a non-persisted fake activity
#     (when `fake` is true);
#   * `{:error, term}` wrapping whichever step of the chain did not match.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Fake activities leave the chain here, carrying the filtered map and its
       # recipients into the matching `else` clause.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      if is_nil(object) do
        inserted
      else
        Map.put(inserted, :object, object)
      end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(map["actor"])
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_map, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_map,
        local: local,
        actor: fake_map["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
200
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; if the conversation step or the
# user lookup does not match, that non-matching value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
       %User{} = user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(user, conversation)
    {:ok, conversation}
  end
end
208
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
216
# Preloads the `:user` association of the given participations and pushes them
# to the "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)

  Streamer.stream("participation", with_users)
end
224
# Streams the participations of the conversation identified by the object's
# "context", provided that a latest activity id can still be fetched for that
# context on behalf of `user`. Returns whatever the lookup returned when no
# conversation is found, and `:noop` for objects without a "context".
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      last_activity_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          "user" => user,
          "blocking_user" => user
        })

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
240
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; every other activity is ignored.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
251
# Builds and inserts a Create activity, then updates reply, poll and note
# counters and federates it.
#
# `params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
# `:local` and `:published` are optional. Fake activities and activities inserted
# in the `:benchmark` environment return early with `{:ok, activity}`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, quick_activity} ->
      {:ok, quick_activity}

    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:error, message} ->
      {:error, message}
  end
end
283
# Builds, inserts and federates a listen activity from `params`
# (`:to`, `:actor`, `:context`, `:object` required).
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      {:error, message}
  end
end
303
# Shortcut for `accept_or_reject/2` with the "Accept" type.
def accept(params), do: accept_or_reject("Accept", params)
307
# Shortcut for `accept_or_reject/2` with the "Reject" type.
def reject(params), do: accept_or_reject("Reject", params)
311
# Inserts and federates an activity of the given `type` addressed to `to`, with
# `actor.ap_id` as actor. `:local` defaults to `true`; `:activity_id`, when given,
# is passed to `Utils.maybe_put/3` as the "id".
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
# Inserts and federates an Update activity built from `:to`, `:cc`, `:actor` and
# `:object`. `:activity_id`, when given, is passed to `Utils.maybe_put/3` as the "id".
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false
  activity_id = params[:activity_id]

  data =
    Utils.maybe_put(
      %{
        "to" => to,
        "cc" => cc,
        "type" => "Update",
        "actor" => actor,
        "object" => object
      },
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
342
# Adds an emoji reaction from `user` to `object`.
#
# Options: `:local` (default `true`) and `:activity_id` (default `nil`).
#
# Returns `{:ok, activity, object}` on success and `{:error, :invalid_emoji}` when
# `emoji` is not accepted by `Pleroma.Emoji.is_unicode_emoji?/1`. Any other
# non-matching step result is returned unchanged.
def react_with_emoji(user, object, emoji, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)

  # The emoji check has to be a matching clause: as a bare expression inside
  # `with` its result would simply be discarded.
  with {:valid_emoji, true} <- {:valid_emoji, Pleroma.Emoji.is_unicode_emoji?(emoji)},
       reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
       {:ok, activity} <- insert(reaction_data, local),
       {:ok, object} <- add_emoji_reaction_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    {:valid_emoji, _} -> {:error, :invalid_emoji}
    error -> error
  end
end
354
# Undoes the reaction activity identified by `reaction_id`, which must have
# `user` as its actor. Returns `{:ok, undo_activity, object}`; a non-matching
# step result is returned unchanged.
#
# Options: `:local` (default `true`) and `:activity_id` (default `nil`).
def unreact_with_emoji(user, reaction_id, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  user_ap_id = user.ap_id

  with %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
       object <- Object.normalize(reaction_activity),
       unreact_data <- make_undo_data(user, reaction_activity, activity_id),
       {:ok, activity} <- insert(unreact_data, local),
       {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  end
end
368
# Likes `object` on behalf of `user`.
#
# If the user already has a like on the object, that existing activity is
# returned together with the unchanged object. Any other non-matching step is
# wrapped as `{:error, term}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
387
# Removes the actor's existing like from `object`.
#
# Returns `{:ok, unlike_activity, like_activity, object}` on success. When any
# step does not match (including "no existing like"), `{:ok, object}` is
# returned with the object as passed in.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _activity} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
400
# Announces `object` on behalf of `user` if `is_announceable?/3` allows it.
# Returns `{:ok, activity, object}`; any non-matching step is wrapped as
# `{:error, term}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    error -> {:error, error}
  end
end
418
# Removes the actor's existing announce of `object`.
#
# Returns `{:ok, unannounce_activity, object}` on success. When any step does
# not match (including "no existing announce"), `{:ok, object}` is returned with
# the object as passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _activity} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
436
# Inserts and federates a follow activity, then stores the activity's "state" in
# the follow state cache for the follower/followed pair.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
445
# Sets the latest follow activity between the two users to the "cancelled" state,
# then inserts and federates the matching unfollow activity. A non-matching step
# result (e.g. no follow activity found) is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
455
# Creates and federates a Delete activity for a user or for an object.
#
# A single bodiless head declares the default for `options`, so `delete/1` and
# `delete/2` are both available for either entity instead of relying on the
# order in which an explicit `delete/1` and a defaulted `delete/2` are defined.
#
# Options (object clause only): `:local` (default `true`), `:activity_id`
# (default `nil`) and `:actor` (defaults to the object's "actor").
def delete(entity, options \\ [])

# User deletion: a Delete addressed to the user's follower address is inserted
# with `fake` and `bypass_actor_check` both set to `true`. Options are ignored.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _options) do
  data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end

# Object deletion: deletes the object, inserts the Delete activity, streams the
# conversation participations and lowers the reply and note counters.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, object, deleted_activity} <- Object.delete(object),
       data <-
         %{
           "type" => "Delete",
           "actor" => actor,
           "object" => id,
           "to" => to,
           # `deleted_activity` may be nil, hence the `&&`.
           "deleted_activity_id" => deleted_activity && deleted_activity.id
         }
         |> maybe_put("id", activity_id),
       {:ok, activity} <- insert(data, local, false),
       stream_out_participations(object, user),
       _ <- decrease_replies_count_if_reply(object),
       {:ok, _actor} <- decrease_note_count_if_public(user, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
495
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow activity exists,
# the blocker unfollows the blocked user first. The Block activity itself is only
# inserted and federated when `[:activitypub, :outgoing_blocks]` is `true`; in
# every other case `{:ok, nil}` is returned.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
515
# Inserts and federates the unblock activity for the latest block between the two
# users. A non-matching step result (e.g. no block found) is returned unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
524
# Inserts a flag (report) activity, federates a stripped copy of it and mails a
# report to every superuser.
#
# `:forward` controls whether the reported account's AP id is put into "cc";
# both `:local` and `:forward` only count as false when they are exactly `false`.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       :ok <- maybe_federate(stripped_activity) do
    Enum.each(User.all_superusers(), fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
561
# Inserts a Move activity from `origin` to `target`, federates it and enqueues
# the "move_following" background job. The target must list the origin's AP id
# in `also_known_as`, otherwise an error tuple is returned.
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
585
# Builds the query for the Create activities of a context, newest first.
#
# Recipients are the public address plus, when `opts["user"]` is set, that user's
# AP id and followings.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  reader = opts["user"]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, reader)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
614
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
621
# Returns the id of the first activity of the context query (which orders by
# descending id), or `nil` when there is none. Preloading is skipped unless
# `opts` overrides "skip_preload".
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
631
# Fetches a page of activities addressed to the public address, leaving out
# unlisted ones. The "user" option is dropped before the query is built.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
end
640
641 @valid_visibilities ~w[direct unlisted public private]
642
# Restricts the query to activities whose `activity_visibility(...)` SQL result is
# the given visibility (or one of a list of visibilities).
#
# Invalid values are logged and the query is returned unrestricted, in line with
# `exclude_visibility/2`, so the surrounding query pipeline always receives a
# query rather than the return value of `Logger.error/1`.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` keeps logging safe for values that cannot be interpolated.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
680
# Removes from the query the activities whose `activity_visibility(...)` SQL result
# is the given visibility (or one of a list of visibilities).
#
# Invalid values are logged and the query is returned unchanged. The value is
# logged through `inspect/1` so that terms which cannot be interpolated into a
# string (a map, for instance) do not raise inside the error path.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
723
# Applies the `thread_visibility(...)` SQL check for the reading user.
#
# The check is skipped when the config map or the user has
# `skip_thread_containment: true`, or when no user is given in the options.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
742
# Fetches activities whose actor is `user`, as seen by `reading_user`, without
# restricting the activity type. The fetched list is returned reversed.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
759
# Fetches the Create and Announce activities whose actor is `user`, as seen by
# `reading_user`. The user's pinned activity ids are passed along in the params.
# The fetched list is returned reversed.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
778
# Fetches the public Create and Announce activities for `params["instance"]`,
# using offset pagination. The fetched list is returned reversed.
def fetch_instance_activities(params) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "instance" => params["instance"],
      "whole_db" => true
    })

  [Pleroma.Constants.as_public()]
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
789
# Recipient list used when listing a user's activities:
#   * "godmode" => true    -> no recipient restriction (empty list)
#   * with a reading user   -> public address, the reader and the reader's followings
#   * without one           -> public address only
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
801
# Keeps only activities with an id greater than "since_id". An empty string or a
# missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
809
# Drops activities whose object carries any of the "tag_reject" tags.
# Raises when combined with "skip_preload" => true, since the object binding is needed.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
823
# Keeps only activities whose object carries all of the "tag_all" tags.
# Raises when combined with "skip_preload" => true, since the object binding is needed.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
837
# Keeps only activities whose object carries the given tag (binary) or any of the
# given tags (list).
# Raises when combined with "skip_preload" => true, since the object binding is needed.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
857
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are added through an `or_where`.
# An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
871
# With "local_only" => true, keeps only activities whose `local` column is true.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
877
# Keeps only activities whose actor equals "actor_id", when that option is present.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
883
# Keeps only activities of the given "type": an equality check for a binary,
# an `ANY(...)` check for any other value.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
893
# Keeps only activities whose data "state" equals the "state" option, when present.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
899
# Keeps only activities whose object lists the "favorited_by" AP id in its 'likes'.
# Uses the second positional binding of the query for the object.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
908
# With "only_media" set to "true" or "1", keeps only activities whose object has
# an 'attachment' different from the empty list.
# Raises when combined with "skip_preload" => true, since the object binding is needed.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
921
# With "exclude_replies" set to "true" or "1", keeps only activities whose object
# has no 'inReplyTo'.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _), do: query
930
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
936
# Drops activities authored by, or addressed ('to') to, users muted by the
# "muting_user". Unless preloading is skipped, activities with a thread mute row
# (the `thread_mute` named binding) are dropped as well.
#
# "with_muted" set to true, "true" or "1" disables the restriction entirely.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  # Fall back to an empty list, like the block and reblog-mute restrictions do,
  # so a nil list is never bound into the SQL `ANY(...)` check.
  mutes = user.mutes || []

  query =
    from([activity] in query,
      where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
      where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
    )

  if opts["skip_preload"] do
    query
  else
    from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
956
# Drops activities related to users or domains blocked by the "blocking_user":
# blocked actors, blocked recipients, Announces addressed ('to') to blocked users,
# and activities whose actor, or whose object's actor, is on a blocked domain.
#
# The object is joined first if the query has no `:object` named binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user}) do
  blocks = user.blocks || []
  domain_blocks = user.domain_blocks || []

  query =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _), do: query
981
# Drops activities that have the public address in their 'cc' (a missing 'cc' is
# coalesced to an empty JSON object).
defp restrict_unlisted(query) do
  public = [Pleroma.Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
993
# With "pinned" => "true", keeps only activities whose id is in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
999
# Drops Announce activities whose actor is in the muting user's `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user}) do
  muted_reblogs = user.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1016
# Keeps only activities whose actor is a user with a nickname ending in
# "@<instance>". The matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{"instance" => instance}) do
  users =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^"%@#{instance}"))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^users)
end

defp restrict_instance(query, _), do: query
1030
# Drops activities whose object is of type "Answer", unless
# "include_poll_votes" => true. Without an `:object` named binding the query is
# returned untouched.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1042
# Drops the activity with the given binary "exclude_id", when that option is present.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1048
# Preloads the child object unless "skip_preload" => true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1055
# Preloads the bookmark of `opts["user"]` unless "skip_preload" => true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
1062
# Skips the thread-muted field when the caller sets "skip_preload" to true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

# Default: set the field for "muting_user", falling back to "user".
defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1069
# Orders by id when the options carry an explicit `:order` (atom key) of
# `:desc` or `:asc`; any other options leave the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1081
@doc """
Builds the activity query for `recipients`, applying every preload, ordering,
restriction and exclusion helper driven by `opts`.

Returns the composed query; nothing is executed here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Stage 1: preloads and ordering.
  base_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # Stage 2: restrictions, applied in the same order as before.
  restricted_query =
    base_query
    |> restrict_recipients(recipients, opts["user"])
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(opts)
    |> restrict_muted(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, containment_config)
    |> restrict_replies(opts)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(opts)
    |> restrict_instance(opts)
    |> Activity.restrict_deactivated_users()

  # Stage 3: exclusions.
  restricted_query
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1116
@doc """
Fetches one page of activities for `recipients` plus the list memberships of
`opts["user"]`.

The paginated result is reversed and then passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, memberships, user)
end
1125
# For every activity whose "bcc" shares an entry with `list_memberships`,
# prepends the user's ap_id to the activity's "cc".
#
# Only runs for a non-empty membership list and a %User{}; otherwise the
# activities are returned untouched (see the fallback clause).
#
# The current "cc" value is wrapped with List.wrap/1 first, so a missing (nil)
# or single-value "cc" yields a proper list instead of an improper
# `[ap_id | nil]` cons cell.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

# No memberships or no user: nothing to rewrite.
defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1142
@doc """
Restricts `query` to activities whose recipients overlap `recipients`, or
overlap `recipients_with_public` while also containing the public address.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Pleroma.Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1151
@doc """
Fetches one page of activities bounded by `recipients` and
`recipients_with_public` (see `fetch_activities_bounded_query/3`).

The base query is built with an empty recipient list; the paginated result is
reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1163
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is set it is added to the object data under "actor".
Any result of `Upload.store/2` other than `{:ok, data}` is returned as-is.
"""
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      object_data = if actor, do: Map.put(data, "actor", actor), else: data

      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1176
# Converts an actor object (string-keyed map) into the attribute map used to
# create or update a user. Always returns `{:ok, user_data}`.
#
# The input is parsed defensively:
#   * "icon" / "image" only produce an avatar / banner when they are maps with
#     a truthy "url"; any other shape (nil, string, list) yields nil instead of
#     raising on access.
#   * "attachment" may be missing, nil, a single map or a list; entries that are
#     not maps with "type" == "PropertyValue" are skipped instead of raising.
defp object_to_user_data(data) do
  # Builds the image map from an "icon"/"image" value, or nil.
  image_from = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = image_from.(data["icon"])
  banner = image_from.(data["image"])

  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # `locked` is read before the object is passed through the Transmogrifier
  # fix-up; everything below reads the fixed-up object.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    ap_enabled: true,
    source_data: data,
    banner: banner,
    fields: fields,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    also_known_as: Map.get(data, "alsoKnownAs", []),
    nickname: nickname
  }

  {:ok, user_data}
end
1234
@doc """
Fetches the following and follower collections of `user` and derives the
counters and hide flags from them.

Returns `{:ok, info}` with `:hide_follows`, `:follower_count`,
`:following_count` and `:hide_followers`. An `{:error, _}` from any step is
returned unchanged; any other non-matching value is wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       total_following when is_integer(total_following) <- following["totalItems"],
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       total_followers when is_integer(total_followers) <- followers["totalItems"],
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: total_followers,
      following_count: total_following,
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1259
# Merges freshly fetched follow information into `data[:info]` when the
# `[:instance, :external_user_synchronization]` setting is `true`.
# A `false` setting returns `data` silently; every other outcome (including a
# failed fetch) is logged as an error and `data` is returned unchanged.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    merged_info = Map.merge(data[:info] || %{}, follow_info)
    Map.put(data, :info, merged_info)
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1278
# Decides whether a collection counts as private, returned as `{:ok, boolean}`
# or an error tuple.
#
# An embedded first page of a (Ordered)CollectionPage type means "not private".
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

# Otherwise "first" is fetched: a page of the expected type means "not
# private", a 401/403 response means "private", anything else is an error.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

# A collection without "first" is treated as private.
defp collection_private(_data), do: {:ok, true}
1301
@doc """
Runs `data` through `MRF.filter/1` and converts the filtered object into user
attributes.

Returns `{:ok, user_data}`; any non-`{:ok, _}` step result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} ->
      case object_to_user_data(filtered) do
        {:ok, user_data} -> {:ok, user_data}
        failure -> {:error, failure}
      end

    failure ->
      {:error, failure}
  end
end
1310
@doc """
Fetches the object at `ap_id`, converts it into user attributes and passes
them through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}`. A failing fetch or conversion is logged and
returned wrapped as `{:error, result}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1322
@doc """
Produces a user for `ap_id`.

If a cached user already exists, the call is delegated to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the user data is fetched
and handed to `User.insert_or_update_user/1`; a failed fetch is wrapped as
`{:error, result}`.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} -> User.insert_or_update_user(user_data)
      failure -> {:error, failure}
    end
  end
end
1334
@doc """
Looks `nickname` up through WebFinger and builds the user from the returned
"ap_id".

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1342
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1347
# Post-processing hook for a single activity; currently it only applies
# contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1352
@doc """
Builds a query for "Create" activities restricted to "direct" visibility,
ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{"type" => "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1359 end