Merge branch 'bugfix/poll-id-as-string' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26
# Computes the addressing of an activity and returns `{recipients, to, cc}`.
# "to", "cc" and "bcc" each default to an empty list when absent.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
# Recipients that do not resolve to a cached user are always kept.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  keep_recipient? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(keep_recipient?)

  {recipients, to, cc}
end

# Create activities are additionally addressed to their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Every other activity type: the recipients are simply to ++ cc ++ bcc (no deduplication).
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
62
# Decides whether an activity authored by `actor` (an AP id, or nil) may be inserted.
#
# Returns `:ok` when no actor is given or when the cached user is not deactivated, and
# `:reject` otherwise. An actor that does not resolve to a cached user is rejected as well;
# previously that case raised because `.info` was called on `nil`.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> :ok
    _unknown_or_deactivated -> :reject
  end
end
75
# Returns true when the embedded object's "content" fits within the configured
# `[:instance, :remote_limit]` (compared via `String.length/1`). Activities without an
# embedded object map or without content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_activity), do: true
82
# Bumps the actor's note count when `object` is public; otherwise returns `{:ok, actor}`
# without touching anything.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
86
# Lowers the actor's note count when `object` is public; otherwise returns `{:ok, actor}`
# without touching anything.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
90
# For a Create whose embedded object has an "inReplyTo", increments the replies count of the
# replied-to object, but only if the reply itself is public (nil is returned otherwise).
# Anything that is not such a Create yields `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
101
# For an object that has an "inReplyTo", decrements the replies count of the replied-to
# object, but only if the reply data is public (nil is returned otherwise).
# Anything else yields `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
111
# A Create whose embedded object carries both "inReplyTo" and "name" is treated as a poll
# vote: the vote count of the replied-to object is increased for the option called `name`.
# Anything else yields `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"name" => option_name, "inReplyTo" => poll_ap_id}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
120
# Persists an activity map and runs the post-insert side effects.
#
# Steps, in order: bail out with the existing activity if `Activity.normalize/1` finds one,
# fill in activity defaults, check the actor and the remote content limit, run the MRF
# filter, compute the recipients, check containment and insert the embedded object.
# After the activity row is inserted, a rich-media fetch is enqueued, notifications are
# created, the conversation is created/bumped and the activity is streamed out.
#
# When `fake` is true nothing is written: an `%Activity{}` with the id "pleroma:fakeid" is
# built and returned instead.
#
# Returns `{:ok, activity}` or `{:error, reason}`, where `reason` is the value of whichever
# step failed to match.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _, _} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       :ok <- Containment.contain_child(filtered),
       {:ok, stored, object} <- insert_full_object(filtered) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: stored,
        local: local,
        actor: stored["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity already exists; hand it back untouched.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory only.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
179
# Extracts the participations from an `{:ok, %{participations: ...}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: found}} -> found
    _ -> []
  end
end
182
# Preloads `:user` on the given participations and pushes each one to the
# "participation" stream.
def stream_out_participations(participations) do
  participations
  |> Repo.preload(:user)
  |> Enum.each(&Pleroma.Web.Streamer.stream("participation", &1))
end
192
# Streams out the participations of the conversation that `object`'s "context" belongs to,
# provided a latest activity id can still be fetched for that context with `user` as both
# the viewing and the blocking user. When no conversation is found, the lookup result is
# returned as-is; objects without a "context" yield `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{"user" => user, "blocking_user" => user}
      latest_id = fetch_latest_activity_id_for_context(conversation.ap_id, viewer_opts)

      if latest_id, do: stream_out_participations(conversation.participations)

    no_conversation ->
      no_conversation
  end
end

def stream_out_participations(_, _), do: :noop
208
# Pushes a Create/Announce/Delete activity to the matching streams.
#
# Every such activity (except poll answers) goes to "user" and "list". Activities addressed
# to the public collection in "to" also go to "public" (plus "public:local" when local);
# public Creates additionally go to one "hashtag:<tag>" stream per string tag and, when the
# object's "attachment" is not `[]`, to the media streams. Activities that are public in
# neither "to" nor "cc" and are not addressed to the actor's follower collection go to
# "direct". Other activity types are ignored.
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    object = Object.normalize(activity)

    # A missing "to" is treated as empty (as is already done for "cc" below) instead of
    # letting Enum.member?/2 raise on nil.
    to = activity.data["to"] || []

    # Do not stream out poll replies
    if object.data["type"] != "Answer" do
      Pleroma.Web.Streamer.stream("user", activity)
      Pleroma.Web.Streamer.stream("list", activity)

      if Enum.member?(to, public) do
        Pleroma.Web.Streamer.stream("public", activity)

        if activity.local do
          Pleroma.Web.Streamer.stream("public:local", activity)
        end

        if activity.data["type"] in ["Create"] do
          object.data
          |> Map.get("tag", [])
          |> Enum.filter(fn tag -> is_bitstring(tag) end)
          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

          if object.data["attachment"] != [] do
            Pleroma.Web.Streamer.stream("public:media", activity)

            if activity.local do
              Pleroma.Web.Streamer.stream("public:local:media", activity)
            end
          end
        end
      else
        # TODO: Write test, replace with visibility test
        if !Enum.member?(activity.data["cc"] || [], public) &&
             !Enum.member?(
               to,
               User.get_cached_by_ap_id(activity.data["actor"]).follower_address
             ) do
          Pleroma.Web.Streamer.stream("direct", activity)
        end
      end
    end
  end
end
252
# Builds, inserts and federates a Create activity.
#
# `params` must contain :to, :actor, :context and :object; :additional (default `%{}`),
# :local (anything but `false` counts as true) and :published are optional. With
# `fake == true` the activity is built via a fake insert and returned without updating any
# counters or federating.
#
# Returns `{:ok, activity}`. An `{:error, reason}` produced by any step (for example by
# `insert/3`) is returned to the caller unchanged; previously it raised a WithClauseError
# because the `else` block only handled the fake case.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:error, _reason} = error ->
      error
  end
end
278
# Inserts and federates an Accept activity from `actor` for `object`, addressed to `to`.
# `params[:local]` counts as true unless it is exactly `false`.
# A failing step's value is returned unchanged.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local? = params[:local] != false
  accept_data = %{"type" => "Accept", "actor" => actor.ap_id, "to" => to, "object" => object}

  with {:ok, activity} <- insert(accept_data, local?),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
289
# Inserts and federates a Reject activity from `actor` for `object`, addressed to `to`.
# `params[:local]` counts as true unless it is exactly `false`.
# A failing step's value is returned unchanged.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local? = params[:local] != false
  reject_data = %{"type" => "Reject", "actor" => actor.ap_id, "to" => to, "object" => object}

  with {:ok, activity} <- insert(reject_data, local?),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
300
# Inserts and federates an Update activity. Unlike accept/reject, `actor` is stored as
# given (the caller passes the value to put under "actor").
# `params[:local]` counts as true unless it is exactly `false`.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local? = params[:local] != false

  update_data = %{
    "type" => "Update",
    "actor" => actor,
    "to" => to,
    "cc" => cc,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local?),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
317
# Likes `object` on behalf of `user`.
#
# Returns `{:ok, activity, object}`: either the freshly inserted Like together with the
# updated object, or the already existing Like together with the object as passed in.
# Any other failing step is wrapped as `{:error, value}`.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
336
# Undoes `actor`'s like of `object`.
#
# On success the undo activity is inserted, the Like is deleted, the object is updated and
# `{:ok, unlike_activity, like_activity, object}` is returned. If there is no existing Like
# or any step fails, `{:ok, object}` (the object as passed in) is returned instead.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       undo_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity, like_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
354
# Announces (boosts) `object` on behalf of `user`. Only public objects can be announced.
#
# Returns `{:ok, activity, object}` with the updated object, or `{:error, value}` where
# `value` is the result of the failing step (`false` when the object is not public).
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
372
# Undoes `actor`'s announce of `object`.
#
# On success the undo activity is inserted and federated, the Announce is deleted, the
# object is updated and `{:ok, unannounce_activity, object}` is returned. If there is no
# existing Announce or any step fails, `{:ok, object}` (as passed in) is returned instead.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       undo_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, undo_activity, updated_object}
  else
    _failure -> {:ok, object}
  end
end
390
# Inserts and federates a Follow activity from `follower` to `followed`.
# A failing step's value is returned unchanged.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, follow_activity} <- insert(follow_data, local),
       :ok <- maybe_federate(follow_activity) do
    {:ok, follow_activity}
  end
end
398
# Cancels the latest Follow from `follower` to `followed`: its state is set to "cancelled"
# and an undo activity is inserted and federated. When no Follow exists (or a step fails)
# the non-matching value is returned unchanged.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
408
# Deleting a user: builds a Delete activity for the Person, addressed to the user's
# follower collection. The insert is a fake one (`insert(data, true, true)`), so only the
# in-memory activity is federated. Returns `{:ok, user}`.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  delete_data = %{
    "type" => "Delete",
    "actor" => ap_id,
    "to" => [follower_address],
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(delete_data, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end

# Deleting an object: deletes it via `Object.delete/1`, inserts a Delete activity addressed
# to the object's former "to" and "cc", streams out the conversation participations,
# adjusts the reply and note counters and federates. Returns `{:ok, delete_activity}`;
# a failing step's value is returned unchanged.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed_to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data <- %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed_to,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, delete_activity} <- insert(delete_data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
444
# Blocks `blocked` on behalf of `blocker`.
#
# If `[:activitypub, :unfollow_blocked]` is set and a Follow exists, the blocker first
# unfollows the blocked user. A Block activity is only inserted and federated when
# `[:activitypub, :outgoing_blocks]` is `true`; in every other case (including failures)
# `{:ok, nil}` is returned.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, block_activity} <- insert(block_data, local),
       :ok <- maybe_federate(block_activity) do
    {:ok, block_activity}
  else
    _skipped_or_failed -> {:ok, nil}
  end
end
463
# Undoes the latest Block from `blocker` against `blocked` by inserting and federating an
# undo activity. When no Block exists (or a step fails) the non-matching value is returned
# unchanged.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       undo_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  end
end
472
# Files a report (Flag activity) by `actor` about `account` and `statuses`.
#
# `params[:local]` and `params[:forward]` count as true unless exactly `false`. The
# activity's "to" is always empty; "cc" contains the reported account only when forwarding.
# After a successful insert and federation, every superuser is sent a report e-mail.
# Returns `{:ok, activity}`; a failing step's value is returned unchanged.
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local? = params[:local] != false
  forward? = params[:forward] != false

  # Only the five report fields are handed on to make_flag_data/2.
  report_params = Map.take(params, [:actor, :context, :account, :statuses, :content])

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(report_params, additional),
       {:ok, activity} <- insert(flag_data, local?),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
515
# Builds the query for the Create activities of a conversation `context`, newest first.
# Visible recipients are the public collection plus, when `opts["user"]` is given, that
# user's own AP id and everything in their `following` list. Blocks and poll votes are
# filtered through `restrict_blocked/2` and `exclude_poll_votes/2`.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  viewer = opts["user"]

  recipients =
    if viewer do
      [viewer.ap_id | viewer.following] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, viewer)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> order_by([activity], desc: activity.id)
end
539
# Returns all activities of the conversation `context` that match `opts`
# (see fetch_activities_for_context_query/2), newest first.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
546
# Returns the id of the newest activity of the conversation `context` that matches `opts`,
# or nil when there is none. "skip_preload" defaults to true here but can be overridden
# through `opts`.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
556
# Fetches one page of activities addressed to the public collection, excluding unlisted
# ones (see restrict_unlisted/1). The paginated result is reversed before it is returned.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
565
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(actor, recipients, data)`
# matches the `:visibility` option (a single value or a list of values).
#
# NOTE(review): for an invalid visibility the clauses below only log an error and return
# the result of `Logger.error/1` instead of a query.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.any?(visibility, fn value -> value not in @valid_visibilities end) do
    Logger.error("Could not restrict visibility to #{visibility}")
  else
    where(
      query,
      [a],
      fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
end

# No :visibility option given: nothing to restrict.
defp restrict_visibility(query, _visibility), do: query
605
# Applies the `thread_visibility(ap_id, activity_id)` database check for the viewing user.
# The check is skipped when it is disabled in the passed config, when the user has
# `skip_thread_containment` set, or when there is no user in `opts`.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: viewer_ap_id}}, _config) do
  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, a.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
624
# Fetches the Create and Announce activities authored by `user` as visible to
# `reading_user` (who may be nil). Visible recipients are the public collection plus, for a
# reading user, their own AP id and everything they follow. The list returned by
# fetch_activities/2 is reversed.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  forced_params = %{
    "type" => ["Create", "Announce"],
    "actor_id" => user.ap_id,
    "whole_db" => true,
    "pinned_activity_ids" => user.info.pinned_activities
  }

  # The forced values win over whatever the caller passed under the same keys.
  params = Map.merge(params, forced_params)

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
644
# Keeps only activities with an id greater than "since_id". An empty or missing "since_id"
# leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
652
# Drops activities whose object has any of the tags in "tag_reject" (non-empty list).
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
666
# Keeps only activities whose object has all of the tags in "tag_all" (non-empty list).
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
680
# Keeps only activities whose object carries the given "tag": any of the tags when a list
# is passed, exactly that tag when a single string is passed.
# Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _opts), do: query
700
# Keeps only activities whose recipients overlap with `recipients`. When a user is given,
# activities authored by that user are included as well (OR-ed onto the existing
# conditions). An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
714
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
720
# With an "actor_id" option, keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}) do
  where(query, [activity], activity.actor == ^actor_ap_id)
end

defp restrict_actor(query, _opts), do: query
726
# Keeps only activities of the given "type": an exact match for a single string,
# membership (`= ANY`) for any other value, which is expected to be a list of types.
defp restrict_type(query, %{"type" => single_type}) when is_binary(single_type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^single_type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
736
# With a "state" option, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _opts), do: query
742
# With a "favorited_by" option, keeps only activities whose embedded object's "likes"
# contain the given AP id.
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^liker_ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
751
# With "only_media" set to "true" or "1", drops activities whose object's "attachment" is
# an empty list. Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
764
# With "exclude_replies" set to "true" or "1", keeps only activities whose embedded object
# has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
773
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
779
# Hides activities authored by, or addressed ("to") to, anyone in the muting user's mutes.
# "with_muted" set to true, "true" or "1" disables the filter, as does a missing
# "muting_user".
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: %{mutes: mutes}}}) do
  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))
end

defp restrict_muted(query, _opts), do: query
793
# Hides activities involving accounts or domains blocked by the "blocking_user":
# activities authored by or addressed to a blocked account, Announces whose "to" contains
# a blocked account, and activities whose actor or whose object's actor lives on a blocked
# domain. The object join is added when the query does not have an :object binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
818
# Drops activities that carry the public collection in "cc" (a missing "cc" is treated as
# empty through coalesce).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
830
# With "pinned" => "true", keeps only the activities listed in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
836
# Hides Announce activities authored by anyone in the muting user's `muted_reblogs`
# (treated as empty when nil).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
853
# Drops activities whose object is a poll vote (type "Answer"), unless
# "include_poll_votes" is "true". The filter can only be applied when the query already has
# an :object binding; otherwise the query is returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
865
# Preloads the activity's object unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
872
# Preloads the bookmark of `opts["user"]` for each activity unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
879
# Sets the thread-muted field for `opts["user"]` on each activity unless "skip_preload"
# is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["user"])
end
886
# Orders by id in the direction given by the `:order` option (`:desc` or `:asc`);
# without that option the query is left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
898
# Builds the activity query for `recipients`, applying every preload, ordering and
# restriction helper in this module that `opts` asks for. Thread containment is driven by
# the `[:instance, :skip_thread_containment]` setting. The order of the steps below is
# kept exactly as it was, since each one composes onto the previous query.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Preloads and ordering first ...
  prepared_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # ... then the filters.
  prepared_query
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
931
# Fetches one page of activities for `recipients` plus the list memberships of
# `opts["user"]`. The paginated result is reversed, and activities that reached the user
# through a list get the user added to their "cc" (see maybe_update_cc/3).
def fetch_activities(recipients, opts \\ %{}) do
  viewer = opts["user"]
  list_memberships = Pleroma.List.memberships(viewer)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
940
# For a user with list memberships, prepends the user's AP id to the "cc" of every activity
# whose "bcc" mentions one of those lists. Activities without a non-empty "bcc" list, and
# all activities when there is no user or no membership, are returned unchanged.
#
# Non-emptiness is checked by pattern (`[_ | _]`) rather than `length/1`, and a missing
# "cc" is treated as an empty list so that the result is always a proper list.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
957
# Keeps activities that are addressed to any of `recipients`, or that are addressed to any
# of `recipients_with_public` and also to the public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
  )
end
966
# Fetches one page of activities bounded by the two recipient lists
# (see fetch_activities_bounded_query/3). The paginated result is reversed.
def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
973
# Stores `file` through `Upload.store/2` and inserts an Object holding the resulting data,
# tagged with "actor" when `opts[:actor]` is given. A failed store is returned unchanged.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    object_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: object_data})
  end
end
986
# Converts a fetched actor object into the attribute map used to create/update a user.
# Returns `{:ok, user_data}`.
#
# Avatar, banner and the locked flag are read from the object as received; the remaining
# fields are read after `Transmogrifier.maybe_fix_user_object/1` has been applied.
defp object_to_user_data(data) do
  # Wraps an image URL in an Image map; a missing URL is passed through as-is.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    nickname: nickname,
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"]
  }

  {:ok, user_data}
end
1034
# Runs an actor object through the MRF filter and converts it to user data.
# Returns `{:ok, user_data}`, or `{:error, value}` wrapping whatever the failing step
# returned.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
1043
# Fetches the remote actor at `ap_id` and converts it into user data.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and an error
# tuple is returned: an `{:error, _}` from a failing step is passed through unchanged, any
# other value is wrapped as `{:error, value}`. Previously the failure branch returned the
# result of `Logger.error/1`, which hid the reason from callers.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, data}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
1052
# Returns the user for `ap_id`: an already cached user is upgraded through the
# Transmogrifier, otherwise the actor is fetched and inserted/updated. A failed fetch is
# returned as `{:error, value}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} -> User.insert_or_update_user(data)
        failure -> {:error, failure}
      end

    _cached_user ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1064
# Resolves `nickname` through WebFinger and builds the user from the AP id it returns.
# Any WebFinger result without a non-nil "ap_id" yields `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when ap_id != nil -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
1072
# filter out broken threads: delegates to entire_thread_visible_for_user?/2 and returns
# its result for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1077
# do post-processing on a specific activity; currently this is only the broken-thread
# check done by contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1082
# Query for all Create activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{"type" => "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1089 end