Merge branch 'benchmark-finishing' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.Object.Containment
13 alias Pleroma.Object.Fetcher
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.Upload
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.MRF
19 alias Pleroma.Web.ActivityPub.Transmogrifier
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Streamer
22 alias Pleroma.Web.WebFinger
23 alias Pleroma.Workers.BackgroundWorker
24
25 import Ecto.Query
26 import Pleroma.Web.ActivityPub.Utils
27 import Pleroma.Web.ActivityPub.Visibility
28
29 require Logger
30 require Pleroma.Constants
31
# Computes `{recipients, to, cc}` from an activity's data map.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  announcer = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn ap_id ->
      # Addresses that do not resolve to a cached user are always kept.
      case User.get_cached_by_ap_id(ap_id) do
        nil -> true
        known_user -> User.following?(known_user, announcer)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address the actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Every other activity type: the plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))

  {Enum.concat([to, cc, bcc]), to, cc}
end
67
# Decides whether an activity from `actor` (an AP id, or nil) may be inserted.
#
# Returns `true` when no actor is given, or when the cached user's
# `info.deactivated` is exactly `false`. Returns `false` in every other case,
# including when the actor cannot be resolved to a cached user (previously that
# case raised, because `.info` was read from `nil`).
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> true
    _ -> false
  end
end
80
# Returns true when the object's content length (as counted by String.length/1)
# does not exceed the configured `[:instance, :remote_limit]`. Activities
# without non-nil object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
87
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
91
# Decreases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
95
# For a Create whose object carries "inReplyTo", bumps the replies count of the
# replied-to object when the new object is public (returns nil when it is not).
# Anything else is a no-op.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
106
# For an object carrying "inReplyTo", lowers the replies count of the replied-to
# object when the object is public (returns nil when it is not).
# Anything else is a no-op.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = object}) do
  if is_public?(object), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
