Merge branch 'features/private-reblogs' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.Object.Containment
13 alias Pleroma.Object.Fetcher
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.Upload
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.MRF
19 alias Pleroma.Web.ActivityPub.Transmogrifier
20 alias Pleroma.Web.Streamer
21 alias Pleroma.Web.WebFinger
22 alias Pleroma.Workers.BackgroundWorker
23
24 import Ecto.Query
25 import Pleroma.Web.ActivityPub.Utils
26 import Pleroma.Web.ActivityPub.Visibility
27
28 require Logger
29 require Pleroma.Constants
30
31 # For Announce activities, we filter the recipients based on following status for any actors
32 # that match actual users. See issue #164 for more information about why this is necessary.
33 defp get_recipients(%{"type" => "Announce"} = data) do
34 to = Map.get(data, "to", [])
35 cc = Map.get(data, "cc", [])
36 bcc = Map.get(data, "bcc", [])
37 actor = User.get_cached_by_ap_id(data["actor"])
38
39 recipients =
40 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
41 case User.get_cached_by_ap_id(recipient) do
42 nil -> true
43 user -> User.following?(user, actor)
44 end
45 end)
46
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(%{"type" => "Create"} = data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 actor = Map.get(data, "actor", [])
55 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
56 {recipients, to, cc}
57 end
58
59 defp get_recipients(data) do
60 to = Map.get(data, "to", [])
61 cc = Map.get(data, "cc", [])
62 bcc = Map.get(data, "bcc", [])
63 recipients = Enum.concat([to, cc, bcc])
64 {recipients, to, cc}
65 end
66
67 defp check_actor_is_active(actor) do
68 if not is_nil(actor) do
69 with user <- User.get_cached_by_ap_id(actor),
70 false <- user.info.deactivated do
71 true
72 else
73 _e -> false
74 end
75 else
76 true
77 end
78 end
79
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
83 end
84
85 defp check_remote_limit(_), do: true
86
87 def increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
89 end
90
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
93 end
94
95 def increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
97 "type" => "Create"
98 }) do
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
101 end
102 end
103
104 def increase_replies_count_if_reply(_create_data), do: :noop
105
106 def decrease_replies_count_if_reply(%Object{
107 data: %{"inReplyTo" => reply_ap_id} = object
108 }) do
109 if is_public?(object) do
110 Object.decrease_replies_count(reply_ap_id)
111 end
112 end
113
114 def decrease_replies_count_if_reply(_object), do: :noop
115
116 def increase_poll_votes_if_vote(%{
117 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
118 "type" => "Create"
119 }) do
120 Object.increase_vote_count(reply_ap_id, name)
121 end
122
123 def increase_poll_votes_if_vote(_create_data), do: :noop
124
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
129 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 :ok <- Containment.contain_child(map),
134 {:ok, map, object} <- insert_full_object(map) do
135 {:ok, activity} =
136 Repo.insert(%Activity{
137 data: map,
138 local: local,
139 actor: map["actor"],
140 recipients: recipients
141 })
142
143 # Splice in the child object if we have one.
144 activity =
145 if not is_nil(object) do
146 Map.put(activity, :object, object)
147 else
148 activity
149 end
150
151 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
152
153 Notification.create_notifications(activity)
154
155 participations =
156 activity
157 |> Conversation.create_or_bump_for()
158 |> get_participations()
159
160 stream_out(activity)
161 stream_out_participations(participations)
162 {:ok, activity}
163 else
164 %Activity{} = activity ->
165 {:ok, activity}
166
167 {:fake, true, map, recipients} ->
168 activity = %Activity{
169 data: map,
170 local: local,
171 actor: map["actor"],
172 recipients: recipients,
173 id: "pleroma:fakeid"
174 }
175
176 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
177 {:ok, activity}
178
179 error ->
180 {:error, error}
181 end
182 end
183
184 defp get_participations({:ok, %{participations: participations}}), do: participations
185 defp get_participations(_), do: []
186
187 def stream_out_participations(participations) do
188 participations =
189 participations
190 |> Repo.preload(:user)
191
192 Streamer.stream("participation", participations)
193 end
194
195 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
196 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
197 conversation = Repo.preload(conversation, :participations),
198 last_activity_id =
199 fetch_latest_activity_id_for_context(conversation.ap_id, %{
200 "user" => user,
201 "blocking_user" => user
202 }) do
203 if last_activity_id do
204 stream_out_participations(conversation.participations)
205 end
206 end
207 end
208
209 def stream_out_participations(_, _), do: :noop
210
211 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
212 when data_type in ["Create", "Announce", "Delete"] do
213 activity
214 |> Topics.get_activity_topics()
215 |> Streamer.stream(activity)
216 end
217
218 def stream_out(_activity) do
219 :noop
220 end
221
222 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
223 additional = params[:additional] || %{}
224 # only accept false as false value
225 local = !(params[:local] == false)
226 published = params[:published]
227
228 with create_data <-
229 make_create_data(
230 %{to: to, actor: actor, published: published, context: context, object: object},
231 additional
232 ),
233 {:ok, activity} <- insert(create_data, local, fake),
234 {:fake, false, activity} <- {:fake, fake, activity},
235 _ <- increase_replies_count_if_reply(create_data),
236 _ <- increase_poll_votes_if_vote(create_data),
237 # Changing note count prior to enqueuing federation task in order to avoid
238 # race conditions on updating user.info
239 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
240 :ok <- maybe_federate(activity) do
241 {:ok, activity}
242 else
243 {:fake, true, activity} ->
244 {:ok, activity}
245
246 {:error, message} ->
247 {:error, message}
248 end
249 end
250
251 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
252 additional = params[:additional] || %{}
253 # only accept false as false value
254 local = !(params[:local] == false)
255 published = params[:published]
256
257 with listen_data <-
258 make_listen_data(
259 %{to: to, actor: actor, published: published, context: context, object: object},
260 additional
261 ),
262 {:ok, activity} <- insert(listen_data, local),
263 :ok <- maybe_federate(activity) do
264 {:ok, activity}
265 else
266 {:error, message} ->
267 {:error, message}
268 end
269 end
270
271 def accept(%{to: to, actor: actor, object: object} = params) do
272 # only accept false as false value
273 local = !(params[:local] == false)
274
275 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
276 {:ok, activity} <- insert(data, local),
277 :ok <- maybe_federate(activity) do
278 {:ok, activity}
279 end
280 end
281
282 def reject(%{to: to, actor: actor, object: object} = params) do
283 # only accept false as false value
284 local = !(params[:local] == false)
285
286 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
287 {:ok, activity} <- insert(data, local),
288 :ok <- maybe_federate(activity) do
289 {:ok, activity}
290 end
291 end
292
293 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
294 # only accept false as false value
295 local = !(params[:local] == false)
296
297 with data <- %{
298 "to" => to,
299 "cc" => cc,
300 "type" => "Update",
301 "actor" => actor,
302 "object" => object
303 },
304 {:ok, activity} <- insert(data, local),
305 :ok <- maybe_federate(activity) do
306 {:ok, activity}
307 end
308 end
309
310 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
311 def like(
312 %User{ap_id: ap_id} = user,
313 %Object{data: %{"id" => _}} = object,
314 activity_id \\ nil,
315 local \\ true
316 ) do
317 with nil <- get_existing_like(ap_id, object),
318 like_data <- make_like_data(user, object, activity_id),
319 {:ok, activity} <- insert(like_data, local),
320 {:ok, object} <- add_like_to_object(activity, object),
321 :ok <- maybe_federate(activity) do
322 {:ok, activity, object}
323 else
324 %Activity{} = activity -> {:ok, activity, object}
325 error -> {:error, error}
326 end
327 end
328
329 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
330 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
331 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
332 {:ok, unlike_activity} <- insert(unlike_data, local),
333 {:ok, _activity} <- Repo.delete(like_activity),
334 {:ok, object} <- remove_like_from_object(like_activity, object),
335 :ok <- maybe_federate(unlike_activity) do
336 {:ok, unlike_activity, like_activity, object}
337 else
338 _e -> {:ok, object}
339 end
340 end
341
342 def announce(
343 %User{ap_id: _} = user,
344 %Object{data: %{"id" => _}} = object,
345 activity_id \\ nil,
346 local \\ true,
347 public \\ true
348 ) do
349 with true <- is_announceable?(object, user, public),
350 announce_data <- make_announce_data(user, object, activity_id, public),
351 {:ok, activity} <- insert(announce_data, local),
352 {:ok, object} <- add_announce_to_object(activity, object),
353 :ok <- maybe_federate(activity) do
354 {:ok, activity, object}
355 else
356 error -> {:error, error}
357 end
358 end
359
360 def unannounce(
361 %User{} = actor,
362 %Object{} = object,
363 activity_id \\ nil,
364 local \\ true
365 ) do
366 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
367 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
368 {:ok, unannounce_activity} <- insert(unannounce_data, local),
369 :ok <- maybe_federate(unannounce_activity),
370 {:ok, _activity} <- Repo.delete(announce_activity),
371 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
372 {:ok, unannounce_activity, object}
373 else
374 _e -> {:ok, object}
375 end
376 end
377
378 def follow(follower, followed, activity_id \\ nil, local \\ true) do
379 with data <- make_follow_data(follower, followed, activity_id),
380 {:ok, activity} <- insert(data, local),
381 :ok <- maybe_federate(activity),
382 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
383 {:ok, activity}
384 end
385 end
386
387 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
388 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
389 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
390 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
391 {:ok, activity} <- insert(unfollow_data, local),
392 :ok <- maybe_federate(activity) do
393 {:ok, activity}
394 end
395 end
396
397 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
398 with data <- %{
399 "to" => [follower_address],
400 "type" => "Delete",
401 "actor" => ap_id,
402 "object" => %{"type" => "Person", "id" => ap_id}
403 },
404 {:ok, activity} <- insert(data, true, true, true),
405 :ok <- maybe_federate(activity) do
406 {:ok, user}
407 end
408 end
409
410 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
411 user = User.get_cached_by_ap_id(actor)
412 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
413
414 with {:ok, object, activity} <- Object.delete(object),
415 data <- %{
416 "type" => "Delete",
417 "actor" => actor,
418 "object" => id,
419 "to" => to,
420 "deleted_activity_id" => activity && activity.id
421 },
422 {:ok, activity} <- insert(data, local, false),
423 stream_out_participations(object, user),
424 _ <- decrease_replies_count_if_reply(object),
425 # Changing note count prior to enqueuing federation task in order to avoid
426 # race conditions on updating user.info
427 {:ok, _actor} <- decrease_note_count_if_public(user, object),
428 :ok <- maybe_federate(activity) do
429 {:ok, activity}
430 end
431 end
432
433 @spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
434 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
435 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
436 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
437
438 if unfollow_blocked do
439 follow_activity = fetch_latest_follow(blocker, blocked)
440 if follow_activity, do: unfollow(blocker, blocked, nil, local)
441 end
442
443 with true <- outgoing_blocks,
444 block_data <- make_block_data(blocker, blocked, activity_id),
445 {:ok, activity} <- insert(block_data, local),
446 :ok <- maybe_federate(activity) do
447 {:ok, activity}
448 else
449 _e -> {:ok, nil}
450 end
451 end
452
453 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
454 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
455 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
456 {:ok, activity} <- insert(unblock_data, local),
457 :ok <- maybe_federate(activity) do
458 {:ok, activity}
459 end
460 end
461
462 @spec flag(map()) :: {:ok, Activity.t()} | any
463 def flag(
464 %{
465 actor: actor,
466 context: _context,
467 account: account,
468 statuses: statuses,
469 content: content
470 } = params
471 ) do
472 # only accept false as false value
473 local = !(params[:local] == false)
474 forward = !(params[:forward] == false)
475
476 additional = params[:additional] || %{}
477
478 additional =
479 if forward do
480 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
481 else
482 Map.merge(additional, %{"to" => [], "cc" => []})
483 end
484
485 with flag_data <- make_flag_data(params, additional),
486 {:ok, activity} <- insert(flag_data, local),
487 :ok <- maybe_federate(activity) do
488 Enum.each(User.all_superusers(), fn superuser ->
489 superuser
490 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
491 |> Pleroma.Emails.Mailer.deliver_async()
492 end)
493
494 {:ok, activity}
495 end
496 end
497
498 defp fetch_activities_for_context_query(context, opts) do
499 public = [Pleroma.Constants.as_public()]
500
501 recipients =
502 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
503
504 from(activity in Activity)
505 |> maybe_preload_objects(opts)
506 |> maybe_preload_bookmarks(opts)
507 |> maybe_set_thread_muted_field(opts)
508 |> restrict_blocked(opts)
509 |> restrict_recipients(recipients, opts["user"])
510 |> where(
511 [activity],
512 fragment(
513 "?->>'type' = ? and ?->>'context' = ?",
514 activity.data,
515 "Create",
516 activity.data,
517 ^context
518 )
519 )
520 |> exclude_poll_votes(opts)
521 |> exclude_id(opts)
522 |> order_by([activity], desc: activity.id)
523 end
524
525 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
526 def fetch_activities_for_context(context, opts \\ %{}) do
527 context
528 |> fetch_activities_for_context_query(opts)
529 |> Repo.all()
530 end
531
532 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
533 FlakeId.Ecto.CompatType.t() | nil
534 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
535 context
536 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
537 |> limit(1)
538 |> select([a], a.id)
539 |> Repo.one()
540 end
541
542 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
543 opts = Map.drop(opts, ["user"])
544
545 [Pleroma.Constants.as_public()]
546 |> fetch_activities_query(opts)
547 |> restrict_unlisted()
548 |> Pagination.fetch_paginated(opts, pagination)
549 |> Enum.reverse()
550 end
551
552 @valid_visibilities ~w[direct unlisted public private]
553
554 defp restrict_visibility(query, %{visibility: visibility})
555 when is_list(visibility) do
556 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
557 query =
558 from(
559 a in query,
560 where:
561 fragment(
562 "activity_visibility(?, ?, ?) = ANY (?)",
563 a.actor,
564 a.recipients,
565 a.data,
566 ^visibility
567 )
568 )
569
570 query
571 else
572 Logger.error("Could not restrict visibility to #{visibility}")
573 end
574 end
575
576 defp restrict_visibility(query, %{visibility: visibility})
577 when visibility in @valid_visibilities do
578 from(
579 a in query,
580 where:
581 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
582 )
583 end
584
585 defp restrict_visibility(_query, %{visibility: visibility})
586 when visibility not in @valid_visibilities do
587 Logger.error("Could not restrict visibility to #{visibility}")
588 end
589
590 defp restrict_visibility(query, _visibility), do: query
591
592 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
593 do: query
594
595 defp restrict_thread_visibility(
596 query,
597 %{"user" => %User{info: %{skip_thread_containment: true}}},
598 _
599 ),
600 do: query
601
602 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
603 from(
604 a in query,
605 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
606 )
607 end
608
609 defp restrict_thread_visibility(query, _, _), do: query
610
611 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
612 params =
613 params
614 |> Map.put("user", reading_user)
615 |> Map.put("actor_id", user.ap_id)
616 |> Map.put("whole_db", true)
617
618 recipients =
619 user_activities_recipients(%{
620 "godmode" => params["godmode"],
621 "reading_user" => reading_user
622 })
623
624 fetch_activities(recipients, params)
625 |> Enum.reverse()
626 end
627
628 def fetch_user_activities(user, reading_user, params \\ %{}) do
629 params =
630 params
631 |> Map.put("type", ["Create", "Announce"])
632 |> Map.put("user", reading_user)
633 |> Map.put("actor_id", user.ap_id)
634 |> Map.put("whole_db", true)
635 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
636
637 recipients =
638 user_activities_recipients(%{
639 "godmode" => params["godmode"],
640 "reading_user" => reading_user
641 })
642
643 fetch_activities(recipients, params)
644 |> Enum.reverse()
645 end
646
647 defp user_activities_recipients(%{"godmode" => true}) do
648 []
649 end
650
651 defp user_activities_recipients(%{"reading_user" => reading_user}) do
652 if reading_user do
653 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
654 else
655 [Pleroma.Constants.as_public()]
656 end
657 end
658
659 defp restrict_since(query, %{"since_id" => ""}), do: query
660
661 defp restrict_since(query, %{"since_id" => since_id}) do
662 from(activity in query, where: activity.id > ^since_id)
663 end
664
665 defp restrict_since(query, _), do: query
666
667 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
668 raise "Can't use the child object without preloading!"
669 end
670
671 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
672 when is_list(tag_reject) and tag_reject != [] do
673 from(
674 [_activity, object] in query,
675 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 )
677 end
678
679 defp restrict_tag_reject(query, _), do: query
680
681 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
682 raise "Can't use the child object without preloading!"
683 end
684
685 defp restrict_tag_all(query, %{"tag_all" => tag_all})
686 when is_list(tag_all) and tag_all != [] do
687 from(
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
690 )
691 end
692
693 defp restrict_tag_all(query, _), do: query
694
695 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
696 raise "Can't use the child object without preloading!"
697 end
698
699 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
700 from(
701 [_activity, object] in query,
702 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
703 )
704 end
705
706 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
707 from(
708 [_activity, object] in query,
709 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
710 )
711 end
712
713 defp restrict_tag(query, _), do: query
714
715 defp restrict_recipients(query, [], _user), do: query
716
717 defp restrict_recipients(query, recipients, nil) do
718 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
719 end
720
721 defp restrict_recipients(query, recipients, user) do
722 from(
723 activity in query,
724 where: fragment("? && ?", ^recipients, activity.recipients),
725 or_where: activity.actor == ^user.ap_id
726 )
727 end
728
729 defp restrict_local(query, %{"local_only" => true}) do
730 from(activity in query, where: activity.local == true)
731 end
732
733 defp restrict_local(query, _), do: query
734
735 defp restrict_actor(query, %{"actor_id" => actor_id}) do
736 from(activity in query, where: activity.actor == ^actor_id)
737 end
738
739 defp restrict_actor(query, _), do: query
740
741 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
742 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
743 end
744
745 defp restrict_type(query, %{"type" => type}) do
746 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
747 end
748
749 defp restrict_type(query, _), do: query
750
751 defp restrict_state(query, %{"state" => state}) do
752 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
753 end
754
755 defp restrict_state(query, _), do: query
756
757 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
758 from(
759 [_activity, object] in query,
760 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
761 )
762 end
763
764 defp restrict_favorited_by(query, _), do: query
765
766 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
767 raise "Can't use the child object without preloading!"
768 end
769
770 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
771 from(
772 [_activity, object] in query,
773 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
774 )
775 end
776
777 defp restrict_media(query, _), do: query
778
779 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
780 from(
781 activity in query,
782 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
783 )
784 end
785
786 defp restrict_replies(query, _), do: query
787
788 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
789 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
790 end
791
792 defp restrict_reblogs(query, _), do: query
793
794 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
795
796 defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
797 mutes = info.mutes
798
799 query =
800 from([activity] in query,
801 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
802 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
803 )
804
805 unless opts["skip_preload"] do
806 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
807 else
808 query
809 end
810 end
811
812 defp restrict_muted(query, _), do: query
813
814 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
815 blocks = info.blocks || []
816 domain_blocks = info.domain_blocks || []
817
818 query =
819 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
820
821 from(
822 [activity, object: o] in query,
823 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
824 where: fragment("not (? && ?)", activity.recipients, ^blocks),
825 where:
826 fragment(
827 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
828 activity.data,
829 activity.data,
830 ^blocks
831 ),
832 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
833 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
834 )
835 end
836
837 defp restrict_blocked(query, _), do: query
838
839 defp restrict_unlisted(query) do
840 from(
841 activity in query,
842 where:
843 fragment(
844 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
845 activity.data,
846 ^[Pleroma.Constants.as_public()]
847 )
848 )
849 end
850
851 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
852 from(activity in query, where: activity.id in ^ids)
853 end
854
855 defp restrict_pinned(query, _), do: query
856
857 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
858 muted_reblogs = info.muted_reblogs || []
859
860 from(
861 activity in query,
862 where:
863 fragment(
864 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
865 activity.data,
866 activity.actor,
867 ^muted_reblogs
868 )
869 )
870 end
871
872 defp restrict_muted_reblogs(query, _), do: query
873
874 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
875
876 defp exclude_poll_votes(query, _) do
877 if has_named_binding?(query, :object) do
878 from([activity, object: o] in query,
879 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
880 )
881 else
882 query
883 end
884 end
885
886 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
887 from(activity in query, where: activity.id != ^id)
888 end
889
890 defp exclude_id(query, _), do: query
891
892 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
893
894 defp maybe_preload_objects(query, _) do
895 query
896 |> Activity.with_preloaded_object()
897 end
898
899 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
900
901 defp maybe_preload_bookmarks(query, opts) do
902 query
903 |> Activity.with_preloaded_bookmark(opts["user"])
904 end
905
906 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
907
908 defp maybe_set_thread_muted_field(query, opts) do
909 query
910 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
911 end
912
913 defp maybe_order(query, %{order: :desc}) do
914 query
915 |> order_by(desc: :id)
916 end
917
918 defp maybe_order(query, %{order: :asc}) do
919 query
920 |> order_by(asc: :id)
921 end
922
923 defp maybe_order(query, _), do: query
924
925 def fetch_activities_query(recipients, opts \\ %{}) do
926 config = %{
927 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
928 }
929
930 Activity
931 |> maybe_preload_objects(opts)
932 |> maybe_preload_bookmarks(opts)
933 |> maybe_set_thread_muted_field(opts)
934 |> maybe_order(opts)
935 |> restrict_recipients(recipients, opts["user"])
936 |> restrict_tag(opts)
937 |> restrict_tag_reject(opts)
938 |> restrict_tag_all(opts)
939 |> restrict_since(opts)
940 |> restrict_local(opts)
941 |> restrict_actor(opts)
942 |> restrict_type(opts)
943 |> restrict_state(opts)
944 |> restrict_favorited_by(opts)
945 |> restrict_blocked(opts)
946 |> restrict_muted(opts)
947 |> restrict_media(opts)
948 |> restrict_visibility(opts)
949 |> restrict_thread_visibility(opts, config)
950 |> restrict_replies(opts)
951 |> restrict_reblogs(opts)
952 |> restrict_pinned(opts)
953 |> restrict_muted_reblogs(opts)
954 |> Activity.restrict_deactivated_users()
955 |> exclude_poll_votes(opts)
956 end
957
958 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
959 list_memberships = Pleroma.List.memberships(opts["user"])
960
961 fetch_activities_query(recipients ++ list_memberships, opts)
962 |> Pagination.fetch_paginated(opts, pagination)
963 |> Enum.reverse()
964 |> maybe_update_cc(list_memberships, opts["user"])
965 end
966
967 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
968 when is_list(list_memberships) and length(list_memberships) > 0 do
969 Enum.map(activities, fn
970 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
971 if Enum.any?(bcc, &(&1 in list_memberships)) do
972 update_in(activity.data["cc"], &[user_ap_id | &1])
973 else
974 activity
975 end
976
977 activity ->
978 activity
979 end)
980 end
981
982 defp maybe_update_cc(activities, _, _), do: activities
983
984 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
985 from(activity in query,
986 where:
987 fragment("? && ?", activity.recipients, ^recipients) or
988 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
989 ^Pleroma.Constants.as_public() in activity.recipients)
990 )
991 end
992
993 def fetch_activities_bounded(
994 recipients,
995 recipients_with_public,
996 opts \\ %{},
997 pagination \\ :keyset
998 ) do
999 fetch_activities_query([], opts)
1000 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1001 |> Pagination.fetch_paginated(opts, pagination)
1002 |> Enum.reverse()
1003 end
1004
1005 def upload(file, opts \\ []) do
1006 with {:ok, data} <- Upload.store(file, opts) do
1007 obj_data =
1008 if opts[:actor] do
1009 Map.put(data, "actor", opts[:actor])
1010 else
1011 data
1012 end
1013
1014 Repo.insert(%Object{data: obj_data})
1015 end
1016 end
1017
1018 defp object_to_user_data(data) do
1019 avatar =
1020 data["icon"]["url"] &&
1021 %{
1022 "type" => "Image",
1023 "url" => [%{"href" => data["icon"]["url"]}]
1024 }
1025
1026 banner =
1027 data["image"]["url"] &&
1028 %{
1029 "type" => "Image",
1030 "url" => [%{"href" => data["image"]["url"]}]
1031 }
1032
1033 fields =
1034 data
1035 |> Map.get("attachment", [])
1036 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1037 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1038
1039 locked = data["manuallyApprovesFollowers"] || false
1040 data = Transmogrifier.maybe_fix_user_object(data)
1041 discoverable = data["discoverable"] || false
1042
1043 user_data = %{
1044 ap_id: data["id"],
1045 info: %{
1046 ap_enabled: true,
1047 source_data: data,
1048 banner: banner,
1049 fields: fields,
1050 locked: locked,
1051 discoverable: discoverable
1052 },
1053 avatar: avatar,
1054 name: data["name"],
1055 follower_address: data["followers"],
1056 following_address: data["following"],
1057 bio: data["summary"]
1058 }
1059
1060 # nickname can be nil because of virtual actors
1061 user_data =
1062 if data["preferredUsername"] do
1063 Map.put(
1064 user_data,
1065 :nickname,
1066 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1067 )
1068 else
1069 Map.put(user_data, :nickname, nil)
1070 end
1071
1072 {:ok, user_data}
1073 end
1074
1075 def fetch_follow_information_for_user(user) do
1076 with {:ok, following_data} <-
1077 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1078 following_count when is_integer(following_count) <- following_data["totalItems"],
1079 {:ok, hide_follows} <- collection_private(following_data),
1080 {:ok, followers_data} <-
1081 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1082 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1083 {:ok, hide_followers} <- collection_private(followers_data) do
1084 {:ok,
1085 %{
1086 hide_follows: hide_follows,
1087 follower_count: followers_count,
1088 following_count: following_count,
1089 hide_followers: hide_followers
1090 }}
1091 else
1092 {:error, _} = e ->
1093 e
1094
1095 e ->
1096 {:error, e}
1097 end
1098 end
1099
1100 defp maybe_update_follow_information(data) do
1101 with {:enabled, true} <-
1102 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1103 {:ok, info} <- fetch_follow_information_for_user(data) do
1104 info = Map.merge(data.info, info)
1105 Map.put(data, :info, info)
1106 else
1107 {:enabled, false} ->
1108 data
1109
1110 e ->
1111 Logger.error(
1112 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1113 )
1114
1115 data
1116 end
1117 end
1118
1119 defp collection_private(data) do
1120 if is_map(data["first"]) and
1121 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1122 {:ok, false}
1123 else
1124 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1125 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1126 {:ok, false}
1127 else
1128 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1129 {:ok, true}
1130
1131 {:error, _} = e ->
1132 e
1133
1134 e ->
1135 {:error, e}
1136 end
1137 end
1138 end
1139
1140 def user_data_from_user_object(data) do
1141 with {:ok, data} <- MRF.filter(data),
1142 {:ok, data} <- object_to_user_data(data) do
1143 {:ok, data}
1144 else
1145 e -> {:error, e}
1146 end
1147 end
1148
1149 def fetch_and_prepare_user_from_ap_id(ap_id) do
1150 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1151 {:ok, data} <- user_data_from_user_object(data),
1152 data <- maybe_update_follow_information(data) do
1153 {:ok, data}
1154 else
1155 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1156 end
1157 end
1158
1159 def make_user_from_ap_id(ap_id) do
1160 if _user = User.get_cached_by_ap_id(ap_id) do
1161 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1162 else
1163 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1164 User.insert_or_update_user(data)
1165 else
1166 e -> {:error, e}
1167 end
1168 end
1169 end
1170
1171 def make_user_from_nickname(nickname) do
1172 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1173 make_user_from_ap_id(ap_id)
1174 else
1175 _e -> {:error, "No AP id in WebFinger"}
1176 end
1177 end
1178
1179 # filter out broken threads
1180 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1181 entire_thread_visible_for_user?(activity, user)
1182 end
1183
1184 # do post-processing on a specific activity
1185 def contain_activity(%Activity{} = activity, %User{} = user) do
1186 contain_broken_threads(activity, user)
1187 end
1188
1189 def fetch_direct_messages_query do
1190 Activity
1191 |> restrict_type(%{"type" => "Create"})
1192 |> restrict_visibility(%{visibility: "direct"})
1193 |> order_by([activity], asc: activity.id)
1194 end
1195 end