05ed81d8fc90f053c589544ccc46fe8df0d1ea6a
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Object.Containment
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.Pagination
16 alias Pleroma.Repo
17 alias Pleroma.Upload
18 alias Pleroma.User
19 alias Pleroma.Web.ActivityPub.MRF
20 alias Pleroma.Web.ActivityPub.Transmogrifier
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Web.WebFinger
24 alias Pleroma.Workers.BackgroundWorker
25
26 import Ecto.Query
27 import Pleroma.Web.ActivityPub.Utils
28 import Pleroma.Web.ActivityPub.Visibility
29
30 require Logger
31 require Pleroma.Constants
32
# Computes `{recipients, to, cc}` for an activity map. "to", "cc" and "bcc" each
# default to an empty list when the key is absent.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  announcer = User.get_cached_by_ap_id(data["actor"])

  # Addresses that do not resolve to a cached user are always kept; resolved
  # users are kept only when `User.following?/2` says they follow the announcer.
  keep? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients = for ap_id <- Enum.concat([to, cc, bcc]), keep?.(ap_id), do: ap_id

  {recipients, to, cc}
end

# Create activities additionally address the actor itself; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  actor = Map.get(data, "actor", [])

  recipients =
    [to, cc, bcc, [actor]]
    |> Enum.concat()
    |> Enum.uniq()

  {recipients, to, cc}
end

# Every other activity type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  {Enum.concat([to, cc, bcc]), to, cc}
end
68
# Returns `true` when an activity may be inserted on behalf of `actor`.
#
# A `nil` actor is accepted. Otherwise the actor must resolve to a cached
# `%User{}` whose `info.deactivated` flag is exactly `false`.
#
# An actor that does not resolve to a user yields `false`; previously the
# `user.info` access was attempted on `nil` and raised instead.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> true
    _ -> false
  end
end
81
# Checks the length of an embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Activities without a non-nil embedded content
# always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_), do: true
88
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
92
@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
96
@doc """
Bumps the replies counter of the replied-to object for a public Create.

Only Create data whose embedded object carries an "inReplyTo" key is
considered; the counter is increased when that object is public. Any other
input yields `:noop`.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
107
@doc """
Lowers the replies counter of the replied-to object for a public reply.

