Merge branch 'develop' into feature/addressable-lists
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
# Computes `{recipients, to, cc}` for activity data.
#
# For Announce activities the combined to/cc/bcc addresses are filtered: an address that
# resolves to a cached user is kept only when that user follows the announcing actor, while
# addresses that do not resolve to a user are always kept. See issue #164 for more
# information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  keep? = fn address ->
    case User.get_cached_by_ap_id(address) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients = for group <- [to, cc, bcc], address <- group, keep?.(address), do: address

  {recipients, to, cc}
end

# For Create activities the actor is added to the addressed parties and duplicates
# are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Every other activity type is delivered to to/cc/bcc as given.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
61
# Decides whether an activity by `actor` (an AP id, or nil) may be inserted.
#
# Returns :ok when no actor is given or when the cached user's `info.deactivated` is
# exactly `false`; returns :reject in every other case. An actor that does not resolve
# to a cached user is rejected as well -- previously that case raised, because
# `user.info` was dereferenced on nil.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %{info: %{deactivated: false}} -> :ok
    _inactive_or_unknown -> :reject
  end
end
74
# True when the embedded object's content is no longer than the configured
# `[:instance, :remote_limit]`. Data without (non-nil) object content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  [:instance, :remote_limit]
  |> Config.get()
  |> Kernel.>=(String.length(content))
end

defp check_remote_limit(_data), do: true
81
@doc """
Increases the actor's note count when `object` is public.

Returns the result of `User.increase_note_count/1`, or `{:ok, actor}` untouched when the
object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
85
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}` untouched when the
object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
89
@doc """
Bumps the replies counter of the object that a newly created object replies to.

Only `Create` data whose embedded object has an `"inReplyTo"` key is considered; the
counter is touched only when that embedded object is public, otherwise `nil` is returned.
Any other input yields `:noop`.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
100
@doc """
Lowers the replies counter of the object that `object` replied to.

Applies to an `%Object{}` whose data has an `"inReplyTo"` key; the counter is touched only
when the object data is public, otherwise `nil` is returned. Any other input yields `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
110
@doc """
Registers a poll vote.

`Create` data whose embedded object has both `"inReplyTo"` and `"name"` is treated as a
vote: the vote count for option `name` on the replied-to object is increased. Any other
input yields `:noop`.
"""
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }),
    do: Object.increase_vote_count(poll_ap_id, option_name)

def increase_poll_votes_if_vote(_create_data), do: :noop
119
@doc """
Inserts activity data as an `%Activity{}`.

Steps, in order: return an already known activity as-is, fill in defaults, reject
deactivated actors, enforce the remote content limit, run the MRF filter, compute the
recipients and insert the embedded object. On success the activity is stored, rich media
fetching is enqueued, notifications and conversation participations are created and the
activity is streamed out.

With `fake` set to `true` nothing is stored: an unsaved activity with the id
`"pleroma:fakeid"` is returned instead.

Returns `{:ok, activity}` or `{:error, reason}`, where `reason` is whatever value failed
to match in the chain above.
"""
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: stored, else: Map.put(stored, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity was already known.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct without persisting anything.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
177
# Extracts the participations from an `{:ok, %{participations: ...}}` result;
# anything else counts as "no participations".
defp get_participations({:ok, %{participations: found}}), do: found
defp get_participations(_other), do: []
180
@doc """
Streams every given participation, with its user preloaded, to the `"participation"` topic.
"""
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end

@doc """
Streams the participations of the conversation an object belongs to.

The conversation is looked up through the object's `"context"`. Its participations are
streamed only when a latest activity id is found for that context from `user`'s point of
view. Input that is not an `%Object{}` with a context yields `:noop`.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      latest_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          "user" => user,
          "blocking_user" => user
        })

      if latest_id, do: stream_out_participations(conversation.participations)

    not_a_conversation ->
      not_a_conversation
  end
end

