Merge branch 'develop' into feature/digest-email
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26
# Computes the delivery list of an activity. Every clause returns
# `{recipients, to, cc}`, where `to` and `cc` are read from the activity data
# (defaulting to `[]` when the key is absent).
#
# Announce: an addressed actor that resolves to a cached user is only kept when
# that user follows the announcing actor; addresses without a cached user are
# kept as-is. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  keep? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients = [to, cc, bcc] |> Enum.concat() |> Enum.filter(keep?)

  {recipients, to, cc}
end

# Create: the actor is appended to the addressed parties and duplicates are dropped.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Any other activity: plain concatenation of "to", "cc" and "bcc".
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
62
# Decides whether an activity attributed to `actor` may be inserted.
#
# Returns `:ok` when `actor` is nil, or when it resolves to a cached user whose
# `info.deactivated` flag is exactly `false`. Returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on the struct turns an unknown actor (cache lookup returned nil)
  # into a clean `:reject` instead of crashing on `nil.info`.
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> :ok
    _ -> :reject
  end
end
75
# Length check for embedded object content. Returns true when the content
# length does not exceed the configured `[:instance, :remote_limit]`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

# Activities without embedded (non-nil) content always pass.
defp check_remote_limit(_), do: true
82
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the counter.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
86
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the counter.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
90
# For a Create whose embedded object has an "inReplyTo", increments the replies
# counter of the replied-to object, but only when the new object is public
# (returns nil when it is not).
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

# Anything else is not a reply creation.
def increase_replies_count_if_reply(_create_data), do: :noop
101
# For an object carrying "inReplyTo", decrements the replies counter of the
# replied-to object, but only when the object data is public (returns nil when
# it is not).
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply_data}) do
  if is_public?(reply_data), do: Object.decrease_replies_count(parent_ap_id)
end

# Objects without "inReplyTo" (or non-objects) are ignored.
def decrease_replies_count_if_reply(_object), do: :noop
111
# A Create whose embedded object has both "inReplyTo" and "name" is counted as
# a vote for option `name` on the replied-to object.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