Only `%Object{}` structs whose data carries an "inReplyTo" key are
considered; the counter is decreased when that data is public. Any other
input yields `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
117
@doc """
Registers a poll vote for Create data whose embedded object has both an
"inReplyTo" and a "name" key, by calling `Object.increase_vote_count/2` with
those two values. Any other input yields `:noop`.
"""
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
126
@doc """
Inserts the activity described by `map`.

Steps, in order: look for an already known activity, fill in activity
defaults, check the actor (skipped when `bypass_actor_check` is truthy),
check the remote content limit, run the MRF filter, compute recipients, run
the containment check and insert the embedded object.

Returns:

  * `{:ok, activity}` for a newly stored activity; notifications, the
    conversation and streaming are handled before returning;
  * `{:ok, activity}` with the existing activity when `Activity.normalize/1`
    already finds one;
  * `{:ok, activity}` with an unsaved struct (id `"pleroma:fakeid"`) when
    `fake` is `true`;
  * `{:error, value}` where `value` is whatever step result failed to match.
"""
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, inserted} =
      %Activity{data: map, local: local, actor: map["actor"], recipients: recipients}
      |> Repo.insert()

    # Splice in the child object if we have one.
    activity =
      case is_nil(object) do
        true -> inserted
        false -> Map.put(inserted, :object, object)
      end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(map["actor"])
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    # The activity is already known: hand it back untouched.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct without touching the database.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
182
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; when either lookup does not
# produce the expected shape, that lookup's result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    other ->
      other
  end
end
190
# Returns the freshly (force-)preloaded participations of a conversation, or
# an empty list for anything that is not `{:ok, conversation}`.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
198
@doc """
Preloads the `:user` association of `participations` and pushes them to the
"participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
206
@doc """
Streams the participations of the conversation that `object` belongs to.

The conversation is looked up by the object's "context". Participations are
streamed only when `fetch_latest_activity_id_for_context/2` (called with
`user` as both "user" and "blocking_user") finds an activity. When no
conversation is found, the lookup result is returned as-is. Arguments that
are not an `%Object{}` with a "context" yield `:noop`.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
222
@doc """
Pushes Create, Announce and Delete activities to the streamer, using the
topics computed by `Topics.get_activity_topics/1`. Everything else yields
`:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
233
@doc """
Builds and inserts a Create activity.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:local` and `:published` are optional. With `fake` set to `true` the activity
returned by `insert/3` is handed back without any further bookkeeping.

After a real insert the replies and poll-vote counters are updated. Unless
the `[:env]` config is `:benchmark`, the actor's note count is then updated
and the activity is passed to `maybe_federate/1`.

Returns `{:ok, activity}` or `{:error, message}`.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} -> {:ok, activity}
    {:quick_insert, true, activity} -> {:ok, activity}
    {:error, message} -> {:error, message}
  end
end
267
@doc """
Builds a listen activity with `make_listen_data/2`, inserts it and passes it
to `maybe_federate/1`.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:local` and `:published` are optional. Returns `{:ok, activity}` or
`{:error, message}`.
"""
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} -> {:error, message}
  end
end
287
@doc """
Creates an "Accept" activity; see `accept_or_reject/2` for the expected params.
"""
def accept(params), do: accept_or_reject("Accept", params)
291
@doc """
Creates a "Reject" activity; see `accept_or_reject/2` for the expected params.
"""
def reject(params), do: accept_or_reject("Reject", params)
295
@doc """
Inserts an activity of the given `type` and passes it to `maybe_federate/1`.

`params` must contain `:to`, `:actor` (a struct with an `ap_id`) and
`:object`. `:local` defaults to `true`; `:activity_id`, when given, is handed
to `Utils.maybe_put/3` under "id". Returns `{:ok, activity}` or the first
non-matching step result.
"""
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
308
@doc """
Inserts an "Update" activity and passes it to `maybe_federate/1`.

`params` must contain `:to`, `:cc`, `:actor` and `:object`; `:local` (only
`false` counts as false) and `:activity_id` are optional. Returns
`{:ok, activity}` or the first non-matching step result.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false
  activity_id = params[:activity_id]

  data =
    %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object}
    |> Utils.maybe_put("id", activity_id)

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
326
@doc """
Makes `user` like `object`.

When `get_existing_like/2` already finds a like activity, it is returned
together with the unchanged object. Otherwise a like activity is inserted,
added to the object and passed to `maybe_federate/1`.

Returns `{:ok, activity, object}` or `{:error, reason}`.
"""
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
345
@doc """
Removes `actor`'s like from `object`.

Inserts the unlike activity, deletes the like activity, updates the object
and passes the unlike activity to `maybe_federate/1`. Returns
`{:ok, unlike_activity, like_activity, object}` on success. If any step does
not match (including when no like exists), `{:ok, object}` with the original
object is returned.
"""
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
358
@doc """
Makes `user` announce `object`.

The object must pass `is_announceable?/3`. The announce activity is then
inserted, added to the object and passed to `maybe_federate/1`.

Returns `{:ok, activity, object}`, or `{:error, value}` where `value` is the
first step result that did not match.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
376
@doc """
Undoes `actor`'s announce of `object`.

Inserts the undo activity, passes it to `maybe_federate/1`, deletes the
announce activity and updates the object. Returns
`{:ok, unannounce_activity, object}` on success. If any step does not match
(including when no announce exists), `{:ok, object}` with the original object
is returned.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
394
@doc """
Inserts a follow activity from `follower` to `followed`, passes it to
`maybe_federate/1` and stores the activity's "state" in the follow state
cache.

Returns `{:ok, activity}` or the first non-matching step result.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
403
@doc """
Cancels the latest follow from `follower` to `followed`.

The follow activity's state is set to "cancelled", then the unfollow activity
is inserted and passed to `maybe_federate/1`. Returns `{:ok, activity}` or
the first non-matching step result (for example when no follow activity is
found).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
413
# Announces the deletion of a user: a "Delete" activity for the user's Person
# object, addressed to the follower collection, is passed through `insert/4`
# (local: true, fake: true, actor check bypassed) and then to
# `maybe_federate/1`. Returns `{:ok, user}` or the first non-matching step
# result.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  data = %{
    "type" => "Delete",
    "actor" => ap_id,
    "to" => [follower_address],
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
426
# Deletes `object` and inserts the matching "Delete" activity.
#
# Options: `:local` (default `true`), `:activity_id` (default `nil`) and
# `:actor` (defaults to the object's own "actor"). The Delete is addressed to
# the object's "to" and "cc". After the insert, conversation participations
# are streamed, the replies and note counters are decreased and the activity
# is passed to `maybe_federate/1`.
#
# Returns `{:ok, activity}` or the first non-matching step result.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data =
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, activity} <- insert(data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
455
@doc """
Makes `blocker` block `blocked`.

When `[:activitypub, :unfollow_blocked]` is set and a follow from `blocker`
to `blocked` exists, that follow is undone first. A block activity is only
inserted and passed to `maybe_federate/1` when
`[:activitypub, :outgoing_blocks]` is `true`.

Returns `{:ok, activity}`, or `{:ok, nil}` when no block activity was
produced.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
475
@doc """
Undoes the latest block from `blocker` to `blocked`: the unblock activity is
inserted and passed to `maybe_federate/1`.

Returns `{:ok, activity}` or the first non-matching step result (for example
when no block activity is found).
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
484
@doc """
Files a report (flag activity).

`params` must contain `:actor`, `:context`, `:account`, `:statuses` and
`:content`. `:local` and `:forward` are treated as true unless explicitly
`false`. The activity's "to" is always empty; "cc" contains the reported
account's `ap_id` only when forwarding.

After a successful insert and `maybe_federate/1`, a report email is queued
for every superuser. Returns `{:ok, activity}` or the first non-matching step
result.
"""
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    Enum.each(User.all_superusers(), fn superuser ->
      email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(email)
    end)

    {:ok, activity}
  end
end
520
# Builds the query for Create activities that belong to `context`, newest
# first. Visible recipients are the public address plus, when opts["user"] is
# set, that user's own ap_id and followings.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
549
@doc """
Returns all Create activities of `context` that match `opts`, newest first.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
556
@doc """
Returns the id of the newest Create activity of `context` that matches
`opts`, or `nil` when there is none.

"skip_preload" defaults to `true` here; a value given in `opts` wins.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
566
@doc """
Fetches a page of activities addressed to the public address, excluding
those that `restrict_unlisted/1` filters out.

Any "user" entry in `opts` is dropped before querying. The fetched page is
reversed before being returned.
"""
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
576
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` matches the
# `:visibility` option, which may be a single value or a list of values from
# @valid_visibilities. Without a `:visibility` option the query is untouched.
#
# An invalid visibility is logged and the query is returned unchanged, the
# same way `exclude_visibility/2` treats invalid input. Previously these
# branches returned the `:ok` of `Logger.error/1`, which is not a queryable
# and broke the rest of the `fetch_activities_query/2` pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
616
# Removes from `query` the activities whose `activity_visibility(...)` matches
# the "exclude_visibilities" option (a single value or a list of values from
# @valid_visibilities). Invalid input is logged and the query is returned
# unchanged; without the option the query is untouched.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
659
# Applies the `thread_visibility(...)` SQL check for the reading user.
# The check is skipped when the config or the user's info has
# `skip_thread_containment: true`, or when opts carry no "user".
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
678
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type.

