72a29e50ffc5460043a4b6410c8d1d7b9bced1c8
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
33 # For Announce activities, we filter the recipients based on following status for any actors
34 # that match actual users. See issue #164 for more information about why this is necessary.
35 defp get_recipients(%{"type" => "Announce"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = User.get_cached_by_ap_id(data["actor"])
40
41 recipients =
42 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
43 case User.get_cached_by_ap_id(recipient) do
44 nil -> true
45 user -> User.following?(user, actor)
46 end
47 end)
48
49 {recipients, to, cc}
50 end
51
52 defp get_recipients(%{"type" => "Create"} = data) do
53 to = Map.get(data, "to", [])
54 cc = Map.get(data, "cc", [])
55 bcc = Map.get(data, "bcc", [])
56 actor = Map.get(data, "actor", [])
57 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
58 {recipients, to, cc}
59 end
60
61 defp get_recipients(data) do
62 to = Map.get(data, "to", [])
63 cc = Map.get(data, "cc", [])
64 bcc = Map.get(data, "bcc", [])
65 recipients = Enum.concat([to, cc, bcc])
66 {recipients, to, cc}
67 end
68
69 defp check_actor_is_active(actor) do
70 if not is_nil(actor) do
71 with user <- User.get_cached_by_ap_id(actor),
72 false <- user.deactivated do
73 true
74 else
75 _e -> false
76 end
77 else
78 true
79 end
80 end
81
82 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
83 limit = Config.get([:instance, :remote_limit])
84 String.length(content) <= limit
85 end
86
87 defp check_remote_limit(_), do: true
88
89 def increase_note_count_if_public(actor, object) do
90 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
91 end
92
93 def decrease_note_count_if_public(actor, object) do
94 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
95 end
96
97 def increase_replies_count_if_reply(%{
98 "object" => %{"inReplyTo" => reply_ap_id} = object,
99 "type" => "Create"
100 }) do
101 if is_public?(object) do
102 Object.increase_replies_count(reply_ap_id)
103 end
104 end
105
106 def increase_replies_count_if_reply(_create_data), do: :noop
107
108 def decrease_replies_count_if_reply(%Object{
109 data: %{"inReplyTo" => reply_ap_id} = object
110 }) do
111 if is_public?(object) do
112 Object.decrease_replies_count(reply_ap_id)
113 end
114 end
115
116 def decrease_replies_count_if_reply(_object), do: :noop
117
118 def increase_poll_votes_if_vote(%{
119 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
120 "type" => "Create"
121 }) do
122 Object.increase_vote_count(reply_ap_id, name)
123 end
124
125 def increase_poll_votes_if_vote(_create_data), do: :noop
126
127 # TODO rewrite in with style
128 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
129 def persist(object, meta) do
130 local = Keyword.fetch!(meta, :local)
131 {recipients, _, _} = get_recipients(object)
132
133 {:ok, activity} =
134 Repo.insert(%Activity{
135 data: object,
136 local: local,
137 recipients: recipients,
138 actor: object["actor"]
139 })
140
141 {:ok, activity, meta}
142 end
143
144 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
145 with nil <- Activity.normalize(map),
146 map <- lazy_put_activity_defaults(map, fake),
147 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
148 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
149 {:ok, map} <- MRF.filter(map),
150 {recipients, _, _} = get_recipients(map),
151 # ???
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
155 {:ok, activity} =
156 Repo.insert(%Activity{
157 data: map,
158 local: local,
159 actor: map["actor"],
160 recipients: recipients
161 })
162
163 # Splice in the child object if we have one.
164 activity =
165 if not is_nil(object) do
166 Map.put(activity, :object, object)
167 else
168 activity
169 end
170
171 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
172
173 Notification.create_notifications(activity)
174
175 conversation = create_or_bump_conversation(activity, map["actor"])
176 participations = get_participations(conversation)
177 stream_out(activity)
178 stream_out_participations(participations)
179 {:ok, activity}
180 else
181 %Activity{} = activity ->
182 {:ok, activity}
183
184 {:fake, true, map, recipients} ->
185 activity = %Activity{
186 data: map,
187 local: local,
188 actor: map["actor"],
189 recipients: recipients,
190 id: "pleroma:fakeid"
191 }
192
193 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
194 {:ok, activity}
195
196 error ->
197 {:error, error}
198 end
199 end
200
201 defp create_or_bump_conversation(activity, actor) do
202 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
203 %User{} = user <- User.get_cached_by_ap_id(actor),
204 Participation.mark_as_read(user, conversation) do
205 {:ok, conversation}
206 end
207 end
208
209 defp get_participations({:ok, conversation}) do
210 conversation
211 |> Repo.preload(:participations, force: true)
212 |> Map.get(:participations)
213 end
214
215 defp get_participations(_), do: []
216
217 def stream_out_participations(participations) do
218 participations =
219 participations
220 |> Repo.preload(:user)
221
222 Streamer.stream("participation", participations)
223 end
224
225 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
226 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
227 conversation = Repo.preload(conversation, :participations),
228 last_activity_id =
229 fetch_latest_activity_id_for_context(conversation.ap_id, %{
230 "user" => user,
231 "blocking_user" => user
232 }) do
233 if last_activity_id do
234 stream_out_participations(conversation.participations)
235 end
236 end
237 end
238
239 def stream_out_participations(_, _), do: :noop
240
241 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
242 when data_type in ["Create", "Announce", "Delete"] do
243 activity
244 |> Topics.get_activity_topics()
245 |> Streamer.stream(activity)
246 end
247
248 def stream_out(_activity) do
249 :noop
250 end
251
252 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257 quick_insert? = Pleroma.Config.get([:env]) == :benchmark
258
259 with create_data <-
260 make_create_data(
261 %{to: to, actor: actor, published: published, context: context, object: object},
262 additional
263 ),
264 {:ok, activity} <- insert(create_data, local, fake),
265 {:fake, false, activity} <- {:fake, fake, activity},
266 _ <- increase_replies_count_if_reply(create_data),
267 _ <- increase_poll_votes_if_vote(create_data),
268 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
269 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
270 :ok <- maybe_federate(activity) do
271 {:ok, activity}
272 else
273 {:quick_insert, true, activity} ->
274 {:ok, activity}
275
276 {:fake, true, activity} ->
277 {:ok, activity}
278
279 {:error, message} ->
280 {:error, message}
281 end
282 end
283
284 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
285 additional = params[:additional] || %{}
286 # only accept false as false value
287 local = !(params[:local] == false)
288 published = params[:published]
289
290 with listen_data <-
291 make_listen_data(
292 %{to: to, actor: actor, published: published, context: context, object: object},
293 additional
294 ),
295 {:ok, activity} <- insert(listen_data, local),
296 :ok <- maybe_federate(activity) do
297 {:ok, activity}
298 else
299 {:error, message} ->
300 {:error, message}
301 end
302 end
303
304 def accept(params) do
305 accept_or_reject("Accept", params)
306 end
307
308 def reject(params) do
309 accept_or_reject("Reject", params)
310 end
311
312 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
313 local = Map.get(params, :local, true)
314 activity_id = Map.get(params, :activity_id, nil)
315
316 with data <-
317 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
318 |> Utils.maybe_put("id", activity_id),
319 {:ok, activity} <- insert(data, local),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
326 local = !(params[:local] == false)
327 activity_id = params[:activity_id]
328
329 with data <- %{
330 "to" => to,
331 "cc" => cc,
332 "type" => "Update",
333 "actor" => actor,
334 "object" => object
335 },
336 data <- Utils.maybe_put(data, "id", activity_id),
337 {:ok, activity} <- insert(data, local),
338 :ok <- maybe_federate(activity) do
339 {:ok, activity}
340 end
341 end
342
343 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
344 def like(
345 %User{ap_id: ap_id} = user,
346 %Object{data: %{"id" => _}} = object,
347 activity_id \\ nil,
348 local \\ true
349 ) do
350 with nil <- get_existing_like(ap_id, object),
351 like_data <- make_like_data(user, object, activity_id),
352 {:ok, activity} <- insert(like_data, local),
353 {:ok, object} <- add_like_to_object(activity, object),
354 :ok <- maybe_federate(activity) do
355 {:ok, activity, object}
356 else
357 %Activity{} = activity -> {:ok, activity, object}
358 error -> {:error, error}
359 end
360 end
361
362 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
363 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
364 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
365 {:ok, unlike_activity} <- insert(unlike_data, local),
366 {:ok, _activity} <- Repo.delete(like_activity),
367 {:ok, object} <- remove_like_from_object(like_activity, object),
368 :ok <- maybe_federate(unlike_activity) do
369 {:ok, unlike_activity, like_activity, object}
370 else
371 _e -> {:ok, object}
372 end
373 end
374
375 def announce(
376 %User{ap_id: _} = user,
377 %Object{data: %{"id" => _}} = object,
378 activity_id \\ nil,
379 local \\ true,
380 public \\ true
381 ) do
382 with true <- is_announceable?(object, user, public),
383 announce_data <- make_announce_data(user, object, activity_id, public),
384 {:ok, activity} <- insert(announce_data, local),
385 {:ok, object} <- add_announce_to_object(activity, object),
386 :ok <- maybe_federate(activity) do
387 {:ok, activity, object}
388 else
389 error -> {:error, error}
390 end
391 end
392
393 def unannounce(
394 %User{} = actor,
395 %Object{} = object,
396 activity_id \\ nil,
397 local \\ true
398 ) do
399 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
400 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
401 {:ok, unannounce_activity} <- insert(unannounce_data, local),
402 :ok <- maybe_federate(unannounce_activity),
403 {:ok, _activity} <- Repo.delete(announce_activity),
404 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
405 {:ok, unannounce_activity, object}
406 else
407 _e -> {:ok, object}
408 end
409 end
410
411 def follow(follower, followed, activity_id \\ nil, local \\ true) do
412 with data <- make_follow_data(follower, followed, activity_id),
413 {:ok, activity} <- insert(data, local),
414 :ok <- maybe_federate(activity),
415 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
416 {:ok, activity}
417 end
418 end
419
420 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
421 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
422 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
423 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
424 {:ok, activity} <- insert(unfollow_data, local),
425 :ok <- maybe_federate(activity) do
426 {:ok, activity}
427 end
428 end
429
430 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
431 with data <- %{
432 "to" => [follower_address],
433 "type" => "Delete",
434 "actor" => ap_id,
435 "object" => %{"type" => "Person", "id" => ap_id}
436 },
437 {:ok, activity} <- insert(data, true, true, true),
438 :ok <- maybe_federate(activity) do
439 {:ok, user}
440 end
441 end
442
443 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
444 local = Keyword.get(options, :local, true)
445 activity_id = Keyword.get(options, :activity_id, nil)
446 actor = Keyword.get(options, :actor, actor)
447
448 user = User.get_cached_by_ap_id(actor)
449 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
450
451 with {:ok, object, activity} <- Object.delete(object),
452 data <-
453 %{
454 "type" => "Delete",
455 "actor" => actor,
456 "object" => id,
457 "to" => to,
458 "deleted_activity_id" => activity && activity.id
459 }
460 |> maybe_put("id", activity_id),
461 {:ok, activity} <- insert(data, local, false),
462 stream_out_participations(object, user),
463 _ <- decrease_replies_count_if_reply(object),
464 {:ok, _actor} <- decrease_note_count_if_public(user, object),
465 :ok <- maybe_federate(activity) do
466 {:ok, activity}
467 end
468 end
469
470 @spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
471 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
472 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
473 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
474
475 if unfollow_blocked do
476 follow_activity = fetch_latest_follow(blocker, blocked)
477 if follow_activity, do: unfollow(blocker, blocked, nil, local)
478 end
479
480 with true <- outgoing_blocks,
481 block_data <- make_block_data(blocker, blocked, activity_id),
482 {:ok, activity} <- insert(block_data, local),
483 :ok <- maybe_federate(activity) do
484 {:ok, activity}
485 else
486 _e -> {:ok, nil}
487 end
488 end
489
490 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
491 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
492 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
493 {:ok, activity} <- insert(unblock_data, local),
494 :ok <- maybe_federate(activity) do
495 {:ok, activity}
496 end
497 end
498
499 @spec flag(map()) :: {:ok, Activity.t()} | any
500 def flag(
501 %{
502 actor: actor,
503 context: _context,
504 account: account,
505 statuses: statuses,
506 content: content
507 } = params
508 ) do
509 # only accept false as false value
510 local = !(params[:local] == false)
511 forward = !(params[:forward] == false)
512
513 additional = params[:additional] || %{}
514
515 additional =
516 if forward do
517 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
518 else
519 Map.merge(additional, %{"to" => [], "cc" => []})
520 end
521
522 with flag_data <- make_flag_data(params, additional),
523 {:ok, activity} <- insert(flag_data, local),
524 {:ok, stripped_activity} <- strip_report_status_data(activity),
525 :ok <- maybe_federate(stripped_activity) do
526 Enum.each(User.all_superusers(), fn superuser ->
527 superuser
528 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
529 |> Pleroma.Emails.Mailer.deliver_async()
530 end)
531
532 {:ok, activity}
533 end
534 end
535
536 defp fetch_activities_for_context_query(context, opts) do
537 public = [Pleroma.Constants.as_public()]
538
539 recipients =
540 if opts["user"],
541 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
542 else: public
543
544 from(activity in Activity)
545 |> maybe_preload_objects(opts)
546 |> maybe_preload_bookmarks(opts)
547 |> maybe_set_thread_muted_field(opts)
548 |> restrict_blocked(opts)
549 |> restrict_recipients(recipients, opts["user"])
550 |> where(
551 [activity],
552 fragment(
553 "?->>'type' = ? and ?->>'context' = ?",
554 activity.data,
555 "Create",
556 activity.data,
557 ^context
558 )
559 )
560 |> exclude_poll_votes(opts)
561 |> exclude_id(opts)
562 |> order_by([activity], desc: activity.id)
563 end
564
565 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
566 def fetch_activities_for_context(context, opts \\ %{}) do
567 context
568 |> fetch_activities_for_context_query(opts)
569 |> Repo.all()
570 end
571
572 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
573 FlakeId.Ecto.CompatType.t() | nil
574 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
575 context
576 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
577 |> limit(1)
578 |> select([a], a.id)
579 |> Repo.one()
580 end
581
582 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
583 opts = Map.drop(opts, ["user"])
584
585 [Pleroma.Constants.as_public()]
586 |> fetch_activities_query(opts)
587 |> restrict_unlisted()
588 |> Pagination.fetch_paginated(opts, pagination)
589 |> Enum.reverse()
590 end
591
592 @valid_visibilities ~w[direct unlisted public private]
593
594 defp restrict_visibility(query, %{visibility: visibility})
595 when is_list(visibility) do
596 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
597 query =
598 from(
599 a in query,
600 where:
601 fragment(
602 "activity_visibility(?, ?, ?) = ANY (?)",
603 a.actor,
604 a.recipients,
605 a.data,
606 ^visibility
607 )
608 )
609
610 query
611 else
612 Logger.error("Could not restrict visibility to #{visibility}")
613 end
614 end
615
616 defp restrict_visibility(query, %{visibility: visibility})
617 when visibility in @valid_visibilities do
618 from(
619 a in query,
620 where:
621 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
622 )
623 end
624
625 defp restrict_visibility(_query, %{visibility: visibility})
626 when visibility not in @valid_visibilities do
627 Logger.error("Could not restrict visibility to #{visibility}")
628 end
629
630 defp restrict_visibility(query, _visibility), do: query
631
632 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
633 when is_list(visibility) do
634 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
635 from(
636 a in query,
637 where:
638 not fragment(
639 "activity_visibility(?, ?, ?) = ANY (?)",
640 a.actor,
641 a.recipients,
642 a.data,
643 ^visibility
644 )
645 )
646 else
647 Logger.error("Could not exclude visibility to #{visibility}")
648 query
649 end
650 end
651
652 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
653 when visibility in @valid_visibilities do
654 from(
655 a in query,
656 where:
657 not fragment(
658 "activity_visibility(?, ?, ?) = ?",
659 a.actor,
660 a.recipients,
661 a.data,
662 ^visibility
663 )
664 )
665 end
666
667 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
668 when visibility not in @valid_visibilities do
669 Logger.error("Could not exclude visibility to #{visibility}")
670 query
671 end
672
673 defp exclude_visibility(query, _visibility), do: query
674
675 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
676 do: query
677
678 defp restrict_thread_visibility(
679 query,
680 %{"user" => %User{skip_thread_containment: true}},
681 _
682 ),
683 do: query
684
685 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
686 from(
687 a in query,
688 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
689 )
690 end
691
692 defp restrict_thread_visibility(query, _, _), do: query
693
694 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
695 params =
696 params
697 |> Map.put("user", reading_user)
698 |> Map.put("actor_id", user.ap_id)
699 |> Map.put("whole_db", true)
700
701 recipients =
702 user_activities_recipients(%{
703 "godmode" => params["godmode"],
704 "reading_user" => reading_user
705 })
706
707 fetch_activities(recipients, params)
708 |> Enum.reverse()
709 end
710
711 def fetch_user_activities(user, reading_user, params \\ %{}) do
712 params =
713 params
714 |> Map.put("type", ["Create", "Announce"])
715 |> Map.put("user", reading_user)
716 |> Map.put("actor_id", user.ap_id)
717 |> Map.put("whole_db", true)
718 |> Map.put("pinned_activity_ids", user.pinned_activities)
719
720 recipients =
721 user_activities_recipients(%{
722 "godmode" => params["godmode"],
723 "reading_user" => reading_user
724 })
725
726 fetch_activities(recipients, params)
727 |> Enum.reverse()
728 end
729
730 defp user_activities_recipients(%{"godmode" => true}) do
731 []
732 end
733
734 defp user_activities_recipients(%{"reading_user" => reading_user}) do
735 if reading_user do
736 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
737 else
738 [Pleroma.Constants.as_public()]
739 end
740 end
741
742 defp restrict_since(query, %{"since_id" => ""}), do: query
743
744 defp restrict_since(query, %{"since_id" => since_id}) do
745 from(activity in query, where: activity.id > ^since_id)
746 end
747
748 defp restrict_since(query, _), do: query
749
750 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
751 raise "Can't use the child object without preloading!"
752 end
753
754 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
755 when is_list(tag_reject) and tag_reject != [] do
756 from(
757 [_activity, object] in query,
758 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
759 )
760 end
761
762 defp restrict_tag_reject(query, _), do: query
763
764 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
765 raise "Can't use the child object without preloading!"
766 end
767
768 defp restrict_tag_all(query, %{"tag_all" => tag_all})
769 when is_list(tag_all) and tag_all != [] do
770 from(
771 [_activity, object] in query,
772 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
773 )
774 end
775
776 defp restrict_tag_all(query, _), do: query
777
778 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
779 raise "Can't use the child object without preloading!"
780 end
781
782 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
783 from(
784 [_activity, object] in query,
785 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
786 )
787 end
788
789 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
790 from(
791 [_activity, object] in query,
792 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
793 )
794 end
795
796 defp restrict_tag(query, _), do: query
797
798 defp restrict_recipients(query, [], _user), do: query
799
800 defp restrict_recipients(query, recipients, nil) do
801 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
802 end
803
804 defp restrict_recipients(query, recipients, user) do
805 from(
806 activity in query,
807 where: fragment("? && ?", ^recipients, activity.recipients),
808 or_where: activity.actor == ^user.ap_id
809 )
810 end
811
812 defp restrict_local(query, %{"local_only" => true}) do
813 from(activity in query, where: activity.local == true)
814 end
815
816 defp restrict_local(query, _), do: query
817
818 defp restrict_actor(query, %{"actor_id" => actor_id}) do
819 from(activity in query, where: activity.actor == ^actor_id)
820 end
821
822 defp restrict_actor(query, _), do: query
823
824 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
825 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
826 end
827
828 defp restrict_type(query, %{"type" => type}) do
829 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
830 end
831
832 defp restrict_type(query, _), do: query
833
834 defp restrict_state(query, %{"state" => state}) do
835 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
836 end
837
838 defp restrict_state(query, _), do: query
839
840 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
841 from(
842 [_activity, object] in query,
843 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
844 )
845 end
846
847 defp restrict_favorited_by(query, _), do: query
848
849 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
850 raise "Can't use the child object without preloading!"
851 end
852
853 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
854 from(
855 [_activity, object] in query,
856 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
857 )
858 end
859
860 defp restrict_media(query, _), do: query
861
862 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
863 from(
864 [_activity, object] in query,
865 where: fragment("?->>'inReplyTo' is null", object.data)
866 )
867 end
868
869 defp restrict_replies(query, _), do: query
870
871 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
872 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
873 end
874
875 defp restrict_reblogs(query, _), do: query
876
877 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
878
879 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
880 mutes = user.mutes
881
882 query =
883 from([activity] in query,
884 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
885 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
886 )
887
888 unless opts["skip_preload"] do
889 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
890 else
891 query
892 end
893 end
894
895 defp restrict_muted(query, _), do: query
896
897 defp restrict_blocked(query, %{"blocking_user" => %User{} = user}) do
898 blocks = user.blocks || []
899 domain_blocks = user.domain_blocks || []
900
901 query =
902 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
903
904 from(
905 [activity, object: o] in query,
906 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
907 where: fragment("not (? && ?)", activity.recipients, ^blocks),
908 where:
909 fragment(
910 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
911 activity.data,
912 activity.data,
913 ^blocks
914 ),
915 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
916 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
917 )
918 end
919
920 defp restrict_blocked(query, _), do: query
921
922 defp restrict_unlisted(query) do
923 from(
924 activity in query,
925 where:
926 fragment(
927 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
928 activity.data,
929 ^[Pleroma.Constants.as_public()]
930 )
931 )
932 end
933
934 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
935 from(activity in query, where: activity.id in ^ids)
936 end
937
938 defp restrict_pinned(query, _), do: query
939
940 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user}) do
941 muted_reblogs = user.muted_reblogs || []
942
943 from(
944 activity in query,
945 where:
946 fragment(
947 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
948 activity.data,
949 activity.actor,
950 ^muted_reblogs
951 )
952 )
953 end
954
955 defp restrict_muted_reblogs(query, _), do: query
956
957 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
958
959 defp exclude_poll_votes(query, _) do
960 if has_named_binding?(query, :object) do
961 from([activity, object: o] in query,
962 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
963 )
964 else
965 query
966 end
967 end
968
969 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
970 from(activity in query, where: activity.id != ^id)
971 end
972
973 defp exclude_id(query, _), do: query
974
975 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
976
977 defp maybe_preload_objects(query, _) do
978 query
979 |> Activity.with_preloaded_object()
980 end
981
982 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
983
984 defp maybe_preload_bookmarks(query, opts) do
985 query
986 |> Activity.with_preloaded_bookmark(opts["user"])
987 end
988
989 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
990
991 defp maybe_set_thread_muted_field(query, opts) do
992 query
993 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
994 end
995
996 defp maybe_order(query, %{order: :desc}) do
997 query
998 |> order_by(desc: :id)
999 end
1000
1001 defp maybe_order(query, %{order: :asc}) do
1002 query
1003 |> order_by(asc: :id)
1004 end
1005
1006 defp maybe_order(query, _), do: query
1007
1008 def fetch_activities_query(recipients, opts \\ %{}) do
1009 config = %{
1010 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1011 }
1012
1013 Activity
1014 |> maybe_preload_objects(opts)
1015 |> maybe_preload_bookmarks(opts)
1016 |> maybe_set_thread_muted_field(opts)
1017 |> maybe_order(opts)
1018 |> restrict_recipients(recipients, opts["user"])
1019 |> restrict_tag(opts)
1020 |> restrict_tag_reject(opts)
1021 |> restrict_tag_all(opts)
1022 |> restrict_since(opts)
1023 |> restrict_local(opts)
1024 |> restrict_actor(opts)
1025 |> restrict_type(opts)
1026 |> restrict_state(opts)
1027 |> restrict_favorited_by(opts)
1028 |> restrict_blocked(opts)
1029 |> restrict_muted(opts)
1030 |> restrict_media(opts)
1031 |> restrict_visibility(opts)
1032 |> restrict_thread_visibility(opts, config)
1033 |> restrict_replies(opts)
1034 |> restrict_reblogs(opts)
1035 |> restrict_pinned(opts)
1036 |> restrict_muted_reblogs(opts)
1037 |> Activity.restrict_deactivated_users()
1038 |> exclude_poll_votes(opts)
1039 |> exclude_visibility(opts)
1040 end
1041
1042 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1043 list_memberships = Pleroma.List.memberships(opts["user"])
1044
1045 fetch_activities_query(recipients ++ list_memberships, opts)
1046 |> Pagination.fetch_paginated(opts, pagination)
1047 |> Enum.reverse()
1048 |> maybe_update_cc(list_memberships, opts["user"])
1049 end
1050
1051 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1052 when is_list(list_memberships) and length(list_memberships) > 0 do
1053 Enum.map(activities, fn
1054 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1055 if Enum.any?(bcc, &(&1 in list_memberships)) do
1056 update_in(activity.data["cc"], &[user_ap_id | &1])
1057 else
1058 activity
1059 end
1060
1061 activity ->
1062 activity
1063 end)
1064 end
1065
1066 defp maybe_update_cc(activities, _, _), do: activities
1067
1068 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1069 from(activity in query,
1070 where:
1071 fragment("? && ?", activity.recipients, ^recipients) or
1072 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1073 ^Pleroma.Constants.as_public() in activity.recipients)
1074 )
1075 end
1076
1077 def fetch_activities_bounded(
1078 recipients,
1079 recipients_with_public,
1080 opts \\ %{},
1081 pagination \\ :keyset
1082 ) do
1083 fetch_activities_query([], opts)
1084 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1085 |> Pagination.fetch_paginated(opts, pagination)
1086 |> Enum.reverse()
1087 end
1088
1089 def upload(file, opts \\ []) do
1090 with {:ok, data} <- Upload.store(file, opts) do
1091 obj_data =
1092 if opts[:actor] do
1093 Map.put(data, "actor", opts[:actor])
1094 else
1095 data
1096 end
1097
1098 Repo.insert(%Object{data: obj_data})
1099 end
1100 end
1101
1102 defp object_to_user_data(data) do
1103 avatar =
1104 data["icon"]["url"] &&
1105 %{
1106 "type" => "Image",
1107 "url" => [%{"href" => data["icon"]["url"]}]
1108 }
1109
1110 banner =
1111 data["image"]["url"] &&
1112 %{
1113 "type" => "Image",
1114 "url" => [%{"href" => data["image"]["url"]}]
1115 }
1116
1117 fields =
1118 data
1119 |> Map.get("attachment", [])
1120 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1121 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1122
1123 locked = data["manuallyApprovesFollowers"] || false
1124 data = Transmogrifier.maybe_fix_user_object(data)
1125 discoverable = data["discoverable"] || false
1126 invisible = data["invisible"] || false
1127
1128 user_data = %{
1129 ap_id: data["id"],
1130 ap_enabled: true,
1131 source_data: data,
1132 banner: banner,
1133 fields: fields,
1134 locked: locked,
1135 discoverable: discoverable,
1136 invisible: invisible,
1137 avatar: avatar,
1138 name: data["name"],
1139 follower_address: data["followers"],
1140 following_address: data["following"],
1141 bio: data["summary"]
1142 }
1143
1144 # nickname can be nil because of virtual actors
1145 user_data =
1146 if data["preferredUsername"] do
1147 Map.put(
1148 user_data,
1149 :nickname,
1150 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1151 )
1152 else
1153 Map.put(user_data, :nickname, nil)
1154 end
1155
1156 {:ok, user_data}
1157 end
1158
1159 def fetch_follow_information_for_user(user) do
1160 with {:ok, following_data} <-
1161 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1162 following_count when is_integer(following_count) <- following_data["totalItems"],
1163 {:ok, hide_follows} <- collection_private(following_data),
1164 {:ok, followers_data} <-
1165 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1166 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1167 {:ok, hide_followers} <- collection_private(followers_data) do
1168 {:ok,
1169 %{
1170 hide_follows: hide_follows,
1171 follower_count: followers_count,
1172 following_count: following_count,
1173 hide_followers: hide_followers
1174 }}
1175 else
1176 {:error, _} = e ->
1177 e
1178
1179 e ->
1180 {:error, e}
1181 end
1182 end
1183
1184 defp maybe_update_follow_information(data) do
1185 with {:enabled, true} <-
1186 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1187 {:ok, info} <- fetch_follow_information_for_user(data) do
1188 info = Map.merge(data[:info] || %{}, info)
1189 Map.put(data, :info, info)
1190 else
1191 {:enabled, false} ->
1192 data
1193
1194 e ->
1195 Logger.error(
1196 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1197 )
1198
1199 data
1200 end
1201 end
1202
1203 defp collection_private(data) do
1204 if is_map(data["first"]) and
1205 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1206 {:ok, false}
1207 else
1208 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1209 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1210 {:ok, false}
1211 else
1212 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1213 {:ok, true}
1214
1215 {:error, _} = e ->
1216 e
1217
1218 e ->
1219 {:error, e}
1220 end
1221 end
1222 end
1223
1224 def user_data_from_user_object(data) do
1225 with {:ok, data} <- MRF.filter(data),
1226 {:ok, data} <- object_to_user_data(data) do
1227 {:ok, data}
1228 else
1229 e -> {:error, e}
1230 end
1231 end
1232
1233 def fetch_and_prepare_user_from_ap_id(ap_id) do
1234 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1235 {:ok, data} <- user_data_from_user_object(data),
1236 data <- maybe_update_follow_information(data) do
1237 {:ok, data}
1238 else
1239 e ->
1240 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1241 {:error, e}
1242 end
1243 end
1244
1245 def make_user_from_ap_id(ap_id) do
1246 if _user = User.get_cached_by_ap_id(ap_id) do
1247 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1248 else
1249 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1250 User.insert_or_update_user(data)
1251 else
1252 e -> {:error, e}
1253 end
1254 end
1255 end
1256
1257 def make_user_from_nickname(nickname) do
1258 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1259 make_user_from_ap_id(ap_id)
1260 else
1261 _e -> {:error, "No AP id in WebFinger"}
1262 end
1263 end
1264
1265 # filter out broken threads
1266 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1267 entire_thread_visible_for_user?(activity, user)
1268 end
1269
1270 # do post-processing on a specific activity
1271 def contain_activity(%Activity{} = activity, %User{} = user) do
1272 contain_broken_threads(activity, user)
1273 end
1274
1275 def fetch_direct_messages_query do
1276 Activity
1277 |> restrict_type(%{"type" => "Create"})
1278 |> restrict_visibility(%{visibility: "direct"})
1279 |> order_by([activity], asc: activity.id)
1280 end
1281 end