116
# A Create whose object has both "inReplyTo" and "name" is counted as a vote for
# the option `name` on the replied-to object. Anything else is a no-op.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
125
# Inserts an activity built from `map`.
#
# Returns `{:ok, activity}` when the activity was stored, when an equivalent
# activity already exists (per Activity.normalize/1), or when `fake` is true
# (in which case nothing is persisted and the id is "pleroma:fakeid").
# Any failing step is wrapped as `{:error, reason}`.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _, _} = get_recipients(filtered),
       # Fake activities leave the pipeline here and are handled in `else`.
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       :ok <- Containment.contain_child(filtered),
       {:ok, stored, object} <- insert_full_object(filtered) do
    {:ok, activity} =
      Repo.insert(%Activity{
        data: stored,
        local: local,
        actor: stored["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: activity, else: Map.put(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
184
# Extracts the participations from an `{:ok, %{participations: ...}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
187
# Preloads the `:user` of each participation and streams them on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation matching the object's context,
# but only when `user` can still see at least one activity in that context.
# When no conversation exists the lookup result is returned as-is.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
211
# Streams Create, Announce and Delete activities to the topics computed by
# Topics.get_activity_topics/1. Other activity types are not streamed.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
222
# Builds and inserts a Create activity.
#
# Optional params: `:additional` (defaults to `%{}`), `:local` and `:published`.
# For fake activities, and when `[:env]` is `:benchmark`, the function returns
# right after insertion/counter updates without touching the note count or
# federating.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Pleroma.Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} -> {:ok, inserted}
    {:fake, true, faked} -> {:ok, faked}
    {:error, message} -> {:error, message}
  end
end
256
# Builds a Listen activity from the given params, inserts it and federates it.
# Optional params: `:additional` (defaults to `%{}`), `:local` and `:published`.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} -> {:error, message}
  end
end
276
# Inserts and federates an "Accept" activity; see accept_or_reject/2.
def accept(params), do: accept_or_reject("Accept", params)
280
# Inserts and federates a "Reject" activity; see accept_or_reject/2.
def reject(params), do: accept_or_reject("Reject", params)
284
# Inserts and federates an activity of the given `type` addressed to `to`.
# `:local` defaults to true; `:activity_id`, when given, is passed on as the
# activity's "id" through Utils.maybe_put/3.
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
# Inserts and federates an Update activity. `:activity_id`, when given, is
# passed on as the activity's "id" through Utils.maybe_put/3.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
315
# Likes `object` on behalf of `user`. When the user already has a like for the
# object, that existing activity is returned instead of creating a new one.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
334
# Removes the actor's like from `object`: inserts the unlike activity, deletes
# the like activity and updates the object. When any step does not match
# (including when no like exists), `{:ok, object}` is returned with the object
# as passed in.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
347
# Announces `object` on behalf of `user`, provided is_announceable?/3 allows it.
# Any step that does not match is wrapped as `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
365
# Undoes the actor's announce of `object`: inserts and federates the undo
# activity, deletes the announce activity and updates the object. When any step
# does not match, `{:ok, object}` is returned with the object as passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       undo_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, undo_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
383
# Inserts and federates a follow activity, then records the activity's "state"
# in the follow-state cache for the follower/followed pair.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
392
# Marks the latest follow activity between the two users as "cancelled", then
# inserts and federates the corresponding unfollow activity. A step that does
# not match is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, unfollow_activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(unfollow_activity) do
    {:ok, unfollow_activity}
  end
end
402
# Deleting a user: builds a Delete activity addressed to the user's follower
# collection and passes it to insert/4 with `fake` and `bypass_actor_check`
# both set to true, then federates it.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end

# Deleting an object: deletes it via Object.delete/1, then inserts and
# federates a Delete activity addressed to the object's "to" and "cc".
# Options: `:local` (default true), `:activity_id`, and `:actor` (defaults to
# the object's own actor).
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data <-
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, delete_activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
444
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow from blocker to
# blocked exists, it is undone first. A Block activity is only inserted and
# federated when `[:activitypub, :outgoing_blocks]` is `true`; in every other
# case (or when a step does not match) the result is `{:ok, nil}`.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _e -> {:ok, nil}
  end
end
464
# Inserts and federates the undo of the latest block from `blocker` to
# `blocked`. A step that does not match (e.g. no block found) is returned
# unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, unblock_activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(unblock_activity) do
    {:ok, unblock_activity}
  end
end
473
# Files a report (Flag activity), federates it and emails every superuser.
#
# "to" is always empty; "cc" holds the reported account's AP id unless
# `params[:forward]` is exactly `false`, in which case it is empty too.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    Enum.each(User.all_superusers(), fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
509
# Builds the query for the Create activities of a context, newest first.
# Visible recipients are the public collection plus, when `opts["user"]` is
# set, that user's own AP id and everything in the user's `following`.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  user = opts["user"]

  recipients = if user, do: [user.ap_id | user.following] ++ public, else: public

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
536
# Loads every activity matched by fetch_activities_for_context_query/2.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
543
# Returns the id of the newest activity in the context, or nil when there is
# none. Preloading is skipped by default; `opts` may override that.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
553
# Fetches a page of publicly addressed, listed activities. Any "user" entry in
# `opts` is dropped first; the fetched page is reversed before it is returned.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public_recipients = [Pleroma.Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
563
564 @valid_visibilities ~w[direct unlisted public private]
565
# Restricts `query` to activities whose computed visibility (SQL function
# `activity_visibility`) equals the requested one, or is any of the requested
# ones when a list is given.
#
# An invalid visibility is logged and the query is returned unrestricted, the
# same way exclude_visibility/2 behaves. Previously these branches returned the
# result of `Logger.error/1` instead of a query, which broke the next stage of
# the calling pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

# No visibility requested: nothing to restrict.
defp restrict_visibility(query, _visibility), do: query
603
# Drops activities whose computed visibility (SQL function
# `activity_visibility`) matches `opts["exclude_visibilities"]`, which may be a
# single visibility or a list. Invalid values are logged and the query is
# returned unchanged.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
646
# Applies the SQL `thread_visibility` check for the reading user, unless thread
# containment is skipped in the instance config or by the user's own settings,
# or no user is given.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
665
# Fetches activities of any type authored by `user`, as seen by `reading_user`.
# The fetched page is reversed before it is returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
682
# Fetches the Create and Announce activities authored by `user`, as seen by
# `reading_user`. The user's pinned activity ids are passed along in the params.
# The fetched page is reversed before it is returned.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
701
# Recipient list used when reading a user's activities: empty in godmode,
# otherwise the public collection plus (when there is a reading user) that
# user's AP id and `following`.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = [Pleroma.Constants.as_public()]

  if reading_user do
    public ++ [reading_user.ap_id | reading_user.following]
  else
    public
  end
end
713
# Keeps only activities with an id greater than `opts["since_id"]`; an empty
# or missing since_id leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
721
# Drops activities whose object carries any of the tags in `opts["tag_reject"]`.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
735
# Keeps only activities whose object carries every tag in `opts["tag_all"]`.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _), do: query
749
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it raises when preloading
# is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
769
# Keeps activities addressed to at least one of `recipients`. When a user is
# given, activities authored by that user are included as well (or_where).
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
783
# Keeps only local activities when `opts["local_only"]` is true.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
789
# Keeps only activities authored by `opts["actor_id"]`, when given.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
795
# Keeps only activities of the given type (binary), or of any of the given
# types for non-binary values (compared with ANY).
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
805
# Keeps only activities whose data "state" equals `opts["state"]`, when given.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
811
# Keeps only activities whose object's "likes" contains the given AP id.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
820
# With `opts["only_media"]` set to "true" or "1", drops activities whose
# object's "attachment" equals the empty list. Needs the joined object, so it
# raises when preloading is skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
833
# With `opts["exclude_replies"]` set to "true" or "1", keeps only activities
# whose object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _), do: query
842
# With `opts["exclude_reblogs"]` set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
848
# Hides activities authored by, or addressed ("to") to, anyone the muting user
# has muted. Unless preloading is skipped, rows that have a `thread_mute`
# binding with a user id are hidden as well. `with_muted` disables all of this.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
  muted_ap_ids = info.mutes

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts["skip_preload"] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
868
# Hides activities involving users or domains blocked by the blocking user:
# authored by or addressed to a blocked user, Announces sent "to" a blocked
# user, and activities whose actor or whose object's actor is on a blocked
# domain. Joins the object first when the query has no `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  with_object
  |> where([activity, object: o], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity, object: o], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _), do: query
893
# Drops activities whose "cc" contains the public collection.
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Pleroma.Constants.as_public()]
    )
  )