"user", "actor_id" and "whole_db" are set in `params` before fetching. The
result of `fetch_activities/2` is reversed before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
695
@doc """
Fetches the Create and Announce activities authored by `user` as seen by
`reading_user`.

"type", "user", "actor_id", "whole_db" and "pinned_activity_ids" are set in
`params` before fetching. The result of `fetch_activities/2` is reversed
before being returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
714
# Recipient list used when reading a user's activities:
#   * godmode: an empty list;
#   * with a reading user: public, the reader's ap_id and their followings;
#   * otherwise: public only.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = [Pleroma.Constants.as_public()]

  if reading_user do
    public ++ [reading_user.ap_id | User.following(reading_user)]
  else
    public
  end
end
726
# Keeps only activities with an id greater than "since_id". An empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
734
# Drops activities whose object carries any of the "tag_reject" tags.
# Filtering on the object is impossible without the preload, hence the raise.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
748
# Keeps only activities whose object carries every one of the "tag_all" tags.
# Filtering on the object is impossible without the preload, hence the raise.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
762
# Keeps only activities whose object carries the given "tag": any of the tags
# for a list, exactly that tag for a binary.
# Filtering on the object is impossible without the preload, hence the raise.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
782
# Keeps activities whose recipients overlap with `recipients`. With a user,
# activities authored by that user are kept as well. An empty recipient list
# disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
796
# With "local_only" set to true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
802
# With an "actor_id" option, keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
808
# Keeps only activities of the given "type": an exact match for a binary,
# membership (SQL ANY) for any other value.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
818
# With a "state" option, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
824
# With a "favorited_by" option, keeps only activities whose object lists that
# ap_id under "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
833
# With "only_media" set to "true" or "1", keeps only activities whose object
# "attachment" differs from an empty list.
# Filtering on the object is impossible without the preload, hence the raise.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
846
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _), do: query
855
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
861
# Hides activities from, or addressed ("to") to, actors muted by the
# "muting_user". Unless "skip_preload" is set, activities with a thread mute
# row are hidden as well (this uses the :thread_mute named binding).
#
# "with_muted" set to true, "true" or "1" disables the restriction, as does a
# missing "muting_user".
#
# A nil mute list falls back to [], like the block lists in
# `restrict_blocked/2`, so that SQL never sees NULL in `ANY(...)`.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
  mutes = info.mutes || []

  query =
    from([activity] in query,
      where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
      where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
    )

  if opts["skip_preload"] do
    query
  else
    from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
881
# Hides activities involving actors or domains blocked by the "blocking_user":
#   * activities authored by a blocked actor;
#   * activities with a blocked actor among the recipients;
#   * Announce activities addressed ("to") to a blocked actor;
#   * activities whose actor, or whose object's actor, is on a blocked domain.
# The object is joined first when the query has no :object binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _), do: query
906
# Drops activities that carry the public address in "cc".
defp restrict_unlisted(query) do
  public = [Pleroma.Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
918
# With "pinned" set to "true", keeps only the activities listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
924
# Drops Announce activities authored by actors whose reblogs the
# "muting_user" has muted.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
941
# Drops activities whose object is of type "Answer", unless
# "include_poll_votes" is true. Nothing is filtered when the query has no
# :object binding to filter on.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
953
# With a binary "exclude_id" option, drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
959
# Applies `Activity.with_preloaded_object/1` unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
966
# Applies `Activity.with_preloaded_bookmark/2` for opts["user"] unless
# "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
973
# Applies `Activity.with_set_thread_muted_field/2` for the "muting_user"
# (falling back to "user") unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
980
# Orders by id when an `:order` of `:desc` or `:asc` is given; otherwise the
# query is left untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
992
@doc """
Builds the activity query for `recipients`, applying every preload,
ordering, restriction and exclusion step driven by `opts`.