def stream_out_participations(_, _), do: :noop
206
@doc """
Pushes an activity to the streaming topics it belongs to.

Only `Create`, `Announce` and `Delete` activities are considered, and nothing is streamed
when the normalized object is a poll `Answer`. Every remaining activity goes to the
`"user"` and `"list"` topics. Activities addressed to the public collection in `"to"`
additionally go to `"public"` (and `"public:local"` when local); public `Create`s also go
to one `"hashtag:<tag>"` topic per string tag and to the media topics when the object's
attachment is not `[]`. Activities that are neither public in `"to"` nor in `"cc"`, and
not addressed to the actor's follower collection, go to `"direct"`.
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # A missing "to" is treated as empty, mirroring the "cc" fallback below, so the
    # membership checks cannot raise on nil.
    to = activity.data["to"] || []

    # Do not stream out poll replies
    unless object.data["type"] == "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] in ["Create"] do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(fn tag -> is_bitstring(tag) end)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          if object.data["attachment"] != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(activity.data["cc"] || [], public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ),
           do: Pleroma.Web.Streamer.stream("direct", activity)
      end
    end
  end
end
250
@doc """
Builds and inserts a `Create` activity, then updates counters and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:published` and `:local` are optional (`:local` counts as true unless it is exactly
`false`). With `fake` set to `true` the activity is built through a fake insert and
returned without touching counters or federating.

Returns `{:ok, activity}`, or the `{:error, reason}` tuple produced by a failing step.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Hand error tuples back to the caller; without this clause a failed insert
    # raised WithClauseError because no else clause matched.
    {:error, _reason} = error ->
      error
  end
end
276
@doc """
Inserts and federates an `Accept` activity from `actor` about `object`, addressed to `to`.

`:local` counts as true unless it is exactly `false`. Returns `{:ok, activity}` or the
first non-matching result of the insert/federation steps.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
287
@doc """
Inserts and federates a `Reject` activity from `actor` about `object`, addressed to `to`.

`:local` counts as true unless it is exactly `false`. Returns `{:ok, activity}` or the
first non-matching result of the insert/federation steps.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
@doc """
Inserts and federates an `Update` activity.

Unlike `accept/1` and `reject/1`, `actor` is stored as given (no `.ap_id` access), and both
`to` and `cc` are required. `:local` counts as true unless it is exactly `false`.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "type" => "Update",
    "actor" => actor,
    "object" => object,
    "to" => to,
    "cc" => cc
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
315
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Likes `object` on behalf of `user`.

When the user already has a like for the object, that existing activity is returned with
the object unchanged. Otherwise a like activity is inserted, added to the object and
federated. Returns `{:ok, activity, object}` or `{:error, reason}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- user |> make_like_data(object, activity_id) |> insert(local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    failure -> {:error, failure}
  end
end
334
@doc """
Removes `actor`'s like from `object`.

On success the unlike activity is inserted, the like activity deleted, the like removed
from the object and the unlike federated, giving
`{:ok, unlike_activity, like_activity, object}`. When there is no existing like, or any
step fails, `{:ok, object}` is returned with the object as passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         actor |> make_unlike_data(like_activity, activity_id) |> insert(local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
352
@doc """
Announces (repeats) `object` on behalf of `user`.

Only public objects can be announced. The announce activity is inserted, added to the
object and federated. Returns `{:ok, activity, object}` or `{:error, reason}`, where the
reason is `false` for a non-public object.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, activity} <-
         user |> make_announce_data(object, activity_id, public) |> insert(local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
370
@doc """
Undoes `actor`'s announce of `object`.

On success the undo activity is inserted and federated, the announce activity deleted and
the announce removed from the object, giving `{:ok, unannounce_activity, object}`. When
there is no existing announce, or any step fails, `{:ok, object}` is returned with the
object as passed in.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         actor |> make_unannounce_data(announce_activity, activity_id) |> insert(local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
388
@doc """
Inserts and federates a follow activity from `follower` to `followed`.

Returns `{:ok, activity}` or the first non-matching result of the insert/federation steps.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
396
@doc """
Cancels the latest follow from `follower` to `followed`.

The latest follow activity is marked `"cancelled"`, then an unfollow activity referring to
it is inserted and federated. Returns `{:ok, activity}`, or the first non-matching step
result (for instance when no follow activity exists).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
406
@doc """
Federates the deletion of a user or deletes an object.

For a `%User{}` a `Delete` activity about the user, addressed to the follower collection,
is built through a fake insert (nothing is stored) and federated; `{:ok, user}` is returned.
"""
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  data = %{
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id},
    "to" => [follower_address]
  }

  with {:ok, activity} <- insert(data, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end

# Object deletion: the object is deleted through Object.delete/1, a Delete activity
# addressed to the object's former to/cc is inserted, conversation participations are
# streamed, reply and note counters are lowered, and the activity is federated.
# Returns {:ok, activity} or the first non-matching step result.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed_to = object.data["to"] || []
  copied_to = object.data["cc"] || []
  to = addressed_to ++ copied_to

  with {:ok, object, deleted_activity} <- Object.delete(object),
       {:ok, activity} <-
         insert(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           local,
           false
         ),
       stream_out_participations(object, user),
       _ <- decrease_replies_count_if_reply(object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
442
@doc """
Blocks `blocked` on behalf of `blocker`.

When `[:activitypub, :unfollow_blocked]` is set and the blocker follows the blocked user,
the follow is undone first. A block activity is inserted and federated only when
`[:activitypub, :outgoing_blocks]` is `true`; in every other case, including failures,
`{:ok, nil}` is returned.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- blocker |> make_block_data(blocked, activity_id) |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _failure -> {:ok, nil}
  end
end
461
@doc """
Undoes the latest block of `blocked` by `blocker`.

An unblock activity referring to the latest block activity is inserted and federated.
Returns `{:ok, activity}`, or the first non-matching step result (for instance when no
block activity exists).
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         blocker
         |> make_unblock_data(blocked, latest_block, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
470
@doc """
Files a report (flag activity) about `account` and `statuses`.

`:local` and `:forward` count as true unless they are exactly `false`. The activity is
never addressed in `"to"`; `"cc"` holds the reported account's AP id only when forwarding.
After the activity has been inserted and federated, a report email is queued for every
superuser. Returns `{:ok, activity}` or the first non-matching step result.
"""
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  extra = params[:additional] || %{}
  report_params = Map.take(params, [:actor, :context, :account, :statuses, :content])

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(extra, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(report_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
513
# Builds the query for the `Create` activities of a context, newest first.
#
# Visible recipients are the public collection plus, when opts["user"] is given, that
# user's own AP id and everything they follow. Blocks, recipients and poll votes are
# restricted through the shared helpers.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  recipients =
    if opts["user"] do
      [opts["user"].ap_id | opts["user"].following] ++ public
    else
      public
    end

  base =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts["user"])

  from(activity in base,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
537
@doc """
Returns all `Create` activities of `context` that pass the restrictions in `opts`,
newest first.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
544
@doc """
Returns the id of the newest `Create` activity of `context`, or `nil` when there is none.

Object preloading is skipped unless `opts` sets `"skip_preload"` itself.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
554
@doc """
Fetches one page of activities addressed to the public collection, excluding those that
only list it in `"cc"`. The fetched page is reversed before it is returned.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
563
@valid_visibilities ~w[direct unlisted public private]

# Restricts a query to activities of the requested visibility.
#
# `opts[:visibility]` may be a single visibility or a list of them, each one of
# @valid_visibilities. Without a :visibility key the query is returned unchanged.
#
# An invalid visibility is logged and yields a query that matches nothing. Before, these
# branches returned the result of Logger.error/1 (`:ok`) instead of a query, which broke
# whatever consumed the result next, and interpolating the raw term into the message
# could itself raise for values that are not chardata.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    reject_invalid_visibility(query, visibility)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  reject_invalid_visibility(query, visibility)
end

defp restrict_visibility(query, _visibility), do: query

# Logs the offending value and fails closed: the returned query has no matches.
defp reject_invalid_visibility(query, visibility) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  from(a in query, where: false)
end
603
# Applies thread containment through the `thread_visibility` SQL function for the
# requesting user. Containment is skipped when the config map or the user's info has
# `skip_thread_containment: true`, and when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
622
@doc """
Fetches `user`'s `Create` and `Announce` activities as visible to `reading_user`.

Visible recipients are the public collection plus, when a reading user is given, that
user's own AP id and everything they follow. The result of `fetch_activities/2` is
reversed before it is returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  public = "https://www.w3.org/ns/activitystreams#Public"

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
642
# Keeps activities with an id greater than opts["since_id"]; an empty or absent
# since_id leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
650
# Drops activities whose object carries any tag from the non-empty list
# opts["tag_reject"]. Needs the joined object, hence the raise when preloading is skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
664
# Keeps activities whose object carries every tag from the non-empty list
# opts["tag_all"]. Needs the joined object, hence the raise when preloading is skipped.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _opts), do: query
678
# Keeps activities whose object carries opts["tag"]: any tag of a list, or the single
# tag of a binary. Needs the joined object, hence the raise when preloading is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _opts), do: query
698
# Keeps activities whose recipients overlap the given list. With a user, the user's own
# activities are added through an OR condition. An empty recipient list means no
# restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
712
# Keeps only local activities when opts["local_only"] is exactly true.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
718
# Keeps only activities by the actor given in opts["actor_id"].
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
724
# Keeps activities of the type(s) in opts["type"]: a binary is compared for equality,
# any other value is matched with ANY.
defp restrict_type(query, %{"type" => single_type}) when is_binary(single_type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^single_type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
734
# Keeps activities whose data "state" equals opts["state"].
defp restrict_state(query, %{"state" => state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
740
# Keeps activities whose embedded object's "likes" contain opts["favorited_by"].
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
749
# With opts["only_media"] set to "true" or "1", keeps activities whose object attachment
# differs from the empty list. Needs the joined object, hence the raise when preloading
# is skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
762
# With opts["exclude_replies"] set to "true" or "1", keeps only activities whose embedded
# object has no inReplyTo.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
771
# With opts["exclude_reblogs"] set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
777
# Hides activities by, or addressed ("to") to, anyone muted by opts["muting_user"].
# Nothing is hidden when opts["with_muted"] is true, "true" or "1".
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  muted = info.mutes

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted))
end

defp restrict_muted(query, _opts), do: query
791
# Hides activities involving accounts or domains blocked by opts["blocking_user"]:
# activities by or delivered to a blocked account, Announces addressed ("to") to one,
# and activities whose actor or whose object's actor lives on a blocked domain.
# The object is joined here when the query does not have an :object binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_accounts = info.blocks || []
  blocked_domains = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  from(
    [activity, object: o] in with_object,
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_accounts),
    where: fragment("not (? && ?)", activity.recipients, ^blocked_accounts),
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_accounts
      ),
    where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains),
    where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^blocked_domains)
  )
end

defp restrict_blocked(query, _opts), do: query
816
# Drops activities that list the public collection in "cc" (a missing "cc" counts
# as empty).
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
828
# With opts["pinned"] set to "true", keeps only the activities whose id is listed in
# opts["pinned_activity_ids"].
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _opts), do: query
834
# Drops Announce activities by actors whose reblogs opts["muting_user"] has muted.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  reblog_muted_actors = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
851
# Drops activities whose object is a poll "Answer", unless opts["include_poll_votes"]
# is "true". Without an :object binding on the query there is nothing to filter on, so
# the query is returned as it is.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      from([activity, object: o] in query,
        where: fragment("not(?->>'type' = ?)", o.data, "Answer")
      )

    false ->
      query
  end
end
863
# Preloads the activity's object unless opts["skip_preload"] is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
870
# Preloads the bookmark of opts["user"] unless opts["skip_preload"] is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
877
# Sets the thread-muted field for opts["user"] unless opts["skip_preload"] is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts),
  do: Activity.with_set_thread_muted_field(query, opts["user"])
884
# Orders by id when opts has `order: :desc` or `order: :asc`; otherwise the query is
# left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
896
@doc """
Builds the activity query for the given `recipients`, applying every restriction that
`opts` asks for.

The individual `restrict_*` / `maybe_*` helpers each inspect `opts` and leave the query
untouched when their option is absent. Thread containment additionally honours the
`[:instance, :skip_thread_containment]` setting.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  prepared =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  prepared
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
929
@doc """
Fetches one page of activities for `recipients` plus the list memberships of `opts["user"]`.

The fetched page is reversed, and activities that reached the user through a list get the
user's AP id added to their `"cc"`.
"""
def fetch_activities(recipients, opts \\ %{}) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)
    |> Enum.reverse()

  maybe_update_cc(page, list_memberships, opts["user"])