end
905
# With `opts["pinned"]` set to "true", keeps only the activities whose id is in
# `opts["pinned_activity_ids"]`.
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
911
# Drops Announce activities authored by anyone listed in the muting user's
# `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
928
# Drops activities whose object is of type "Answer" (poll votes), unless
# `opts["include_poll_votes"]` is true. The filter is only applied when the
# query already has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
940
# Drops the activity with id `opts["exclude_id"]`, when a binary id is given.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
946
# Preloads the object of each activity unless `opts["skip_preload"]` is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
953
# Preloads the bookmark of `opts["user"]` unless `opts["skip_preload"]` is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
960
# Sets the thread-muted field for `opts["muting_user"]` (falling back to
# `opts["user"]`) unless `opts["skip_preload"]` is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
967
# Orders by id according to `opts[:order]` (`:desc` or `:asc`); any other value
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
979
# Builds the activity query for the given recipients by running the base
# Activity queryable through every preload, ordering, restrict_* and exclude_*
# stage below. Each stage inspects `opts` and leaves the query untouched when
# its option is absent.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  # Preloads and ordering.
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  # Addressing and content filters.
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  # Per-user filters (blocks, mutes) and visibility.
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1013
# Fetches a page of activities for `recipients` plus the list memberships of
# `opts["user"]`, reverses the page, and passes it through maybe_update_cc/3.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  reading_user = opts["user"]
  list_memberships = Pleroma.List.memberships(reading_user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, reading_user)
end
1022
# For every activity whose non-empty "bcc" contains one of the user's list
# memberships, prepends the user's AP id to the activity's "cc".
#
# A missing/nil "cc" is treated as an empty list (List.wrap/1); previously it
# produced the improper list `[user_ap_id | nil]`. Activities are returned
# unchanged when there are no list memberships or no user.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1039
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also being addressed to the public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1048
# Fetches a page of activities using the bounded recipient rules of
# fetch_activities_bounded_query/3. The fetched page is reversed before it is
# returned.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1060
# Stores `file` via Upload.store/2 and inserts an Object holding the returned
# data, with an "actor" entry added when opts[:actor] is set.
# Any Upload.store/2 result other than {:ok, data} is returned unchanged.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      object_data = if actor, do: Map.put(data, "actor", actor), else: data
      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1073