The `[:instance, :skip_thread_containment]` setting is read once and handed
to the thread visibility restriction.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  skip_thread_containment = Config.get([:instance, :skip_thread_containment])
  config = %{skip_thread_containment: skip_thread_containment}

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1026
@doc """
Fetches a page of activities for `recipients`, extended with the list
memberships of opts["user"].

The fetched page is reversed, and activities that reached the user through a
list are adjusted by `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1035
# For activities whose "bcc" contains one of the user's list memberships,
# prepends the user's ap_id to the activity's "cc". Other activities, and
# calls without memberships or without a user, are returned unchanged.
#
# The current "cc" goes through `List.wrap/1` first: a missing "cc" (nil)
# previously produced the improper list `[user_ap_id | nil]`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1052
@doc """
Restricts `query` to activities that either overlap with `recipients`, or
overlap with `recipients_with_public` while also being addressed to the
public address.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1061
# Fetches one page of activities using the bounded recipient filter.
#
# The base query is built by `fetch_activities_query/2` with an empty
# recipient list, then restricted by `fetch_activities_bounded_query/3`,
# paginated with `Pagination.fetch_paginated/3` and reversed.
#
# `opts` defaults to an empty map and `pagination` to `:keyset`.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1073
# Stores `file` through `Upload.store/2` and inserts an `%Object{}` holding
# the resulting data.
#
# When `opts[:actor]` is truthy it is recorded under the "actor" key of the
# object data. Any result of `Upload.store/2` that is not `{:ok, data}` is
# returned as-is; otherwise the result of `Repo.insert/1` is returned.
def upload(file, opts \\ []) do
  with {:ok, stored} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        absent when absent in [nil, false] -> stored
        actor -> Map.put(stored, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
1086
# Converts an actor object (a string-keyed map) into the attribute map used
# to build a user. Always returns `{:ok, user_data}`.
#
#   * avatar / banner are built from `data["icon"]["url"]` and
#     `data["image"]["url"]`, and are nil when no URL is present
#   * fields are the "name"/"value" pairs of every "PropertyValue" attachment
#   * nickname is "preferredUsername@host-of-id", or nil when the actor has
#     no "preferredUsername"
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # The attachment value comes from a remote object: it may be missing, nil,
  # a single object instead of a list, or contain entries without a "type".
  # None of those should crash the conversion, so normalise to a list and
  # silently skip anything that is not a PropertyValue map.
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # `locked` is read before the object is passed through the Transmogrifier,
  # `discoverable` after it; keep this order.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false

  user_data = %{
    ap_id: data["id"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      fields: fields,
      locked: locked,
      discoverable: discoverable
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"]
  }

  # nickname can be nil because of virtual actors
  user_data =
    if data["preferredUsername"] do
      Map.put(
        user_data,
        :nickname,
        "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
      )
    else
      Map.put(user_data, :nickname, nil)
    end

  {:ok, user_data}
end
1143
# Fetches the remote following and follower collections of `user` and
# returns `{:ok, map}` with the keys `:hide_follows`, `:follower_count`,
# `:following_count` and `:hide_followers`.
#
# The following collection is processed first, then the follower collection.
# Any step that fails is returned as `{:error, reason}`; a failure that is
# not already an error tuple is wrapped into one.
def fetch_follow_information_for_user(user) do
  with {:ok, {following_count, hide_follows}} <-
         fetch_collection_counter(user.following_address),
       {:ok, {followers_count, hide_followers}} <-
         fetch_collection_counter(user.follower_address) do
    {:ok,
     %{
       hide_follows: hide_follows,
       follower_count: followers_count,
       following_count: following_count,
       hide_followers: hide_followers
     }}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end

# Fetches the collection at `address` and returns `{:ok, {count, private}}`,
# where `count` is its integer "totalItems" and `private` is the flag from
# `collection_private/1`. The first value that does not match is returned
# untouched so the caller can normalise it.
defp fetch_collection_counter(address) do
  with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
       count when is_integer(count) <- collection["totalItems"],
       {:ok, private} <- collection_private(collection) do
    {:ok, {count, private}}
  end
end
1168
# Merges freshly fetched follow counters into `data.info` when the
# `[:instance, :external_user_synchronization]` setting is `true`.
#
# Returns `data` unchanged when the setting is `false`. Any other outcome
# (including a failed fetch) is logged as an error and `data` is returned
# unchanged as well, so this function never fails the surrounding fetch.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, counters} <- fetch_follow_information_for_user(data) do
    %{data | info: Map.merge(data.info, counters)}
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1187
# Decides whether a remote collection hides its contents.
#
#   * `{:ok, false}` when "first" is an embedded (Ordered)CollectionPage, or
#     when fetching "first" yields such a page
#   * `{:ok, true}` when fetching "first" fails with
#     `{:error, {:ok, %{status: 401 | 403}}}`
#   * `{:error, reason}` for everything else (non-error values are wrapped)
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(data) do
  case Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end
1208
# Runs the actor object through `MRF.filter/1` and converts the filtered
# object with `object_to_user_data/1`.
#
# Returns `{:ok, user_data}`; whatever value makes either step fail is
# returned wrapped as `{:error, value}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = prepared <- object_to_user_data(filtered) do
    prepared
  else
    failure -> {:error, failure}
  end
end
1217
# Fetches the remote object at `ap_id`, converts it with
# `user_data_from_user_object/1` and passes the result through
# `maybe_update_follow_information/1`.
#
# Returns `{:ok, user_data}`. When the fetch or the conversion fails the
# failing value is logged and returned as `{:error, value}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1229
# Returns a user for `ap_id`.
#
# When `User.get_cached_by_ap_id/1` already finds a user, the work is handed
# to `Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the user data is
# fetched with `fetch_and_prepare_user_from_ap_id/1` and stored with
# `User.insert_or_update_user/1`; a failed fetch is returned wrapped as
# `{:error, failure}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, user_data} -> User.insert_or_update_user(user_data)
        failure -> {:error, failure}
      end

    _cached_user ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1241
# Resolves `nickname` through `WebFinger.finger/1` and builds the user from
# the non-nil "ap_id" found in the result via `make_user_from_ap_id/1`.
#
# Every other WebFinger outcome yields `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1249
# Filter out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1254
# Post-processing hook for a single activity. At present it only delegates
# to `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1259
# Builds a query over `Activity` restricted to type "Create" and visibility
# "direct", ordered by ascending activity id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1266 end