end
938
# Adds the user's AP id to the "cc" of every activity whose "bcc" contains one of the
# user's list memberships. Activities are returned untouched when there are no list
# memberships or no user.
#
# A missing "cc" is treated as empty: prepending to nil used to produce the improper
# list `[user_ap_id | nil]`. Non-emptiness is matched with `[_ | _]` rather than
# length/1 guards.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
955
@doc """
Restricts `query` to activities that either overlap `recipients`, or overlap
`recipients_with_public` while also being addressed to the public collection.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
964
@doc """
Fetches one page of activities bounded by `recipients` and `recipients_with_public`
(see `fetch_activities_bounded_query/3`). The fetched page is reversed before it is returned.
"""
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  bounded =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
971
@doc """
Stores `file` through `Upload.store/2` and inserts the resulting data as an `%Object{}`.

When `opts[:actor]` is set it is recorded under `"actor"` in the object data. Returns the
result of `Repo.insert/1`, or whatever non-`{:ok, _}` value the upload produced.
"""
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        falsy when falsy in [nil, false] -> data
        actor -> Map.put(data, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
984
# Converts a remote actor object into the attribute map used to create or update a user.
# The avatar and banner are taken from data["icon"]["url"] and data["image"]["url"];
# "locked" mirrors "manuallyApprovesFollowers". The nickname is
# "<preferredUsername>@<host of id>", or nil when no preferredUsername is present.
# Always returns {:ok, user_data}.
defp object_to_user_data(data) do
  image_for = fn url ->
    url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => url}]
      }
  end

  avatar = image_for.(data["icon"]["url"])
  banner = image_for.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  {:ok,
   %{
     ap_id: data["id"],
     info: %{
       "ap_enabled" => true,
       "source_data" => data,
       "banner" => banner,
       "locked" => locked
     },
     avatar: avatar,
     name: data["name"],
     follower_address: data["followers"],
     bio: data["summary"],
     nickname: nickname
   }}
end
1031
@doc """
Runs a remote actor object through the MRF filter and converts it into user attributes.

