Merge branch 'pleroma-conversations' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Containment
12 alias Pleroma.Object.Fetcher
13 alias Pleroma.Pagination
14 alias Pleroma.Repo
15 alias Pleroma.Upload
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.MRF
18 alias Pleroma.Web.ActivityPub.Transmogrifier
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26 require Pleroma.Constants
27
28 # For Announce activities, we filter the recipients based on following status for any actors
29 # that match actual users. See issue #164 for more information about why this is necessary.
30 defp get_recipients(%{"type" => "Announce"} = data) do
31 to = Map.get(data, "to", [])
32 cc = Map.get(data, "cc", [])
33 bcc = Map.get(data, "bcc", [])
34 actor = User.get_cached_by_ap_id(data["actor"])
35
36 recipients =
37 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
38 case User.get_cached_by_ap_id(recipient) do
39 nil -> true
40 user -> User.following?(user, actor)
41 end
42 end)
43
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(%{"type" => "Create"} = data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 actor = Map.get(data, "actor", [])
52 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = Map.get(data, "to", [])
58 cc = Map.get(data, "cc", [])
59 bcc = Map.get(data, "bcc", [])
60 recipients = Enum.concat([to, cc, bcc])
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
86 end
87
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
90 end
91
92 def increase_replies_count_if_reply(%{
93 "object" => %{"inReplyTo" => reply_ap_id} = object,
94 "type" => "Create"
95 }) do
96 if is_public?(object) do
97 Object.increase_replies_count(reply_ap_id)
98 end
99 end
100
101 def increase_replies_count_if_reply(_create_data), do: :noop
102
103 def decrease_replies_count_if_reply(%Object{
104 data: %{"inReplyTo" => reply_ap_id} = object
105 }) do
106 if is_public?(object) do
107 Object.decrease_replies_count(reply_ap_id)
108 end
109 end
110
111 def decrease_replies_count_if_reply(_object), do: :noop
112
113 def increase_poll_votes_if_vote(%{
114 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
115 "type" => "Create"
116 }) do
117 Object.increase_vote_count(reply_ap_id, name)
118 end
119
120 def increase_poll_votes_if_vote(_create_data), do: :noop
121
122 def insert(map, local \\ true, fake \\ false) when is_map(map) do
123 with nil <- Activity.normalize(map),
124 map <- lazy_put_activity_defaults(map, fake),
125 :ok <- check_actor_is_active(map["actor"]),
126 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
127 {:ok, map} <- MRF.filter(map),
128 {recipients, _, _} = get_recipients(map),
129 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
130 :ok <- Containment.contain_child(map),
131 {:ok, map, object} <- insert_full_object(map) do
132 {:ok, activity} =
133 Repo.insert(%Activity{
134 data: map,
135 local: local,
136 actor: map["actor"],
137 recipients: recipients
138 })
139
140 # Splice in the child object if we have one.
141 activity =
142 if !is_nil(object) do
143 Map.put(activity, :object, object)
144 else
145 activity
146 end
147
148 PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
149
150 Notification.create_notifications(activity)
151
152 participations =
153 activity
154 |> Conversation.create_or_bump_for()
155 |> get_participations()
156
157 stream_out(activity)
158 stream_out_participations(participations)
159 {:ok, activity}
160 else
161 %Activity{} = activity ->
162 {:ok, activity}
163
164 {:fake, true, map, recipients} ->
165 activity = %Activity{
166 data: map,
167 local: local,
168 actor: map["actor"],
169 recipients: recipients,
170 id: "pleroma:fakeid"
171 }
172
173 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
174 {:ok, activity}
175
176 error ->
177 {:error, error}
178 end
179 end
180
181 defp get_participations({:ok, %{participations: participations}}), do: participations
182 defp get_participations(_), do: []
183
184 def stream_out_participations(participations) do
185 participations =
186 participations
187 |> Repo.preload(:user)
188
189 Enum.each(participations, fn participation ->
190 Pleroma.Web.Streamer.stream("participation", participation)
191 end)
192 end
193
194 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
195 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
196 conversation = Repo.preload(conversation, :participations),
197 last_activity_id =
198 fetch_latest_activity_id_for_context(conversation.ap_id, %{
199 "user" => user,
200 "blocking_user" => user
201 }) do
202 if last_activity_id do
203 stream_out_participations(conversation.participations)
204 end
205 end
206 end
207
208 def stream_out_participations(_, _), do: :noop
209
210 def stream_out(activity) do
211 if activity.data["type"] in ["Create", "Announce", "Delete"] do
212 object = Object.normalize(activity)
213 # Do not stream out poll replies
214 unless object.data["type"] == "Answer" do
215 Pleroma.Web.Streamer.stream("user", activity)
216 Pleroma.Web.Streamer.stream("list", activity)
217
218 if get_visibility(activity) == "public" do
219 Pleroma.Web.Streamer.stream("public", activity)
220
221 if activity.local do
222 Pleroma.Web.Streamer.stream("public:local", activity)
223 end
224
225 if activity.data["type"] in ["Create"] do
226 object.data
227 |> Map.get("tag", [])
228 |> Enum.filter(fn tag -> is_bitstring(tag) end)
229 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
230
231 if object.data["attachment"] != [] do
232 Pleroma.Web.Streamer.stream("public:media", activity)
233
234 if activity.local do
235 Pleroma.Web.Streamer.stream("public:local:media", activity)
236 end
237 end
238 end
239 else
240 if get_visibility(activity) == "direct",
241 do: Pleroma.Web.Streamer.stream("direct", activity)
242 end
243 end
244 end
245 end
246
247 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
248 additional = params[:additional] || %{}
249 # only accept false as false value
250 local = !(params[:local] == false)
251 published = params[:published]
252
253 with create_data <-
254 make_create_data(
255 %{to: to, actor: actor, published: published, context: context, object: object},
256 additional
257 ),
258 {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 # Changing note count prior to enqueuing federation task in order to avoid
263 # race conditions on updating user.info
264 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
265 :ok <- maybe_federate(activity) do
266 {:ok, activity}
267 else
268 {:fake, true, activity} ->
269 {:ok, activity}
270
271 {:error, message} ->
272 {:error, message}
273 end
274 end
275
276 def accept(%{to: to, actor: actor, object: object} = params) do
277 # only accept false as false value
278 local = !(params[:local] == false)
279
280 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
281 {:ok, activity} <- insert(data, local),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 end
285 end
286
287 def reject(%{to: to, actor: actor, object: object} = params) do
288 # only accept false as false value
289 local = !(params[:local] == false)
290
291 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
292 {:ok, activity} <- insert(data, local),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 end
296 end
297
298 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
299 # only accept false as false value
300 local = !(params[:local] == false)
301
302 with data <- %{
303 "to" => to,
304 "cc" => cc,
305 "type" => "Update",
306 "actor" => actor,
307 "object" => object
308 },
309 {:ok, activity} <- insert(data, local),
310 :ok <- maybe_federate(activity) do
311 {:ok, activity}
312 end
313 end
314
315 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
316 def like(
317 %User{ap_id: ap_id} = user,
318 %Object{data: %{"id" => _}} = object,
319 activity_id \\ nil,
320 local \\ true
321 ) do
322 with nil <- get_existing_like(ap_id, object),
323 like_data <- make_like_data(user, object, activity_id),
324 {:ok, activity} <- insert(like_data, local),
325 {:ok, object} <- add_like_to_object(activity, object),
326 :ok <- maybe_federate(activity) do
327 {:ok, activity, object}
328 else
329 %Activity{} = activity -> {:ok, activity, object}
330 error -> {:error, error}
331 end
332 end
333
334 def unlike(
335 %User{} = actor,
336 %Object{} = object,
337 activity_id \\ nil,
338 local \\ true
339 ) do
340 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
341 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
342 {:ok, unlike_activity} <- insert(unlike_data, local),
343 {:ok, _activity} <- Repo.delete(like_activity),
344 {:ok, object} <- remove_like_from_object(like_activity, object),
345 :ok <- maybe_federate(unlike_activity) do
346 {:ok, unlike_activity, like_activity, object}
347 else
348 _e -> {:ok, object}
349 end
350 end
351
352 def announce(
353 %User{ap_id: _} = user,
354 %Object{data: %{"id" => _}} = object,
355 activity_id \\ nil,
356 local \\ true,
357 public \\ true
358 ) do
359 with true <- is_public?(object),
360 announce_data <- make_announce_data(user, object, activity_id, public),
361 {:ok, activity} <- insert(announce_data, local),
362 {:ok, object} <- add_announce_to_object(activity, object),
363 :ok <- maybe_federate(activity) do
364 {:ok, activity, object}
365 else
366 error -> {:error, error}
367 end
368 end
369
370 def unannounce(
371 %User{} = actor,
372 %Object{} = object,
373 activity_id \\ nil,
374 local \\ true
375 ) do
376 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
377 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
378 {:ok, unannounce_activity} <- insert(unannounce_data, local),
379 :ok <- maybe_federate(unannounce_activity),
380 {:ok, _activity} <- Repo.delete(announce_activity),
381 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
382 {:ok, unannounce_activity, object}
383 else
384 _e -> {:ok, object}
385 end
386 end
387
388 def follow(follower, followed, activity_id \\ nil, local \\ true) do
389 with data <- make_follow_data(follower, followed, activity_id),
390 {:ok, activity} <- insert(data, local),
391 :ok <- maybe_federate(activity),
392 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
393 {:ok, activity}
394 end
395 end
396
397 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
398 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
399 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
400 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
401 {:ok, activity} <- insert(unfollow_data, local),
402 :ok <- maybe_federate(activity) do
403 {:ok, activity}
404 end
405 end
406
407 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
408 with data <- %{
409 "to" => [follower_address],
410 "type" => "Delete",
411 "actor" => ap_id,
412 "object" => %{"type" => "Person", "id" => ap_id}
413 },
414 {:ok, activity} <- insert(data, true, true),
415 :ok <- maybe_federate(activity) do
416 {:ok, user}
417 end
418 end
419
420 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
421 user = User.get_cached_by_ap_id(actor)
422 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
423
424 with {:ok, object, activity} <- Object.delete(object),
425 data <- %{
426 "type" => "Delete",
427 "actor" => actor,
428 "object" => id,
429 "to" => to,
430 "deleted_activity_id" => activity && activity.id
431 },
432 {:ok, activity} <- insert(data, local, false),
433 stream_out_participations(object, user),
434 _ <- decrease_replies_count_if_reply(object),
435 # Changing note count prior to enqueuing federation task in order to avoid
436 # race conditions on updating user.info
437 {:ok, _actor} <- decrease_note_count_if_public(user, object),
438 :ok <- maybe_federate(activity) do
439 {:ok, activity}
440 end
441 end
442
443 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
444 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
445 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
446
447 if unfollow_blocked do
448 follow_activity = fetch_latest_follow(blocker, blocked)
449 if follow_activity, do: unfollow(blocker, blocked, nil, local)
450 end
451
452 with true <- outgoing_blocks,
453 block_data <- make_block_data(blocker, blocked, activity_id),
454 {:ok, activity} <- insert(block_data, local),
455 :ok <- maybe_federate(activity) do
456 {:ok, activity}
457 else
458 _e -> {:ok, nil}
459 end
460 end
461
462 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
463 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
464 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
465 {:ok, activity} <- insert(unblock_data, local),
466 :ok <- maybe_federate(activity) do
467 {:ok, activity}
468 end
469 end
470
471 def flag(
472 %{
473 actor: actor,
474 context: context,
475 account: account,
476 statuses: statuses,
477 content: content
478 } = params
479 ) do
480 # only accept false as false value
481 local = !(params[:local] == false)
482 forward = !(params[:forward] == false)
483
484 additional = params[:additional] || %{}
485
486 params = %{
487 actor: actor,
488 context: context,
489 account: account,
490 statuses: statuses,
491 content: content
492 }
493
494 additional =
495 if forward do
496 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
497 else
498 Map.merge(additional, %{"to" => [], "cc" => []})
499 end
500
501 with flag_data <- make_flag_data(params, additional),
502 {:ok, activity} <- insert(flag_data, local),
503 :ok <- maybe_federate(activity) do
504 Enum.each(User.all_superusers(), fn superuser ->
505 superuser
506 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
507 |> Pleroma.Emails.Mailer.deliver_async()
508 end)
509
510 {:ok, activity}
511 end
512 end
513
514 defp fetch_activities_for_context_query(context, opts) do
515 public = [Pleroma.Constants.as_public()]
516
517 recipients =
518 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
519
520 from(activity in Activity)
521 |> maybe_preload_objects(opts)
522 |> maybe_preload_bookmarks(opts)
523 |> maybe_set_thread_muted_field(opts)
524 |> restrict_blocked(opts)
525 |> restrict_recipients(recipients, opts["user"])
526 |> where(
527 [activity],
528 fragment(
529 "?->>'type' = ? and ?->>'context' = ?",
530 activity.data,
531 "Create",
532 activity.data,
533 ^context
534 )
535 )
536 |> exclude_poll_votes(opts)
537 |> exclude_id(opts)
538 |> order_by([activity], desc: activity.id)
539 end
540
541 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
542 def fetch_activities_for_context(context, opts \\ %{}) do
543 context
544 |> fetch_activities_for_context_query(opts)
545 |> Repo.all()
546 end
547
548 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
549 Pleroma.FlakeId.t() | nil
550 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
551 context
552 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
553 |> limit(1)
554 |> select([a], a.id)
555 |> Repo.one()
556 end
557
558 def fetch_public_activities(opts \\ %{}) do
559 q = fetch_activities_query([Pleroma.Constants.as_public()], opts)
560
561 q
562 |> restrict_unlisted()
563 |> Pagination.fetch_paginated(opts)
564 |> Enum.reverse()
565 end
566
567 @valid_visibilities ~w[direct unlisted public private]
568
569 defp restrict_visibility(query, %{visibility: visibility})
570 when is_list(visibility) do
571 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
572 query =
573 from(
574 a in query,
575 where:
576 fragment(
577 "activity_visibility(?, ?, ?) = ANY (?)",
578 a.actor,
579 a.recipients,
580 a.data,
581 ^visibility
582 )
583 )
584
585 query
586 else
587 Logger.error("Could not restrict visibility to #{visibility}")
588 end
589 end
590
591 defp restrict_visibility(query, %{visibility: visibility})
592 when visibility in @valid_visibilities do
593 from(
594 a in query,
595 where:
596 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
597 )
598 end
599
600 defp restrict_visibility(_query, %{visibility: visibility})
601 when visibility not in @valid_visibilities do
602 Logger.error("Could not restrict visibility to #{visibility}")
603 end
604
605 defp restrict_visibility(query, _visibility), do: query
606
607 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
608 do: query
609
610 defp restrict_thread_visibility(
611 query,
612 %{"user" => %User{info: %{skip_thread_containment: true}}},
613 _
614 ),
615 do: query
616
617 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
618 from(
619 a in query,
620 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
621 )
622 end
623
624 defp restrict_thread_visibility(query, _, _), do: query
625
626 def fetch_user_activities(user, reading_user, params \\ %{}) do
627 params =
628 params
629 |> Map.put("type", ["Create", "Announce"])
630 |> Map.put("user", reading_user)
631 |> Map.put("actor_id", user.ap_id)
632 |> Map.put("whole_db", true)
633 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
634
635 recipients =
636 user_activities_recipients(%{
637 "godmode" => params["godmode"],
638 "reading_user" => reading_user
639 })
640
641 fetch_activities(recipients, params)
642 |> Enum.reverse()
643 end
644
645 defp user_activities_recipients(%{"godmode" => true}) do
646 []
647 end
648
649 defp user_activities_recipients(%{"reading_user" => reading_user}) do
650 if reading_user do
651 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
652 else
653 [Pleroma.Constants.as_public()]
654 end
655 end
656
657 defp restrict_since(query, %{"since_id" => ""}), do: query
658
659 defp restrict_since(query, %{"since_id" => since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
661 end
662
663 defp restrict_since(query, _), do: query
664
665 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
666 raise "Can't use the child object without preloading!"
667 end
668
669 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
670 when is_list(tag_reject) and tag_reject != [] do
671 from(
672 [_activity, object] in query,
673 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
674 )
675 end
676
677 defp restrict_tag_reject(query, _), do: query
678
679 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
680 raise "Can't use the child object without preloading!"
681 end
682
683 defp restrict_tag_all(query, %{"tag_all" => tag_all})
684 when is_list(tag_all) and tag_all != [] do
685 from(
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
688 )
689 end
690
691 defp restrict_tag_all(query, _), do: query
692
693 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
694 raise "Can't use the child object without preloading!"
695 end
696
697 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
698 from(
699 [_activity, object] in query,
700 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
701 )
702 end
703
704 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
705 from(
706 [_activity, object] in query,
707 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
708 )
709 end
710
711 defp restrict_tag(query, _), do: query
712
713 defp restrict_recipients(query, [], _user), do: query
714
715 defp restrict_recipients(query, recipients, nil) do
716 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 end
718
719 defp restrict_recipients(query, recipients, user) do
720 from(
721 activity in query,
722 where: fragment("? && ?", ^recipients, activity.recipients),
723 or_where: activity.actor == ^user.ap_id
724 )
725 end
726
727 defp restrict_local(query, %{"local_only" => true}) do
728 from(activity in query, where: activity.local == true)
729 end
730
731 defp restrict_local(query, _), do: query
732
733 defp restrict_actor(query, %{"actor_id" => actor_id}) do
734 from(activity in query, where: activity.actor == ^actor_id)
735 end
736
737 defp restrict_actor(query, _), do: query
738
739 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
740 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 end
742
743 defp restrict_type(query, %{"type" => type}) do
744 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 end
746
747 defp restrict_type(query, _), do: query
748
749 defp restrict_state(query, %{"state" => state}) do
750 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 end
752
753 defp restrict_state(query, _), do: query
754
755 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
756 from(
757 [_activity, object] in query,
758 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
759 )
760 end
761
762 defp restrict_favorited_by(query, _), do: query
763
764 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
765 raise "Can't use the child object without preloading!"
766 end
767
768 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
769 from(
770 [_activity, object] in query,
771 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
772 )
773 end
774
775 defp restrict_media(query, _), do: query
776
777 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
778 from(
779 activity in query,
780 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
781 )
782 end
783
784 defp restrict_replies(query, _), do: query
785
786 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
787 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
788 end
789
790 defp restrict_reblogs(query, _), do: query
791
792 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
793
794 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
795 mutes = info.mutes
796
797 from(
798 activity in query,
799 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
800 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
801 )
802 end
803
804 defp restrict_muted(query, _), do: query
805
806 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
807 blocks = info.blocks || []
808 domain_blocks = info.domain_blocks || []
809
810 query =
811 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
812
813 from(
814 [activity, object: o] in query,
815 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
816 where: fragment("not (? && ?)", activity.recipients, ^blocks),
817 where:
818 fragment(
819 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
820 activity.data,
821 activity.data,
822 ^blocks
823 ),
824 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
825 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
826 )
827 end
828
829 defp restrict_blocked(query, _), do: query
830
831 defp restrict_unlisted(query) do
832 from(
833 activity in query,
834 where:
835 fragment(
836 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
837 activity.data,
838 ^[Pleroma.Constants.as_public()]
839 )
840 )
841 end
842
843 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
844 from(activity in query, where: activity.id in ^ids)
845 end
846
847 defp restrict_pinned(query, _), do: query
848
849 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
850 muted_reblogs = info.muted_reblogs || []
851
852 from(
853 activity in query,
854 where:
855 fragment(
856 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
857 activity.data,
858 activity.actor,
859 ^muted_reblogs
860 )
861 )
862 end
863
864 defp restrict_muted_reblogs(query, _), do: query
865
866 defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
867
868 defp exclude_poll_votes(query, _) do
869 if has_named_binding?(query, :object) do
870 from([activity, object: o] in query,
871 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
872 )
873 else
874 query
875 end
876 end
877
878 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
879 from(activity in query, where: activity.id != ^id)
880 end
881
882 defp exclude_id(query, _), do: query
883
884 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
885
886 defp maybe_preload_objects(query, _) do
887 query
888 |> Activity.with_preloaded_object()
889 end
890
891 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
892
893 defp maybe_preload_bookmarks(query, opts) do
894 query
895 |> Activity.with_preloaded_bookmark(opts["user"])
896 end
897
898 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
899
900 defp maybe_set_thread_muted_field(query, opts) do
901 query
902 |> Activity.with_set_thread_muted_field(opts["user"])
903 end
904
905 defp maybe_order(query, %{order: :desc}) do
906 query
907 |> order_by(desc: :id)
908 end
909
910 defp maybe_order(query, %{order: :asc}) do
911 query
912 |> order_by(asc: :id)
913 end
914
915 defp maybe_order(query, _), do: query
916
917 def fetch_activities_query(recipients, opts \\ %{}) do
918 config = %{
919 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
920 }
921
922 Activity
923 |> maybe_preload_objects(opts)
924 |> maybe_preload_bookmarks(opts)
925 |> maybe_set_thread_muted_field(opts)
926 |> maybe_order(opts)
927 |> restrict_recipients(recipients, opts["user"])
928 |> restrict_tag(opts)
929 |> restrict_tag_reject(opts)
930 |> restrict_tag_all(opts)
931 |> restrict_since(opts)
932 |> restrict_local(opts)
933 |> restrict_actor(opts)
934 |> restrict_type(opts)
935 |> restrict_state(opts)
936 |> restrict_favorited_by(opts)
937 |> restrict_blocked(opts)
938 |> restrict_muted(opts)
939 |> restrict_media(opts)
940 |> restrict_visibility(opts)
941 |> restrict_thread_visibility(opts, config)
942 |> restrict_replies(opts)
943 |> restrict_reblogs(opts)
944 |> restrict_pinned(opts)
945 |> restrict_muted_reblogs(opts)
946 |> Activity.restrict_deactivated_users()
947 |> exclude_poll_votes(opts)
948 end
949
950 def fetch_activities(recipients, opts \\ %{}) do
951 list_memberships = Pleroma.List.memberships(opts["user"])
952
953 fetch_activities_query(recipients ++ list_memberships, opts)
954 |> Pagination.fetch_paginated(opts)
955 |> Enum.reverse()
956 |> maybe_update_cc(list_memberships, opts["user"])
957 end
958
959 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
960 when is_list(list_memberships) and length(list_memberships) > 0 do
961 Enum.map(activities, fn
962 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
963 if Enum.any?(bcc, &(&1 in list_memberships)) do
964 update_in(activity.data["cc"], &[user_ap_id | &1])
965 else
966 activity
967 end
968
969 activity ->
970 activity
971 end)
972 end
973
974 defp maybe_update_cc(activities, _, _), do: activities
975
976 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
977 from(activity in query,
978 where:
979 fragment("? && ?", activity.recipients, ^recipients) or
980 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
981 ^Pleroma.Constants.as_public() in activity.recipients)
982 )
983 end
984
985 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
986 fetch_activities_query([], opts)
987 |> fetch_activities_bounded_query(recipients, recipients_with_public)
988 |> Pagination.fetch_paginated(opts)
989 |> Enum.reverse()
990 end
991
992 def upload(file, opts \\ []) do
993 with {:ok, data} <- Upload.store(file, opts) do
994 obj_data =
995 if opts[:actor] do
996 Map.put(data, "actor", opts[:actor])
997 else
998 data
999 end
1000
1001 Repo.insert(%Object{data: obj_data})
1002 end
1003 end
1004
1005 defp object_to_user_data(data) do
1006 avatar =
1007 data["icon"]["url"] &&
1008 %{
1009 "type" => "Image",
1010 "url" => [%{"href" => data["icon"]["url"]}]
1011 }
1012
1013 banner =
1014 data["image"]["url"] &&
1015 %{
1016 "type" => "Image",
1017 "url" => [%{"href" => data["image"]["url"]}]
1018 }
1019
1020 locked = data["manuallyApprovesFollowers"] || false
1021 data = Transmogrifier.maybe_fix_user_object(data)
1022
1023 user_data = %{
1024 ap_id: data["id"],
1025 info: %{
1026 ap_enabled: true,
1027 source_data: data,
1028 banner: banner,
1029 locked: locked
1030 },
1031 avatar: avatar,
1032 name: data["name"],
1033 follower_address: data["followers"],
1034 following_address: data["following"],
1035 bio: data["summary"]
1036 }
1037
1038 # nickname can be nil because of virtual actors
1039 user_data =
1040 if data["preferredUsername"] do
1041 Map.put(
1042 user_data,
1043 :nickname,
1044 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1045 )
1046 else
1047 Map.put(user_data, :nickname, nil)
1048 end
1049
1050 {:ok, user_data}
1051 end
1052
1053 def fetch_follow_information_for_user(user) do
1054 with {:ok, following_data} <-
1055 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1056 following_count when is_integer(following_count) <- following_data["totalItems"],
1057 {:ok, hide_follows} <- collection_private(following_data),
1058 {:ok, followers_data} <-
1059 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1060 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1061 {:ok, hide_followers} <- collection_private(followers_data) do
1062 {:ok,
1063 %{
1064 hide_follows: hide_follows,
1065 follower_count: followers_count,
1066 following_count: following_count,
1067 hide_followers: hide_followers
1068 }}
1069 else
1070 {:error, _} = e ->
1071 e
1072
1073 e ->
1074 {:error, e}
1075 end
1076 end
1077
1078 defp maybe_update_follow_information(data) do
1079 with {:enabled, true} <-
1080 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1081 {:ok, info} <- fetch_follow_information_for_user(data) do
1082 info = Map.merge(data.info, info)
1083 Map.put(data, :info, info)
1084 else
1085 {:enabled, false} ->
1086 data
1087
1088 e ->
1089 Logger.error(
1090 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1091 )
1092
1093 data
1094 end
1095 end
1096
1097 defp collection_private(data) do
1098 if is_map(data["first"]) and
1099 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1100 {:ok, false}
1101 else
1102 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1103 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1104 {:ok, false}
1105 else
1106 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1107 {:ok, true}
1108
1109 {:error, _} = e ->
1110 e
1111
1112 e ->
1113 {:error, e}
1114 end
1115 end
1116 end
1117
1118 def user_data_from_user_object(data) do
1119 with {:ok, data} <- MRF.filter(data),
1120 {:ok, data} <- object_to_user_data(data) do
1121 {:ok, data}
1122 else
1123 e -> {:error, e}
1124 end
1125 end
1126
1127 def fetch_and_prepare_user_from_ap_id(ap_id) do
1128 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1129 {:ok, data} <- user_data_from_user_object(data),
1130 data <- maybe_update_follow_information(data) do
1131 {:ok, data}
1132 else
1133 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1134 end
1135 end
1136
1137 def make_user_from_ap_id(ap_id) do
1138 if _user = User.get_cached_by_ap_id(ap_id) do
1139 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1140 else
1141 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1142 User.insert_or_update_user(data)
1143 else
1144 e -> {:error, e}
1145 end
1146 end
1147 end
1148
1149 def make_user_from_nickname(nickname) do
1150 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1151 make_user_from_ap_id(ap_id)
1152 else
1153 _e -> {:error, "No AP id in WebFinger"}
1154 end
1155 end
1156
1157 # filter out broken threads
1158 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1159 entire_thread_visible_for_user?(activity, user)
1160 end
1161
1162 # do post-processing on a specific activity
1163 def contain_activity(%Activity{} = activity, %User{} = user) do
1164 contain_broken_threads(activity, user)
1165 end
1166
1167 def fetch_direct_messages_query do
1168 Activity
1169 |> restrict_type(%{"type" => "Create"})
1170 |> restrict_visibility(%{visibility: "direct"})
1171 |> order_by([activity], asc: activity.id)
1172 end
1173 end