# Converts an actor object (a string-keyed map) into a map of user attributes
# and returns it as {:ok, user_data}.
#
# The input is remote data, so optional parts are read defensively:
#   * "icon" / "image" become an avatar / banner only when they are maps with
#     a truthy "url"; any other shape yields nil.
#   * "attachment" may be missing, nil, a single object or a list; only maps
#     whose "type" is "PropertyValue" contribute to :fields.
defp object_to_user_data(data) do
  # Pattern matching avoids the nested `value["url"]` access, which raises
  # when the value is a string or a list.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # `locked` is read before Transmogrifier.maybe_fix_user_object/1 runs and
  # `discoverable` after it, as in the original ordering.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      fields: fields,
      locked: locked,
      discoverable: discoverable
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
1130
# Fetches the objects at user.following_address and user.follower_address and
# returns {:ok, map} with their integer "totalItems" counts and the privacy
# flags computed by collection_private/1.
#
# If any step does not match, an {:error, _} tuple is passed through as-is and
# any other value is wrapped as {:error, value}.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       following_total when is_integer(following_total) <- following["totalItems"],
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       followers_total when is_integer(followers_total) <- followers["totalItems"],
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      hide_followers: hide_followers,
      following_count: following_total,
      follower_count: followers_total
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1155
# When the [:instance, :external_user_synchronization] setting is true, merges
# the result of fetch_follow_information_for_user/1 into data.info.
# Returns `data` untouched when the setting is false; on any other failure the
# error is logged and `data` is still returned unchanged.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    Map.put(data, :info, Map.merge(data.info, follow_info))
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1174
# Decides whether a collection hides its items, based on its "first" page.
#
# Returns {:ok, false} when "first" is an embedded page object, or when
# fetching it yields a CollectionPage / OrderedCollectionPage.
# Returns {:ok, true} when the fetch fails with HTTP status 401 or 403.
# Other failures come back as an {:error, _} tuple (wrapped if necessary).
defp collection_private(data) do
  case data["first"] do
    %{"type" => type} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    first ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _reason} = error ->
          error

        other ->
          {:error, other}
      end
  end
end
1195
# Runs `data` through MRF.filter/1 and converts the filtered object with
# object_to_user_data/1. Returns {:ok, user_data}; whatever non-{:ok, _} value
# either step produces is wrapped as {:error, value}.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} ->
      case object_to_user_data(filtered) do
        {:ok, _user_data} = converted -> converted
        failure -> {:error, failure}
      end

    failure ->
      {:error, failure}
  end
end
1204
# Fetches the object at `ap_id`, converts it with user_data_from_user_object/1
# and passes the result through maybe_update_follow_information/1.
#
# Returns {:ok, user_data} on success. On failure the problem is logged and an
# error tuple is returned: an existing {:error, _} is passed through, anything
# else is wrapped as {:error, value}.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

      # Logger.error/1 evaluates to :ok, so the failure has to be returned
      # explicitly; otherwise callers would receive :ok for a failed fetch.
      case failure do
        {:error, _reason} -> failure
        _ -> {:error, failure}
      end
  end
end
1214
# When a user with `ap_id` is already cached, delegates to
# Transmogrifier.upgrade_user_from_ap_id/1. Otherwise fetches and prepares the
# user data and hands it to User.insert_or_update_user/1; a failed fetch is
# returned as {:error, failure}.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} -> User.insert_or_update_user(user_data)
      failure -> {:error, failure}
    end
  end
end
1226
# Resolves `nickname` through WebFinger.finger/1 and, when the result carries
# a non-nil "ap_id", continues with make_user_from_ap_id/1.
# Any other WebFinger result yields {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1234
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1239
# Post-processing hook for a single activity; currently this only applies
# contain_broken_threads/2 and returns its result.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1244
# Builds a query over Activity restricted to type "Create" and visibility
# "direct", ordered by ascending activity id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [a], asc: a.id)
end
1251 end