Returns `{:ok, user_data}`, or `{:error, value}` wrapping whatever value a step returned
instead of `{:ok, _}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} ->
      case object_to_user_data(filtered) do
        {:ok, user_data} -> {:ok, user_data}
        failure -> {:error, failure}
      end

    failure ->
      {:error, failure}
  end
end
1040
@doc """
Fetches the remote actor at `ap_id` and converts it into user attributes.

Returns `{:ok, user_data}` on success. Failures are logged and returned as
`{:error, reason}`; a step result that already is an `{:error, _}` tuple is returned
unchanged. Before, the failure branch returned the result of `Logger.error/1` (`:ok`),
so callers lost the reason.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
1049
@doc """
Returns a user for `ap_id`.

A user that is already cached is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`; otherwise the remote actor is fetched and
inserted or updated. A failed fetch yields `{:error, reason}`.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
1061
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found there.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or carries no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
1069
# filter out broken threads
@doc """
Tells whether the entire thread of `activity` is visible for `user`, by delegating to
`entire_thread_visible_for_user?/2`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1074
# do post-processing on a specific activity
@doc """
Containment check for a single activity; currently this is exactly
`contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1079
@doc """
Builds the query for all `Create` activities with `"direct"` visibility, oldest first.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{"type" => "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1086 end