# Everything else is not a vote.
def increase_poll_votes_if_vote(_create_data), do: :noop
120
# Inserts an activity map and triggers its follow-up work.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1`
# finds one; fill in defaults; reject inactive actors and over-long remote
# content; run the MRF filter; compute recipients; run the containment check;
# insert the embedded object. On success the activity row is inserted, a rich
# media fetch is enqueued, notifications and conversation participations are
# created, and the activity is streamed out.
#
# With `fake` set to true nothing is inserted: an in-memory activity with the
# id "pleroma:fakeid" is built and returned instead.
#
# Returns `{:ok, activity}` or `{:error, reason}`, where `reason` is whatever
# value failed to match in the chain.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       :ok <- Containment.contain_child(map),
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: stored, else: Map.put(stored, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert requested: build the struct without touching the database.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
179
# Pulls the participations out of an `{:ok, %{participations: ...}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
182
# Preloads the `:user` association of the given participations and pushes each
# one to the "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
192
# Streams the participations of the conversation that belongs to the object's
# "context", provided a conversation exists and a latest activity id can be
# found for it with `user` as both viewing and blocking user.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

# Objects without a "context" have nothing to stream.
def stream_out_participations(_, _), do: :noop
208
# Pushes a Create/Announce/Delete activity to the streaming topics.
#
# Every such activity (except poll answers) goes to "user" and "list". When the
# public collection is in "to" it additionally goes to "public", to
# "public:local" for local activities and, for Create, to one "hashtag:<tag>"
# topic per string tag and to the media topics. Otherwise it goes to "direct"
# when neither "cc" contains the public collection nor "to" contains the
# actor's follower address.
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # Addressing fields may be absent or a single string; normalise both to
    # lists so the membership checks below cannot crash on them.
    to = List.wrap(activity.data["to"])
    cc = List.wrap(activity.data["cc"])

    # Do not stream out poll replies
    unless object.data["type"] == "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] in ["Create"] do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(fn tag -> is_bitstring(tag) end)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          if object.data["attachment"] != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(cc, public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
252
# Builds and inserts a Create activity, then updates reply, poll-vote and note
# counters and federates it.
#
# `params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
# `:local` (only an explicit `false` disables it) and `:published` are optional.
# With `fake` set to true the activity is built but the counters and federation
# are skipped.
#
# Returns `{:ok, activity}` or the `{:error, reason}` produced by a failing step.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause a failed insert or counter update would raise
    # WithClauseError instead of reaching the caller as an error tuple.
    {:error, _reason} = error ->
      error
  end
end
278
# Inserts and federates an Accept activity from `actor` for `object`.
# `:local` defaults to true; only an explicit `false` turns it off.
def accept(%{to: to, actor: actor, object: object} = params) do
  local = params[:local] != false

  accept_data = %{
    "to" => to,
    "type" => "Accept",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
289
# Inserts and federates a Reject activity from `actor` for `object`.
# `:local` defaults to true; only an explicit `false` turns it off.
def reject(%{to: to, actor: actor, object: object} = params) do
  local = params[:local] != false

  reject_data = %{
    "to" => to,
    "type" => "Reject",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
300
# Inserts and federates an Update activity. `actor` is stored as given.
# `:local` defaults to true; only an explicit `false` turns it off.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  update_data = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
317
# Likes `object` on behalf of `user`.
#
# If the user already has a like on the object, that activity is returned
# together with the unchanged object. Otherwise a Like is inserted, recorded on
# the object and federated. Returns `{:ok, activity, object}` or `{:error, reason}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, like_activity} <- user |> make_like_data(object, activity_id) |> insert(local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    failure ->
      {:error, failure}
  end
end
336
# Removes `actor`'s like from `object`.
#
# On success an undo activity is inserted, the like activity is deleted, the
# like is removed from the object and the undo is federated; the result is
# `{:ok, unlike_activity, like_activity, object}`. If any step does not match
# (including "no existing like") the result is `{:ok, object}` with the object
# as passed in.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         actor |> make_unlike_data(like_activity, activity_id) |> insert(local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
354
# Announces `object` on behalf of `user`. Only public objects can be announced;
# a non-public object yields `{:error, false}`.
# Returns `{:ok, activity, object}` or `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, announce_activity} <-
         user |> make_announce_data(object, activity_id, public) |> insert(local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
372
# Undoes `actor`'s announce of `object`.
#
# On success the undo activity is inserted and federated, the announce activity
# is deleted and the announce is removed from the object; the result is
# `{:ok, unannounce_activity, object}`. If any step does not match the result is
# `{:ok, object}` with the object as passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         actor |> make_unannounce_data(announce_activity, activity_id) |> insert(local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
390
# Inserts and federates a Follow activity from `follower` to `followed`.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
398
# Cancels the latest follow from `follower` to `followed`: the follow activity's
# state is set to "cancelled", then an unfollow activity is inserted and
# federated. A step that does not match is returned as-is.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
408
# Announces the deletion of a user: a Delete activity for the user's Person
# object, addressed to the follower address, is built through a fake insert
# (`insert(data, true, true)`) and federated. Returns `{:ok, user}`.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
421
# Deletes an object and publishes the matching Delete activity.
#
# The Delete is addressed to the object's "to" and "cc", and records the id of
# the activity returned by `Object.delete/1` (if any) under
# "deleted_activity_id". Conversation participations are streamed, and the
# reply and note counters are decreased before federation.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, removed_activity} <- Object.delete(object),
       delete_data <- %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed,
         "deleted_activity_id" => removed_activity && removed_activity.id
       },
       {:ok, delete_activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
444
# Blocks `blocked` on behalf of `blocker`.
#
# When `[:activitypub, :unfollow_blocked]` is set and a follow exists, the
# blocker unfollows first. A Block activity is only inserted and federated when
# `[:activitypub, :outgoing_blocks]` is `true`; in every other case the result
# is `{:ok, nil}`.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- blocker |> make_block_data(blocked, activity_id) |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _ -> {:ok, nil}
  end
end
463
# Undoes the latest block from `blocker` on `blocked` by inserting and
# federating an unblock activity. A step that does not match is returned as-is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         blocker
         |> make_unblock_data(blocked, latest_block, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
472
# Files a report (Flag activity) and mails it to every superuser.
#
# `:local` and `:forward` default to true; only an explicit `false` disables
# them. The flag is addressed with an empty "to"; "cc" holds the reported
# account's ap_id when forwarding and is empty otherwise.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
515
# Query for the Create activities of a context, newest first.
#
# Visible recipients are the public collection plus, when `opts["user"]` is
# set, that user's ap_id and followings. Blocks and poll votes are filtered
# according to `opts`.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  viewer = opts["user"]

  recipients =
    if viewer do
      [viewer.ap_id | viewer.following] ++ public
    else
      public
    end

  context_filtered =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, viewer)
    |> where(
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  context_filtered
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
539
# Returns all activities matched by the context query for `context`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
546
# Returns the id of the newest activity matched by the context query, or nil.
# Preloading is skipped unless `opts` overrides "skip_preload".
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
556
# Fetches a page of activities addressed to the public collection, excluding
# unlisted ones, and reverses the page returned by the paginator.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
565
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` matches
# `opts.visibility`, which may be a single visibility or a list of them.
#
# An unsupported visibility is logged and the query is returned unchanged.
# Every clause returns a query so the surrounding pipeline keeps working;
# previously the logger's return value leaked out instead.
# NOTE(review): this means an invalid value disables the visibility filter
# rather than failing the request — confirm that is acceptable for all callers.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps logging safe for lists that are not valid chardata.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

# No :visibility option given.
defp restrict_visibility(query, _visibility), do: query
605
# Applies the `thread_visibility(ap_id, activity id)` database check for the
# requesting user, unless thread containment is skipped by the config map
# (third argument) or by the user's own `info.skip_thread_containment`.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

# No requesting user: nothing to check against.
defp restrict_thread_visibility(query, _opts, _config), do: query
624
# Fetches the Create and Announce activities authored by `user` as seen by
# `reading_user`, forcing "type", "actor_id", "whole_db" and
# "pinned_activity_ids" in `params`. The list from `fetch_activities/2` is
# reversed before it is returned.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
642
# Recipient list used when reading a user's activities.
# "godmode" yields an empty list.
defp user_activities_recipients(%{"godmode" => true}), do: []

# Otherwise: the public collection, followed by the reading user's own ap_id
# and followings when a reading user is given.
defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if reading_user do
    [public, reading_user.ap_id | reading_user.following]
  else
    [public]
  end
end
655
# Keeps only activities with an id greater than "since_id".
# An empty or missing "since_id" leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
663
# Excludes activities whose object has any of the "tag_reject" tags.
# Needs the joined object, so it refuses to run with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
677
# Keeps only activities whose object carries every one of the "tag_all" tags.
# Needs the joined object, so it refuses to run with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
691
# Keeps only activities whose object has the given tag (binary) or at least one
# of the given tags (list). Needs the joined object, so it refuses to run with
# "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
711
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are included as well. An empty
# recipient list disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
725
# With "local_only" set to true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
731
# With "actor_id" given, keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
737
# Filters by activity type: a binary "type" must match exactly, any other
# value is compared with `= ANY(...)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
747
# With "state" given, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
753
# With "favorited_by" given, keeps only activities whose embedded object lists
# that ap_id under "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _), do: query
762
# With "only_media" set to "true" or "1", keeps only activities whose object
# "attachment" differs from the empty list. Needs the joined object, so it
# refuses to run with "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
775
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# embedded object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _), do: query
784
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
790
# Hides activities authored by, or addressed ("to") to, anyone in the muting
# user's mute list. "with_muted" (true, "true" or "1") disables the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  mutes = info.mutes

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))
end

defp restrict_muted(query, _), do: query
804
# Hides activities related to accounts or domains blocked by the blocking user:
# activities authored by or delivered to a blocked account, Announces addressed
# ("to") to a blocked account, and activities whose actor or whose object's
# actor lives on a blocked domain. Joins the object if the query has no
# `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity, object: o], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity, object: o], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [activity, object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _), do: query
829
# Drops activities that list the public collection in "cc".
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
841
# With "pinned" set to "true", keeps only the activities whose id is listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
847
# Hides Announce activities authored by accounts in the muting user's
# `muted_reblogs` list.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
864
# Drops activities whose object type is "Answer" (poll votes), unless
# "include_poll_votes" is "true". The filter is only applied when the query
# already has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
876
# Preloads the activity's object unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
883
# Preloads the bookmark of `opts["user"]` unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
890
# Sets the thread-muted field for `opts["user"]` unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["user"])
end
897
# Orders by id when `opts` carries `order: :desc` or `order: :asc`;
# otherwise the query is left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
909
# Builds the general activity query: optional preloads and ordering first, then
# every `restrict_*` filter driven by `opts`, in a fixed order. Thread
# containment is controlled by `[:instance, :skip_thread_containment]`.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  prepared =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  prepared
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
942
# Fetches a page of activities visible to `recipients` plus the list
# memberships of `opts["user"]`, reverses the page returned by the paginator,
# and patches "cc" for activities that reached the user through a list.
def fetch_activities(recipients, opts \\ %{}) do
  viewer = opts["user"]
  list_memberships = Pleroma.List.memberships(viewer)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
951
# For a user with list memberships, prepends the user's ap_id to the "cc" of
# every activity whose non-empty "bcc" contains one of those memberships.
# Other activities are returned untouched.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a single string; List.wrap/1 keeps the result
        # a proper flat list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

# No user or no list memberships: nothing to patch.
defp maybe_update_cc(activities, _, _), do: activities
968
# Keeps activities delivered to any of `recipients`, or delivered to any of
# `recipients_with_public` while also being addressed to the public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
977
# Fetches a page of activities using the bounded recipient rule instead of the
# plain recipient restriction, and reverses the page returned by the paginator.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
984
# Stores `file` through `Upload.store/2` and inserts an object holding the
# returned data, tagged with `opts[:actor]` when given. A failed store is
# returned unchanged.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      obj_data = if opts[:actor], do: Map.put(data, "actor", opts[:actor]), else: data
      Repo.insert(%Object{data: obj_data})

    failure ->
      failure
  end
end
997
# Converts a remote actor document into the attribute map used to create or
# update a user. Avatar, banner and the locked flag are read from the document
# as received; the remaining fields are read after
# `Transmogrifier.maybe_fix_user_object/1` has been applied.
# Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  # Wraps a URL into an Image object; a missing URL is passed through as-is.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
1045
# Runs an actor document through the MRF filter and converts it to user
# attributes. Returns `{:ok, user_data}`, or `{:error, value}` wrapping whatever
# the failing step returned.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} ->
      case object_to_user_data(filtered) do
        {:ok, user_data} -> {:ok, user_data}
        other -> {:error, other}
      end

    rejected ->
      {:error, rejected}
  end
end
1054
# Fetches the remote actor document at `ap_id` and converts it into user
# attributes.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and
# `{:error, reason}` is returned, where `reason` is the value of the failing
# step, so callers can see why the fetch failed instead of receiving the
# logger's `:ok`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
1063
# Returns a user for `ap_id`: an already cached user is upgraded through the
# Transmogrifier, otherwise the actor is fetched and inserted or updated.
# A failed fetch yields `{:error, value}` wrapping what the fetch returned.
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      e -> {:error, e}
    end
  end
end
1075
# Resolves `nickname` through WebFinger and builds the user from the returned
# "ap_id". Returns `{:error, "No AP id in WebFinger"}` when no ap_id is found.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1083
# Filter for broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1088
# Post-processing hook for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1093
# Query for all Create activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1100 end