rework to use properties instead of compound typing, per SocialCG
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
33 # For Announce activities, we filter the recipients based on following status for any actors
34 # that match actual users. See issue #164 for more information about why this is necessary.
35 defp get_recipients(%{"type" => "Announce"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = User.get_cached_by_ap_id(data["actor"])
40
41 recipients =
42 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
43 case User.get_cached_by_ap_id(recipient) do
44 nil -> true
45 user -> User.following?(user, actor)
46 end
47 end)
48
49 {recipients, to, cc}
50 end
51
52 defp get_recipients(%{"type" => "Create"} = data) do
53 to = Map.get(data, "to", [])
54 cc = Map.get(data, "cc", [])
55 bcc = Map.get(data, "bcc", [])
56 actor = Map.get(data, "actor", [])
57 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
58 {recipients, to, cc}
59 end
60
61 defp get_recipients(data) do
62 to = Map.get(data, "to", [])
63 cc = Map.get(data, "cc", [])
64 bcc = Map.get(data, "bcc", [])
65 recipients = Enum.concat([to, cc, bcc])
66 {recipients, to, cc}
67 end
68
69 defp check_actor_is_active(actor) do
70 if not is_nil(actor) do
71 with user <- User.get_cached_by_ap_id(actor),
72 false <- user.info.deactivated do
73 true
74 else
75 _e -> false
76 end
77 else
78 true
79 end
80 end
81
82 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
83 limit = Config.get([:instance, :remote_limit])
84 String.length(content) <= limit
85 end
86
87 defp check_remote_limit(_), do: true
88
89 def increase_note_count_if_public(actor, object) do
90 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
91 end
92
93 def decrease_note_count_if_public(actor, object) do
94 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
95 end
96
97 def increase_replies_count_if_reply(%{
98 "object" => %{"inReplyTo" => reply_ap_id} = object,
99 "type" => "Create"
100 }) do
101 if is_public?(object) do
102 Object.increase_replies_count(reply_ap_id)
103 end
104 end
105
106 def increase_replies_count_if_reply(_create_data), do: :noop
107
108 def decrease_replies_count_if_reply(%Object{
109 data: %{"inReplyTo" => reply_ap_id} = object
110 }) do
111 if is_public?(object) do
112 Object.decrease_replies_count(reply_ap_id)
113 end
114 end
115
116 def decrease_replies_count_if_reply(_object), do: :noop
117
118 def increase_poll_votes_if_vote(%{
119 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
120 "type" => "Create"
121 }) do
122 Object.increase_vote_count(reply_ap_id, name)
123 end
124
125 def increase_poll_votes_if_vote(_create_data), do: :noop
126
127 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
128 with nil <- Activity.normalize(map),
129 map <- lazy_put_activity_defaults(map, fake),
130 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
131 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
132 {:ok, map} <- MRF.filter(map),
133 {recipients, _, _} = get_recipients(map),
134 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
135 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
136 {:ok, map, object} <- insert_full_object(map) do
137 {:ok, activity} =
138 Repo.insert(%Activity{
139 data: map,
140 local: local,
141 actor: map["actor"],
142 recipients: recipients
143 })
144
145 # Splice in the child object if we have one.
146 activity =
147 if not is_nil(object) do
148 Map.put(activity, :object, object)
149 else
150 activity
151 end
152
153 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
154
155 Notification.create_notifications(activity)
156
157 conversation = create_or_bump_conversation(activity, map["actor"])
158 participations = get_participations(conversation)
159 stream_out(activity)
160 stream_out_participations(participations)
161 {:ok, activity}
162 else
163 %Activity{} = activity ->
164 {:ok, activity}
165
166 {:fake, true, map, recipients} ->
167 activity = %Activity{
168 data: map,
169 local: local,
170 actor: map["actor"],
171 recipients: recipients,
172 id: "pleroma:fakeid"
173 }
174
175 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
176 {:ok, activity}
177
178 error ->
179 {:error, error}
180 end
181 end
182
183 defp create_or_bump_conversation(activity, actor) do
184 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
185 %User{} = user <- User.get_cached_by_ap_id(actor),
186 Participation.mark_as_read(user, conversation) do
187 {:ok, conversation}
188 end
189 end
190
191 defp get_participations({:ok, conversation}) do
192 conversation
193 |> Repo.preload(:participations, force: true)
194 |> Map.get(:participations)
195 end
196
197 defp get_participations(_), do: []
198
199 def stream_out_participations(participations) do
200 participations =
201 participations
202 |> Repo.preload(:user)
203
204 Streamer.stream("participation", participations)
205 end
206
207 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
208 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
209 conversation = Repo.preload(conversation, :participations),
210 last_activity_id =
211 fetch_latest_activity_id_for_context(conversation.ap_id, %{
212 "user" => user,
213 "blocking_user" => user
214 }) do
215 if last_activity_id do
216 stream_out_participations(conversation.participations)
217 end
218 end
219 end
220
221 def stream_out_participations(_, _), do: :noop
222
223 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
224 when data_type in ["Create", "Announce", "Delete"] do
225 activity
226 |> Topics.get_activity_topics()
227 |> Streamer.stream(activity)
228 end
229
230 def stream_out(_activity) do
231 :noop
232 end
233
234 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
235 additional = params[:additional] || %{}
236 # only accept false as false value
237 local = !(params[:local] == false)
238 published = params[:published]
239 quick_insert? = Pleroma.Config.get([:env]) == :benchmark
240
241 with create_data <-
242 make_create_data(
243 %{to: to, actor: actor, published: published, context: context, object: object},
244 additional
245 ),
246 {:ok, activity} <- insert(create_data, local, fake),
247 {:fake, false, activity} <- {:fake, fake, activity},
248 _ <- increase_replies_count_if_reply(create_data),
249 _ <- increase_poll_votes_if_vote(create_data),
250 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
251 # Changing note count prior to enqueuing federation task in order to avoid
252 # race conditions on updating user.info
253 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
254 :ok <- maybe_federate(activity) do
255 {:ok, activity}
256 else
257 {:quick_insert, true, activity} ->
258 {:ok, activity}
259
260 {:fake, true, activity} ->
261 {:ok, activity}
262
263 {:error, message} ->
264 {:error, message}
265 end
266 end
267
268 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273
274 with listen_data <-
275 make_listen_data(
276 %{to: to, actor: actor, published: published, context: context, object: object},
277 additional
278 ),
279 {:ok, activity} <- insert(listen_data, local),
280 :ok <- maybe_federate(activity) do
281 {:ok, activity}
282 else
283 {:error, message} ->
284 {:error, message}
285 end
286 end
287
288 def accept(params) do
289 accept_or_reject("Accept", params)
290 end
291
292 def reject(params) do
293 accept_or_reject("Reject", params)
294 end
295
296 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
297 local = Map.get(params, :local, true)
298 activity_id = Map.get(params, :activity_id, nil)
299
300 with data <-
301 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
302 |> Utils.maybe_put("id", activity_id),
303 {:ok, activity} <- insert(data, local),
304 :ok <- maybe_federate(activity) do
305 {:ok, activity}
306 end
307 end
308
309 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
310 local = !(params[:local] == false)
311 activity_id = params[:activity_id]
312
313 with data <- %{
314 "to" => to,
315 "cc" => cc,
316 "type" => "Update",
317 "actor" => actor,
318 "object" => object
319 },
320 data <- Utils.maybe_put(data, "id", activity_id),
321 {:ok, activity} <- insert(data, local),
322 :ok <- maybe_federate(activity) do
323 {:ok, activity}
324 end
325 end
326
327 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
328 def like(
329 %User{ap_id: ap_id} = user,
330 %Object{data: %{"id" => _}} = object,
331 activity_id \\ nil,
332 local \\ true
333 ) do
334 with nil <- get_existing_like(ap_id, object),
335 like_data <- make_like_data(user, object, activity_id),
336 {:ok, activity} <- insert(like_data, local),
337 {:ok, object} <- add_like_to_object(activity, object),
338 :ok <- maybe_federate(activity) do
339 {:ok, activity, object}
340 else
341 %Activity{} = activity -> {:ok, activity, object}
342 error -> {:error, error}
343 end
344 end
345
346 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
347 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
348 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
349 {:ok, unlike_activity} <- insert(unlike_data, local),
350 {:ok, _activity} <- Repo.delete(like_activity),
351 {:ok, object} <- remove_like_from_object(like_activity, object),
352 :ok <- maybe_federate(unlike_activity) do
353 {:ok, unlike_activity, like_activity, object}
354 else
355 _e -> {:ok, object}
356 end
357 end
358
359 def announce(
360 %User{ap_id: _} = user,
361 %Object{data: %{"id" => _}} = object,
362 activity_id \\ nil,
363 local \\ true,
364 public \\ true
365 ) do
366 with true <- is_announceable?(object, user, public),
367 announce_data <- make_announce_data(user, object, activity_id, public),
368 {:ok, activity} <- insert(announce_data, local),
369 {:ok, object} <- add_announce_to_object(activity, object),
370 :ok <- maybe_federate(activity) do
371 {:ok, activity, object}
372 else
373 error -> {:error, error}
374 end
375 end
376
377 def unannounce(
378 %User{} = actor,
379 %Object{} = object,
380 activity_id \\ nil,
381 local \\ true
382 ) do
383 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
384 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
385 {:ok, unannounce_activity} <- insert(unannounce_data, local),
386 :ok <- maybe_federate(unannounce_activity),
387 {:ok, _activity} <- Repo.delete(announce_activity),
388 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
389 {:ok, unannounce_activity, object}
390 else
391 _e -> {:ok, object}
392 end
393 end
394
395 def follow(follower, followed, activity_id \\ nil, local \\ true) do
396 with data <- make_follow_data(follower, followed, activity_id),
397 {:ok, activity} <- insert(data, local),
398 :ok <- maybe_federate(activity),
399 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
400 {:ok, activity}
401 end
402 end
403
404 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
405 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
406 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
407 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
408 {:ok, activity} <- insert(unfollow_data, local),
409 :ok <- maybe_federate(activity) do
410 {:ok, activity}
411 end
412 end
413
414 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
415 with data <- %{
416 "to" => [follower_address],
417 "type" => "Delete",
418 "actor" => ap_id,
419 "object" => %{"type" => "Person", "id" => ap_id}
420 },
421 {:ok, activity} <- insert(data, true, true, true),
422 :ok <- maybe_federate(activity) do
423 {:ok, user}
424 end
425 end
426
427 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
428 local = Keyword.get(options, :local, true)
429 activity_id = Keyword.get(options, :activity_id, nil)
430 actor = Keyword.get(options, :actor, actor)
431
432 user = User.get_cached_by_ap_id(actor)
433 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
434
435 with {:ok, object, activity} <- Object.delete(object),
436 data <-
437 %{
438 "type" => "Delete",
439 "actor" => actor,
440 "object" => id,
441 "to" => to,
442 "deleted_activity_id" => activity && activity.id
443 }
444 |> maybe_put("id", activity_id),
445 {:ok, activity} <- insert(data, local, false),
446 stream_out_participations(object, user),
447 _ <- decrease_replies_count_if_reply(object),
448 # Changing note count prior to enqueuing federation task in order to avoid
449 # race conditions on updating user.info
450 {:ok, _actor} <- decrease_note_count_if_public(user, object),
451 :ok <- maybe_federate(activity) do
452 {:ok, activity}
453 end
454 end
455
456 @spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
457 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
458 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
459 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
460
461 if unfollow_blocked do
462 follow_activity = fetch_latest_follow(blocker, blocked)
463 if follow_activity, do: unfollow(blocker, blocked, nil, local)
464 end
465
466 with true <- outgoing_blocks,
467 block_data <- make_block_data(blocker, blocked, activity_id),
468 {:ok, activity} <- insert(block_data, local),
469 :ok <- maybe_federate(activity) do
470 {:ok, activity}
471 else
472 _e -> {:ok, nil}
473 end
474 end
475
476 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
477 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
478 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
479 {:ok, activity} <- insert(unblock_data, local),
480 :ok <- maybe_federate(activity) do
481 {:ok, activity}
482 end
483 end
484
485 @spec flag(map()) :: {:ok, Activity.t()} | any
486 def flag(
487 %{
488 actor: actor,
489 context: _context,
490 account: account,
491 statuses: statuses,
492 content: content
493 } = params
494 ) do
495 # only accept false as false value
496 local = !(params[:local] == false)
497 forward = !(params[:forward] == false)
498
499 additional = params[:additional] || %{}
500
501 additional =
502 if forward do
503 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
504 else
505 Map.merge(additional, %{"to" => [], "cc" => []})
506 end
507
508 with flag_data <- make_flag_data(params, additional),
509 {:ok, activity} <- insert(flag_data, local),
510 :ok <- maybe_federate(activity) do
511 Enum.each(User.all_superusers(), fn superuser ->
512 superuser
513 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
514 |> Pleroma.Emails.Mailer.deliver_async()
515 end)
516
517 {:ok, activity}
518 end
519 end
520
521 defp fetch_activities_for_context_query(context, opts) do
522 public = [Pleroma.Constants.as_public()]
523
524 recipients =
525 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
526
527 from(activity in Activity)
528 |> maybe_preload_objects(opts)
529 |> maybe_preload_bookmarks(opts)
530 |> maybe_set_thread_muted_field(opts)
531 |> restrict_blocked(opts)
532 |> restrict_recipients(recipients, opts["user"])
533 |> where(
534 [activity],
535 fragment(
536 "?->>'type' = ? and ?->>'context' = ?",
537 activity.data,
538 "Create",
539 activity.data,
540 ^context
541 )
542 )
543 |> exclude_poll_votes(opts)
544 |> exclude_id(opts)
545 |> order_by([activity], desc: activity.id)
546 end
547
548 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
549 def fetch_activities_for_context(context, opts \\ %{}) do
550 context
551 |> fetch_activities_for_context_query(opts)
552 |> Repo.all()
553 end
554
555 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
556 FlakeId.Ecto.CompatType.t() | nil
557 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
558 context
559 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
560 |> limit(1)
561 |> select([a], a.id)
562 |> Repo.one()
563 end
564
565 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
566 opts = Map.drop(opts, ["user"])
567
568 [Pleroma.Constants.as_public()]
569 |> fetch_activities_query(opts)
570 |> restrict_unlisted()
571 |> Pagination.fetch_paginated(opts, pagination)
572 |> Enum.reverse()
573 end
574
575 @valid_visibilities ~w[direct unlisted public private]
576
577 defp restrict_visibility(query, %{visibility: visibility})
578 when is_list(visibility) do
579 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
580 query =
581 from(
582 a in query,
583 where:
584 fragment(
585 "activity_visibility(?, ?, ?) = ANY (?)",
586 a.actor,
587 a.recipients,
588 a.data,
589 ^visibility
590 )
591 )
592
593 query
594 else
595 Logger.error("Could not restrict visibility to #{visibility}")
596 end
597 end
598
599 defp restrict_visibility(query, %{visibility: visibility})
600 when visibility in @valid_visibilities do
601 from(
602 a in query,
603 where:
604 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
605 )
606 end
607
608 defp restrict_visibility(_query, %{visibility: visibility})
609 when visibility not in @valid_visibilities do
610 Logger.error("Could not restrict visibility to #{visibility}")
611 end
612
613 defp restrict_visibility(query, _visibility), do: query
614
615 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
616 when is_list(visibility) do
617 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
618 from(
619 a in query,
620 where:
621 not fragment(
622 "activity_visibility(?, ?, ?) = ANY (?)",
623 a.actor,
624 a.recipients,
625 a.data,
626 ^visibility
627 )
628 )
629 else
630 Logger.error("Could not exclude visibility to #{visibility}")
631 query
632 end
633 end
634
635 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
636 when visibility in @valid_visibilities do
637 from(
638 a in query,
639 where:
640 not fragment(
641 "activity_visibility(?, ?, ?) = ?",
642 a.actor,
643 a.recipients,
644 a.data,
645 ^visibility
646 )
647 )
648 end
649
650 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
651 when visibility not in @valid_visibilities do
652 Logger.error("Could not exclude visibility to #{visibility}")
653 query
654 end
655
656 defp exclude_visibility(query, _visibility), do: query
657
658 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
659 do: query
660
661 defp restrict_thread_visibility(
662 query,
663 %{"user" => %User{info: %{skip_thread_containment: true}}},
664 _
665 ),
666 do: query
667
668 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
669 from(
670 a in query,
671 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
672 )
673 end
674
675 defp restrict_thread_visibility(query, _, _), do: query
676
677 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
678 params =
679 params
680 |> Map.put("user", reading_user)
681 |> Map.put("actor_id", user.ap_id)
682 |> Map.put("whole_db", true)
683
684 recipients =
685 user_activities_recipients(%{
686 "godmode" => params["godmode"],
687 "reading_user" => reading_user
688 })
689
690 fetch_activities(recipients, params)
691 |> Enum.reverse()
692 end
693
694 def fetch_user_activities(user, reading_user, params \\ %{}) do
695 params =
696 params
697 |> Map.put("type", ["Create", "Announce"])
698 |> Map.put("user", reading_user)
699 |> Map.put("actor_id", user.ap_id)
700 |> Map.put("whole_db", true)
701 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
702
703 recipients =
704 user_activities_recipients(%{
705 "godmode" => params["godmode"],
706 "reading_user" => reading_user
707 })
708
709 fetch_activities(recipients, params)
710 |> Enum.reverse()
711 end
712
713 defp user_activities_recipients(%{"godmode" => true}) do
714 []
715 end
716
717 defp user_activities_recipients(%{"reading_user" => reading_user}) do
718 if reading_user do
719 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
720 else
721 [Pleroma.Constants.as_public()]
722 end
723 end
724
725 defp restrict_since(query, %{"since_id" => ""}), do: query
726
727 defp restrict_since(query, %{"since_id" => since_id}) do
728 from(activity in query, where: activity.id > ^since_id)
729 end
730
731 defp restrict_since(query, _), do: query
732
733 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
734 raise "Can't use the child object without preloading!"
735 end
736
737 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
738 when is_list(tag_reject) and tag_reject != [] do
739 from(
740 [_activity, object] in query,
741 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
742 )
743 end
744
745 defp restrict_tag_reject(query, _), do: query
746
747 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
748 raise "Can't use the child object without preloading!"
749 end
750
751 defp restrict_tag_all(query, %{"tag_all" => tag_all})
752 when is_list(tag_all) and tag_all != [] do
753 from(
754 [_activity, object] in query,
755 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
756 )
757 end
758
759 defp restrict_tag_all(query, _), do: query
760
761 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
762 raise "Can't use the child object without preloading!"
763 end
764
765 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
766 from(
767 [_activity, object] in query,
768 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
769 )
770 end
771
772 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
773 from(
774 [_activity, object] in query,
775 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
776 )
777 end
778
779 defp restrict_tag(query, _), do: query
780
781 defp restrict_recipients(query, [], _user), do: query
782
783 defp restrict_recipients(query, recipients, nil) do
784 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
785 end
786
787 defp restrict_recipients(query, recipients, user) do
788 from(
789 activity in query,
790 where: fragment("? && ?", ^recipients, activity.recipients),
791 or_where: activity.actor == ^user.ap_id
792 )
793 end
794
795 defp restrict_local(query, %{"local_only" => true}) do
796 from(activity in query, where: activity.local == true)
797 end
798
799 defp restrict_local(query, _), do: query
800
801 defp restrict_actor(query, %{"actor_id" => actor_id}) do
802 from(activity in query, where: activity.actor == ^actor_id)
803 end
804
805 defp restrict_actor(query, _), do: query
806
807 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
808 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
809 end
810
811 defp restrict_type(query, %{"type" => type}) do
812 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
813 end
814
815 defp restrict_type(query, _), do: query
816
817 defp restrict_state(query, %{"state" => state}) do
818 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
819 end
820
821 defp restrict_state(query, _), do: query
822
823 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
824 from(
825 [_activity, object] in query,
826 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
827 )
828 end
829
830 defp restrict_favorited_by(query, _), do: query
831
832 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
833 raise "Can't use the child object without preloading!"
834 end
835
836 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
837 from(
838 [_activity, object] in query,
839 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
840 )
841 end
842
843 defp restrict_media(query, _), do: query
844
845 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
846 from(
847 [_activity, object] in query,
848 where: fragment("?->>'inReplyTo' is null", object.data)
849 )
850 end
851
852 defp restrict_replies(query, _), do: query
853
854 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
855 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
856 end
857
858 defp restrict_reblogs(query, _), do: query
859
860 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
861
862 defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
863 mutes = info.mutes
864
865 query =
866 from([activity] in query,
867 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
868 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
869 )
870
871 unless opts["skip_preload"] do
872 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
873 else
874 query
875 end
876 end
877
878 defp restrict_muted(query, _), do: query
879
880 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
881 blocks = info.blocks || []
882 domain_blocks = info.domain_blocks || []
883
884 query =
885 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
886
887 from(
888 [activity, object: o] in query,
889 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
890 where: fragment("not (? && ?)", activity.recipients, ^blocks),
891 where:
892 fragment(
893 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
894 activity.data,
895 activity.data,
896 ^blocks
897 ),
898 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
899 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
900 )
901 end
902
903 defp restrict_blocked(query, _), do: query
904
905 defp restrict_unlisted(query) do
906 from(
907 activity in query,
908 where:
909 fragment(
910 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
911 activity.data,
912 ^[Pleroma.Constants.as_public()]
913 )
914 )
915 end
916
917 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
918 from(activity in query, where: activity.id in ^ids)
919 end
920
921 defp restrict_pinned(query, _), do: query
922
923 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
924 muted_reblogs = info.muted_reblogs || []
925
926 from(
927 activity in query,
928 where:
929 fragment(
930 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
931 activity.data,
932 activity.actor,
933 ^muted_reblogs
934 )
935 )
936 end
937
938 defp restrict_muted_reblogs(query, _), do: query
939
940 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
941
942 defp exclude_poll_votes(query, _) do
943 if has_named_binding?(query, :object) do
944 from([activity, object: o] in query,
945 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
946 )
947 else
948 query
949 end
950 end
951
952 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
953 from(activity in query, where: activity.id != ^id)
954 end
955
956 defp exclude_id(query, _), do: query
957
958 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
959
960 defp maybe_preload_objects(query, _) do
961 query
962 |> Activity.with_preloaded_object()
963 end
964
965 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
966
967 defp maybe_preload_bookmarks(query, opts) do
968 query
969 |> Activity.with_preloaded_bookmark(opts["user"])
970 end
971
972 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
973
974 defp maybe_set_thread_muted_field(query, opts) do
975 query
976 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
977 end
978
979 defp maybe_order(query, %{order: :desc}) do
980 query
981 |> order_by(desc: :id)
982 end
983
984 defp maybe_order(query, %{order: :asc}) do
985 query
986 |> order_by(asc: :id)
987 end
988
989 defp maybe_order(query, _), do: query
990
991 def fetch_activities_query(recipients, opts \\ %{}) do
992 config = %{
993 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
994 }
995
996 Activity
997 |> maybe_preload_objects(opts)
998 |> maybe_preload_bookmarks(opts)
999 |> maybe_set_thread_muted_field(opts)
1000 |> maybe_order(opts)
1001 |> restrict_recipients(recipients, opts["user"])
1002 |> restrict_tag(opts)
1003 |> restrict_tag_reject(opts)
1004 |> restrict_tag_all(opts)
1005 |> restrict_since(opts)
1006 |> restrict_local(opts)
1007 |> restrict_actor(opts)
1008 |> restrict_type(opts)
1009 |> restrict_state(opts)
1010 |> restrict_favorited_by(opts)
1011 |> restrict_blocked(opts)
1012 |> restrict_muted(opts)
1013 |> restrict_media(opts)
1014 |> restrict_visibility(opts)
1015 |> restrict_thread_visibility(opts, config)
1016 |> restrict_replies(opts)
1017 |> restrict_reblogs(opts)
1018 |> restrict_pinned(opts)
1019 |> restrict_muted_reblogs(opts)
1020 |> Activity.restrict_deactivated_users()
1021 |> exclude_poll_votes(opts)
1022 |> exclude_visibility(opts)
1023 end
1024
1025 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1026 list_memberships = Pleroma.List.memberships(opts["user"])
1027
1028 fetch_activities_query(recipients ++ list_memberships, opts)
1029 |> Pagination.fetch_paginated(opts, pagination)
1030 |> Enum.reverse()
1031 |> maybe_update_cc(list_memberships, opts["user"])
1032 end
1033
1034 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1035 when is_list(list_memberships) and length(list_memberships) > 0 do
1036 Enum.map(activities, fn
1037 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1038 if Enum.any?(bcc, &(&1 in list_memberships)) do
1039 update_in(activity.data["cc"], &[user_ap_id | &1])
1040 else
1041 activity
1042 end
1043
1044 activity ->
1045 activity
1046 end)
1047 end
1048
1049 defp maybe_update_cc(activities, _, _), do: activities
1050
1051 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1052 from(activity in query,
1053 where:
1054 fragment("? && ?", activity.recipients, ^recipients) or
1055 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1056 ^Pleroma.Constants.as_public() in activity.recipients)
1057 )
1058 end
1059
1060 def fetch_activities_bounded(
1061 recipients,
1062 recipients_with_public,
1063 opts \\ %{},
1064 pagination \\ :keyset
1065 ) do
1066 fetch_activities_query([], opts)
1067 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1068 |> Pagination.fetch_paginated(opts, pagination)
1069 |> Enum.reverse()
1070 end
1071
1072 def upload(file, opts \\ []) do
1073 with {:ok, data} <- Upload.store(file, opts) do
1074 obj_data =
1075 if opts[:actor] do
1076 Map.put(data, "actor", opts[:actor])
1077 else
1078 data
1079 end
1080
1081 Repo.insert(%Object{data: obj_data})
1082 end
1083 end
1084
1085 defp object_to_user_data(data) do
1086 avatar =
1087 data["icon"]["url"] &&
1088 %{
1089 "type" => "Image",
1090 "url" => [%{"href" => data["icon"]["url"]}]
1091 }
1092
1093 banner =
1094 data["image"]["url"] &&
1095 %{
1096 "type" => "Image",
1097 "url" => [%{"href" => data["image"]["url"]}]
1098 }
1099
1100 fields =
1101 data
1102 |> Map.get("attachment", [])
1103 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1104 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1105
1106 locked = data["manuallyApprovesFollowers"] || false
1107 data = Transmogrifier.maybe_fix_user_object(data)
1108 discoverable = data["discoverable"] || false
1109 invisible = data["invisible"] || false
1110
1111 user_data = %{
1112 ap_id: data["id"],
1113 info: %{
1114 ap_enabled: true,
1115 source_data: data,
1116 banner: banner,
1117 fields: fields,
1118 locked: locked,
1119 discoverable: discoverable,
1120 invisible: invisible
1121 },
1122 avatar: avatar,
1123 name: data["name"],
1124 follower_address: data["followers"],
1125 following_address: data["following"],
1126 bio: data["summary"]
1127 }
1128
1129 # nickname can be nil because of virtual actors
1130 user_data =
1131 if data["preferredUsername"] do
1132 Map.put(
1133 user_data,
1134 :nickname,
1135 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1136 )
1137 else
1138 Map.put(user_data, :nickname, nil)
1139 end
1140
1141 {:ok, user_data}
1142 end
1143
1144 def fetch_follow_information_for_user(user) do
1145 with {:ok, following_data} <-
1146 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1147 following_count when is_integer(following_count) <- following_data["totalItems"],
1148 {:ok, hide_follows} <- collection_private(following_data),
1149 {:ok, followers_data} <-
1150 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1151 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1152 {:ok, hide_followers} <- collection_private(followers_data) do
1153 {:ok,
1154 %{
1155 hide_follows: hide_follows,
1156 follower_count: followers_count,
1157 following_count: following_count,
1158 hide_followers: hide_followers
1159 }}
1160 else
1161 {:error, _} = e ->
1162 e
1163
1164 e ->
1165 {:error, e}
1166 end
1167 end
1168
1169 defp maybe_update_follow_information(data) do
1170 with {:enabled, true} <-
1171 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1172 {:ok, info} <- fetch_follow_information_for_user(data) do
1173 info = Map.merge(data.info, info)
1174 Map.put(data, :info, info)
1175 else
1176 {:enabled, false} ->
1177 data
1178
1179 e ->
1180 Logger.error(
1181 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1182 )
1183
1184 data
1185 end
1186 end
1187
1188 defp collection_private(data) do
1189 if is_map(data["first"]) and
1190 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1191 {:ok, false}
1192 else
1193 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1194 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1195 {:ok, false}
1196 else
1197 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1198 {:ok, true}
1199
1200 {:error, _} = e ->
1201 e
1202
1203 e ->
1204 {:error, e}
1205 end
1206 end
1207 end
1208
1209 def user_data_from_user_object(data) do
1210 with {:ok, data} <- MRF.filter(data),
1211 {:ok, data} <- object_to_user_data(data) do
1212 {:ok, data}
1213 else
1214 e -> {:error, e}
1215 end
1216 end
1217
1218 def fetch_and_prepare_user_from_ap_id(ap_id) do
1219 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1220 {:ok, data} <- user_data_from_user_object(data),
1221 data <- maybe_update_follow_information(data) do
1222 {:ok, data}
1223 else
1224 e ->
1225 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1226 {:error, e}
1227 end
1228 end
1229
1230 def make_user_from_ap_id(ap_id) do
1231 if _user = User.get_cached_by_ap_id(ap_id) do
1232 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1233 else
1234 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1235 User.insert_or_update_user(data)
1236 else
1237 e -> {:error, e}
1238 end
1239 end
1240 end
1241
1242 def make_user_from_nickname(nickname) do
1243 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1244 make_user_from_ap_id(ap_id)
1245 else
1246 _e -> {:error, "No AP id in WebFinger"}
1247 end
1248 end
1249
1250 # filter out broken threads
1251 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1252 entire_thread_visible_for_user?(activity, user)
1253 end
1254
1255 # do post-processing on a specific activity
1256 def contain_activity(%Activity{} = activity, %User{} = user) do
1257 contain_broken_threads(activity, user)
1258 end
1259
1260 def fetch_direct_messages_query do
1261 Activity
1262 |> restrict_type(%{"type" => "Create"})
1263 |> restrict_visibility(%{visibility: "direct"})
1264 |> order_by([activity], asc: activity.id)
1265 end
1266 end