Merge branch 'bugfix/widen-streamer-blocks-for-1.1' into 'maint/1.1'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.Object.Containment
13 alias Pleroma.Object.Fetcher
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.Upload
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.MRF
19 alias Pleroma.Web.ActivityPub.Transmogrifier
20 alias Pleroma.Web.Streamer
21 alias Pleroma.Web.WebFinger
22
23 import Ecto.Query
24 import Pleroma.Web.ActivityPub.Utils
25 import Pleroma.Web.ActivityPub.Visibility
26
27 require Logger
28 require Pleroma.Constants
29
30 # For Announce activities, we filter the recipients based on following status for any actors
31 # that match actual users. See issue #164 for more information about why this is necessary.
32 defp get_recipients(%{"type" => "Announce"} = data) do
33 to = Map.get(data, "to", [])
34 cc = Map.get(data, "cc", [])
35 bcc = Map.get(data, "bcc", [])
36 actor = User.get_cached_by_ap_id(data["actor"])
37
38 recipients =
39 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
40 case User.get_cached_by_ap_id(recipient) do
41 nil -> true
42 user -> User.following?(user, actor)
43 end
44 end)
45
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(%{"type" => "Create"} = data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 actor = Map.get(data, "actor", [])
54 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
55 {recipients, to, cc}
56 end
57
58 defp get_recipients(data) do
59 to = Map.get(data, "to", [])
60 cc = Map.get(data, "cc", [])
61 bcc = Map.get(data, "bcc", [])
62 recipients = Enum.concat([to, cc, bcc])
63 {recipients, to, cc}
64 end
65
66 defp check_actor_is_active(actor) do
67 if not is_nil(actor) do
68 with user <- User.get_cached_by_ap_id(actor),
69 false <- user.info.deactivated do
70 true
71 else
72 _e -> false
73 end
74 else
75 true
76 end
77 end
78
79 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
80 limit = Config.get([:instance, :remote_limit])
81 String.length(content) <= limit
82 end
83
84 defp check_remote_limit(_), do: true
85
86 def increase_note_count_if_public(actor, object) do
87 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
88 end
89
90 def decrease_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
92 end
93
94 def increase_replies_count_if_reply(%{
95 "object" => %{"inReplyTo" => reply_ap_id} = object,
96 "type" => "Create"
97 }) do
98 if is_public?(object) do
99 Object.increase_replies_count(reply_ap_id)
100 end
101 end
102
103 def increase_replies_count_if_reply(_create_data), do: :noop
104
105 def decrease_replies_count_if_reply(%Object{
106 data: %{"inReplyTo" => reply_ap_id} = object
107 }) do
108 if is_public?(object) do
109 Object.decrease_replies_count(reply_ap_id)
110 end
111 end
112
113 def decrease_replies_count_if_reply(_object), do: :noop
114
115 def increase_poll_votes_if_vote(%{
116 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
117 "type" => "Create"
118 }) do
119 Object.increase_vote_count(reply_ap_id, name)
120 end
121
122 def increase_poll_votes_if_vote(_create_data), do: :noop
123
124 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
125 with nil <- Activity.normalize(map),
126 map <- lazy_put_activity_defaults(map, fake),
127 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
128 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
129 {:ok, map} <- MRF.filter(map),
130 {recipients, _, _} = get_recipients(map),
131 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
132 :ok <- Containment.contain_child(map),
133 {:ok, map, object} <- insert_full_object(map) do
134 {:ok, activity} =
135 Repo.insert(%Activity{
136 data: map,
137 local: local,
138 actor: map["actor"],
139 recipients: recipients
140 })
141
142 # Splice in the child object if we have one.
143 activity =
144 if not is_nil(object) do
145 Map.put(activity, :object, object)
146 else
147 activity
148 end
149
150 PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
151
152 Notification.create_notifications(activity)
153
154 participations =
155 activity
156 |> Conversation.create_or_bump_for()
157 |> get_participations()
158
159 stream_out(activity)
160 stream_out_participations(participations)
161 {:ok, activity}
162 else
163 %Activity{} = activity ->
164 {:ok, activity}
165
166 {:fake, true, map, recipients} ->
167 activity = %Activity{
168 data: map,
169 local: local,
170 actor: map["actor"],
171 recipients: recipients,
172 id: "pleroma:fakeid"
173 }
174
175 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
176 {:ok, activity}
177
178 error ->
179 {:error, error}
180 end
181 end
182
183 defp get_participations({:ok, %{participations: participations}}), do: participations
184 defp get_participations(_), do: []
185
186 def stream_out_participations(participations) do
187 participations =
188 participations
189 |> Repo.preload(:user)
190
191 Streamer.stream("participation", participations)
192 end
193
194 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
195 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
196 conversation = Repo.preload(conversation, :participations),
197 last_activity_id =
198 fetch_latest_activity_id_for_context(conversation.ap_id, %{
199 "user" => user,
200 "blocking_user" => user
201 }) do
202 if last_activity_id do
203 stream_out_participations(conversation.participations)
204 end
205 end
206 end
207
208 def stream_out_participations(_, _), do: :noop
209
210 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
211 when data_type in ["Create", "Announce", "Delete"] do
212 activity
213 |> Topics.get_activity_topics()
214 |> Streamer.stream(activity)
215 end
216
217 def stream_out(_activity) do
218 :noop
219 end
220
221 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
222 additional = params[:additional] || %{}
223 # only accept false as false value
224 local = !(params[:local] == false)
225 published = params[:published]
226
227 with create_data <-
228 make_create_data(
229 %{to: to, actor: actor, published: published, context: context, object: object},
230 additional
231 ),
232 {:ok, activity} <- insert(create_data, local, fake),
233 {:fake, false, activity} <- {:fake, fake, activity},
234 _ <- increase_replies_count_if_reply(create_data),
235 _ <- increase_poll_votes_if_vote(create_data),
236 # Changing note count prior to enqueuing federation task in order to avoid
237 # race conditions on updating user.info
238 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
239 :ok <- maybe_federate(activity) do
240 {:ok, activity}
241 else
242 {:fake, true, activity} ->
243 {:ok, activity}
244
245 {:error, message} ->
246 {:error, message}
247 end
248 end
249
250 def accept(%{to: to, actor: actor, object: object} = params) do
251 # only accept false as false value
252 local = !(params[:local] == false)
253
254 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
255 {:ok, activity} <- insert(data, local),
256 :ok <- maybe_federate(activity) do
257 {:ok, activity}
258 end
259 end
260
261 def reject(%{to: to, actor: actor, object: object} = params) do
262 # only accept false as false value
263 local = !(params[:local] == false)
264
265 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
266 {:ok, activity} <- insert(data, local),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity}
269 end
270 end
271
272 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
273 # only accept false as false value
274 local = !(params[:local] == false)
275
276 with data <- %{
277 "to" => to,
278 "cc" => cc,
279 "type" => "Update",
280 "actor" => actor,
281 "object" => object
282 },
283 {:ok, activity} <- insert(data, local),
284 :ok <- maybe_federate(activity) do
285 {:ok, activity}
286 end
287 end
288
289 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
290 def like(
291 %User{ap_id: ap_id} = user,
292 %Object{data: %{"id" => _}} = object,
293 activity_id \\ nil,
294 local \\ true
295 ) do
296 with nil <- get_existing_like(ap_id, object),
297 like_data <- make_like_data(user, object, activity_id),
298 {:ok, activity} <- insert(like_data, local),
299 {:ok, object} <- add_like_to_object(activity, object),
300 :ok <- maybe_federate(activity) do
301 {:ok, activity, object}
302 else
303 %Activity{} = activity -> {:ok, activity, object}
304 error -> {:error, error}
305 end
306 end
307
308 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
309 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
310 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
311 {:ok, unlike_activity} <- insert(unlike_data, local),
312 {:ok, _activity} <- Repo.delete(like_activity),
313 {:ok, object} <- remove_like_from_object(like_activity, object),
314 :ok <- maybe_federate(unlike_activity) do
315 {:ok, unlike_activity, like_activity, object}
316 else
317 _e -> {:ok, object}
318 end
319 end
320
321 def announce(
322 %User{ap_id: _} = user,
323 %Object{data: %{"id" => _}} = object,
324 activity_id \\ nil,
325 local \\ true,
326 public \\ true
327 ) do
328 with true <- is_public?(object),
329 announce_data <- make_announce_data(user, object, activity_id, public),
330 {:ok, activity} <- insert(announce_data, local),
331 {:ok, object} <- add_announce_to_object(activity, object),
332 :ok <- maybe_federate(activity) do
333 {:ok, activity, object}
334 else
335 error -> {:error, error}
336 end
337 end
338
339 def unannounce(
340 %User{} = actor,
341 %Object{} = object,
342 activity_id \\ nil,
343 local \\ true
344 ) do
345 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
346 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
347 {:ok, unannounce_activity} <- insert(unannounce_data, local),
348 :ok <- maybe_federate(unannounce_activity),
349 {:ok, _activity} <- Repo.delete(announce_activity),
350 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
351 {:ok, unannounce_activity, object}
352 else
353 _e -> {:ok, object}
354 end
355 end
356
357 def follow(follower, followed, activity_id \\ nil, local \\ true) do
358 with data <- make_follow_data(follower, followed, activity_id),
359 {:ok, activity} <- insert(data, local),
360 :ok <- maybe_federate(activity),
361 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
362 {:ok, activity}
363 end
364 end
365
366 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
367 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
368 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
369 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
370 {:ok, activity} <- insert(unfollow_data, local),
371 :ok <- maybe_federate(activity) do
372 {:ok, activity}
373 end
374 end
375
376 def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
377 with data <- %{
378 "to" => [follower_address],
379 "type" => "Delete",
380 "actor" => ap_id,
381 "object" => %{"type" => "Person", "id" => ap_id}
382 },
383 {:ok, activity} <- insert(data, true, true, true),
384 :ok <- maybe_federate(activity) do
385 {:ok, user}
386 end
387 end
388
389 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
390 user = User.get_cached_by_ap_id(actor)
391 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
392
393 with {:ok, object, activity} <- Object.delete(object),
394 data <- %{
395 "type" => "Delete",
396 "actor" => actor,
397 "object" => id,
398 "to" => to,
399 "deleted_activity_id" => activity && activity.id
400 },
401 {:ok, activity} <- insert(data, local, false),
402 stream_out_participations(object, user),
403 _ <- decrease_replies_count_if_reply(object),
404 # Changing note count prior to enqueuing federation task in order to avoid
405 # race conditions on updating user.info
406 {:ok, _actor} <- decrease_note_count_if_public(user, object),
407 :ok <- maybe_federate(activity) do
408 {:ok, activity}
409 end
410 end
411
412 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
413 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
414 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
415
416 if unfollow_blocked do
417 follow_activity = fetch_latest_follow(blocker, blocked)
418 if follow_activity, do: unfollow(blocker, blocked, nil, local)
419 end
420
421 with true <- outgoing_blocks,
422 block_data <- make_block_data(blocker, blocked, activity_id),
423 {:ok, activity} <- insert(block_data, local),
424 :ok <- maybe_federate(activity) do
425 {:ok, activity}
426 else
427 _e -> {:ok, nil}
428 end
429 end
430
431 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
432 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
433 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
434 {:ok, activity} <- insert(unblock_data, local),
435 :ok <- maybe_federate(activity) do
436 {:ok, activity}
437 end
438 end
439
440 def flag(
441 %{
442 actor: actor,
443 context: context,
444 account: account,
445 statuses: statuses,
446 content: content
447 } = params
448 ) do
449 # only accept false as false value
450 local = !(params[:local] == false)
451 forward = !(params[:forward] == false)
452
453 additional = params[:additional] || %{}
454
455 params = %{
456 actor: actor,
457 context: context,
458 account: account,
459 statuses: statuses,
460 content: content
461 }
462
463 additional =
464 if forward do
465 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
466 else
467 Map.merge(additional, %{"to" => [], "cc" => []})
468 end
469
470 with flag_data <- make_flag_data(params, additional),
471 {:ok, activity} <- insert(flag_data, local),
472 :ok <- maybe_federate(activity) do
473 Enum.each(User.all_superusers(), fn superuser ->
474 superuser
475 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
476 |> Pleroma.Emails.Mailer.deliver_async()
477 end)
478
479 {:ok, activity}
480 end
481 end
482
483 defp fetch_activities_for_context_query(context, opts) do
484 public = [Pleroma.Constants.as_public()]
485
486 recipients =
487 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
488
489 from(activity in Activity)
490 |> maybe_preload_objects(opts)
491 |> maybe_preload_bookmarks(opts)
492 |> maybe_set_thread_muted_field(opts)
493 |> restrict_blocked(opts)
494 |> restrict_recipients(recipients, opts["user"])
495 |> where(
496 [activity],
497 fragment(
498 "?->>'type' = ? and ?->>'context' = ?",
499 activity.data,
500 "Create",
501 activity.data,
502 ^context
503 )
504 )
505 |> exclude_poll_votes(opts)
506 |> exclude_id(opts)
507 |> order_by([activity], desc: activity.id)
508 end
509
510 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
511 def fetch_activities_for_context(context, opts \\ %{}) do
512 context
513 |> fetch_activities_for_context_query(opts)
514 |> Repo.all()
515 end
516
517 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
518 Pleroma.FlakeId.t() | nil
519 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
520 context
521 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
522 |> limit(1)
523 |> select([a], a.id)
524 |> Repo.one()
525 end
526
527 def fetch_public_activities(opts \\ %{}) do
528 q = fetch_activities_query([Pleroma.Constants.as_public()], opts)
529
530 q
531 |> restrict_unlisted()
532 |> Pagination.fetch_paginated(opts)
533 |> Enum.reverse()
534 end
535
536 @valid_visibilities ~w[direct unlisted public private]
537
538 defp restrict_visibility(query, %{visibility: visibility})
539 when is_list(visibility) do
540 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
541 query =
542 from(
543 a in query,
544 where:
545 fragment(
546 "activity_visibility(?, ?, ?) = ANY (?)",
547 a.actor,
548 a.recipients,
549 a.data,
550 ^visibility
551 )
552 )
553
554 query
555 else
556 Logger.error("Could not restrict visibility to #{visibility}")
557 end
558 end
559
560 defp restrict_visibility(query, %{visibility: visibility})
561 when visibility in @valid_visibilities do
562 from(
563 a in query,
564 where:
565 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
566 )
567 end
568
569 defp restrict_visibility(_query, %{visibility: visibility})
570 when visibility not in @valid_visibilities do
571 Logger.error("Could not restrict visibility to #{visibility}")
572 end
573
574 defp restrict_visibility(query, _visibility), do: query
575
576 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
577 do: query
578
579 defp restrict_thread_visibility(
580 query,
581 %{"user" => %User{info: %{skip_thread_containment: true}}},
582 _
583 ),
584 do: query
585
586 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
587 from(
588 a in query,
589 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
590 )
591 end
592
593 defp restrict_thread_visibility(query, _, _), do: query
594
595 def fetch_user_activities(user, reading_user, params \\ %{}) do
596 params =
597 params
598 |> Map.put("type", ["Create", "Announce"])
599 |> Map.put("user", reading_user)
600 |> Map.put("actor_id", user.ap_id)
601 |> Map.put("whole_db", true)
602 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
603
604 recipients =
605 user_activities_recipients(%{
606 "godmode" => params["godmode"],
607 "reading_user" => reading_user
608 })
609
610 fetch_activities(recipients, params)
611 |> Enum.reverse()
612 end
613
614 defp user_activities_recipients(%{"godmode" => true}) do
615 []
616 end
617
618 defp user_activities_recipients(%{"reading_user" => reading_user}) do
619 if reading_user do
620 [Pleroma.Constants.as_public()] ++ [reading_user.ap_id | reading_user.following]
621 else
622 [Pleroma.Constants.as_public()]
623 end
624 end
625
626 defp restrict_since(query, %{"since_id" => ""}), do: query
627
628 defp restrict_since(query, %{"since_id" => since_id}) do
629 from(activity in query, where: activity.id > ^since_id)
630 end
631
632 defp restrict_since(query, _), do: query
633
634 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
635 raise "Can't use the child object without preloading!"
636 end
637
638 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
639 when is_list(tag_reject) and tag_reject != [] do
640 from(
641 [_activity, object] in query,
642 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
643 )
644 end
645
646 defp restrict_tag_reject(query, _), do: query
647
648 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
649 raise "Can't use the child object without preloading!"
650 end
651
652 defp restrict_tag_all(query, %{"tag_all" => tag_all})
653 when is_list(tag_all) and tag_all != [] do
654 from(
655 [_activity, object] in query,
656 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
657 )
658 end
659
660 defp restrict_tag_all(query, _), do: query
661
662 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
663 raise "Can't use the child object without preloading!"
664 end
665
666 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
667 from(
668 [_activity, object] in query,
669 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
670 )
671 end
672
673 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
674 from(
675 [_activity, object] in query,
676 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
677 )
678 end
679
680 defp restrict_tag(query, _), do: query
681
682 defp restrict_recipients(query, [], _user), do: query
683
684 defp restrict_recipients(query, recipients, nil) do
685 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
686 end
687
688 defp restrict_recipients(query, recipients, user) do
689 from(
690 activity in query,
691 where: fragment("? && ?", ^recipients, activity.recipients),
692 or_where: activity.actor == ^user.ap_id
693 )
694 end
695
696 defp restrict_local(query, %{"local_only" => true}) do
697 from(activity in query, where: activity.local == true)
698 end
699
700 defp restrict_local(query, _), do: query
701
702 defp restrict_actor(query, %{"actor_id" => actor_id}) do
703 from(activity in query, where: activity.actor == ^actor_id)
704 end
705
706 defp restrict_actor(query, _), do: query
707
708 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
709 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
710 end
711
712 defp restrict_type(query, %{"type" => type}) do
713 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
714 end
715
716 defp restrict_type(query, _), do: query
717
718 defp restrict_state(query, %{"state" => state}) do
719 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
720 end
721
722 defp restrict_state(query, _), do: query
723
724 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
725 from(
726 [_activity, object] in query,
727 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
728 )
729 end
730
731 defp restrict_favorited_by(query, _), do: query
732
733 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
734 raise "Can't use the child object without preloading!"
735 end
736
737 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
738 from(
739 [_activity, object] in query,
740 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
741 )
742 end
743
744 defp restrict_media(query, _), do: query
745
746 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
747 from(
748 activity in query,
749 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
750 )
751 end
752
753 defp restrict_replies(query, _), do: query
754
755 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
756 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
757 end
758
759 defp restrict_reblogs(query, _), do: query
760
761 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
762
763 defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
764 mutes = info.mutes
765
766 query =
767 from([activity] in query,
768 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
769 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
770 )
771
772 unless opts["skip_preload"] do
773 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
774 else
775 query
776 end
777 end
778
779 defp restrict_muted(query, _), do: query
780
781 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
782 blocks = info.blocks || []
783 domain_blocks = info.domain_blocks || []
784
785 query =
786 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
787
788 from(
789 [activity, object: o] in query,
790 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
791 where: fragment("not (? && ?)", activity.recipients, ^blocks),
792 where:
793 fragment(
794 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
795 activity.data,
796 activity.data,
797 ^blocks
798 ),
799 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
800 where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
801 )
802 end
803
804 defp restrict_blocked(query, _), do: query
805
806 defp restrict_unlisted(query) do
807 from(
808 activity in query,
809 where:
810 fragment(
811 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
812 activity.data,
813 ^[Pleroma.Constants.as_public()]
814 )
815 )
816 end
817
818 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
819 from(activity in query, where: activity.id in ^ids)
820 end
821
822 defp restrict_pinned(query, _), do: query
823
824 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
825 muted_reblogs = info.muted_reblogs || []
826
827 from(
828 activity in query,
829 where:
830 fragment(
831 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
832 activity.data,
833 activity.actor,
834 ^muted_reblogs
835 )
836 )
837 end
838
839 defp restrict_muted_reblogs(query, _), do: query
840
841 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
842
843 defp exclude_poll_votes(query, _) do
844 if has_named_binding?(query, :object) do
845 from([activity, object: o] in query,
846 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
847 )
848 else
849 query
850 end
851 end
852
853 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
854 from(activity in query, where: activity.id != ^id)
855 end
856
857 defp exclude_id(query, _), do: query
858
859 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
860
861 defp maybe_preload_objects(query, _) do
862 query
863 |> Activity.with_preloaded_object()
864 end
865
866 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
867
868 defp maybe_preload_bookmarks(query, opts) do
869 query
870 |> Activity.with_preloaded_bookmark(opts["user"])
871 end
872
873 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
874
875 defp maybe_set_thread_muted_field(query, opts) do
876 query
877 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
878 end
879
880 defp maybe_order(query, %{order: :desc}) do
881 query
882 |> order_by(desc: :id)
883 end
884
885 defp maybe_order(query, %{order: :asc}) do
886 query
887 |> order_by(asc: :id)
888 end
889
890 defp maybe_order(query, _), do: query
891
892 def fetch_activities_query(recipients, opts \\ %{}) do
893 config = %{
894 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
895 }
896
897 Activity
898 |> maybe_preload_objects(opts)
899 |> maybe_preload_bookmarks(opts)
900 |> maybe_set_thread_muted_field(opts)
901 |> maybe_order(opts)
902 |> restrict_recipients(recipients, opts["user"])
903 |> restrict_tag(opts)
904 |> restrict_tag_reject(opts)
905 |> restrict_tag_all(opts)
906 |> restrict_since(opts)
907 |> restrict_local(opts)
908 |> restrict_actor(opts)
909 |> restrict_type(opts)
910 |> restrict_state(opts)
911 |> restrict_favorited_by(opts)
912 |> restrict_blocked(opts)
913 |> restrict_muted(opts)
914 |> restrict_media(opts)
915 |> restrict_visibility(opts)
916 |> restrict_thread_visibility(opts, config)
917 |> restrict_replies(opts)
918 |> restrict_reblogs(opts)
919 |> restrict_pinned(opts)
920 |> restrict_muted_reblogs(opts)
921 |> Activity.restrict_deactivated_users()
922 |> exclude_poll_votes(opts)
923 end
924
925 def fetch_activities(recipients, opts \\ %{}) do
926 list_memberships = Pleroma.List.memberships(opts["user"])
927
928 fetch_activities_query(recipients ++ list_memberships, opts)
929 |> Pagination.fetch_paginated(opts)
930 |> Enum.reverse()
931 |> maybe_update_cc(list_memberships, opts["user"])
932 end
933
934 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
935 when is_list(list_memberships) and length(list_memberships) > 0 do
936 Enum.map(activities, fn
937 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
938 if Enum.any?(bcc, &(&1 in list_memberships)) do
939 update_in(activity.data["cc"], &[user_ap_id | &1])
940 else
941 activity
942 end
943
944 activity ->
945 activity
946 end)
947 end
948
949 defp maybe_update_cc(activities, _, _), do: activities
950
951 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
952 from(activity in query,
953 where:
954 fragment("? && ?", activity.recipients, ^recipients) or
955 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
956 ^Pleroma.Constants.as_public() in activity.recipients)
957 )
958 end
959
960 def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
961 fetch_activities_query([], opts)
962 |> fetch_activities_bounded_query(recipients, recipients_with_public)
963 |> Pagination.fetch_paginated(opts)
964 |> Enum.reverse()
965 end
966
967 def upload(file, opts \\ []) do
968 with {:ok, data} <- Upload.store(file, opts) do
969 obj_data =
970 if opts[:actor] do
971 Map.put(data, "actor", opts[:actor])
972 else
973 data
974 end
975
976 Repo.insert(%Object{data: obj_data})
977 end
978 end
979
980 defp object_to_user_data(data) do
981 avatar =
982 data["icon"]["url"] &&
983 %{
984 "type" => "Image",
985 "url" => [%{"href" => data["icon"]["url"]}]
986 }
987
988 banner =
989 data["image"]["url"] &&
990 %{
991 "type" => "Image",
992 "url" => [%{"href" => data["image"]["url"]}]
993 }
994
995 fields =
996 data
997 |> Map.get("attachment", [])
998 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
999 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1000
1001 locked = data["manuallyApprovesFollowers"] || false
1002 data = Transmogrifier.maybe_fix_user_object(data)
1003
1004 user_data = %{
1005 ap_id: data["id"],
1006 info: %{
1007 ap_enabled: true,
1008 source_data: data,
1009 banner: banner,
1010 fields: fields,
1011 locked: locked
1012 },
1013 avatar: avatar,
1014 name: data["name"],
1015 follower_address: data["followers"],
1016 following_address: data["following"],
1017 bio: data["summary"]
1018 }
1019
1020 # nickname can be nil because of virtual actors
1021 user_data =
1022 if data["preferredUsername"] do
1023 Map.put(
1024 user_data,
1025 :nickname,
1026 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1027 )
1028 else
1029 Map.put(user_data, :nickname, nil)
1030 end
1031
1032 {:ok, user_data}
1033 end
1034
1035 def fetch_follow_information_for_user(user) do
1036 with {:ok, following_data} <-
1037 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1038 following_count when is_integer(following_count) <- following_data["totalItems"],
1039 {:ok, hide_follows} <- collection_private(following_data),
1040 {:ok, followers_data} <-
1041 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1042 followers_count when is_integer(followers_count) <- followers_data["totalItems"],
1043 {:ok, hide_followers} <- collection_private(followers_data) do
1044 {:ok,
1045 %{
1046 hide_follows: hide_follows,
1047 follower_count: followers_count,
1048 following_count: following_count,
1049 hide_followers: hide_followers
1050 }}
1051 else
1052 {:error, _} = e ->
1053 e
1054
1055 e ->
1056 {:error, e}
1057 end
1058 end
1059
1060 defp maybe_update_follow_information(data) do
1061 with {:enabled, true} <-
1062 {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])},
1063 {:ok, info} <- fetch_follow_information_for_user(data) do
1064 info = Map.merge(data.info, info)
1065 Map.put(data, :info, info)
1066 else
1067 {:enabled, false} ->
1068 data
1069
1070 e ->
1071 Logger.error(
1072 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
1073 )
1074
1075 data
1076 end
1077 end
1078
1079 defp collection_private(data) do
1080 if is_map(data["first"]) and
1081 data["first"]["type"] in ["CollectionPage", "OrderedCollectionPage"] do
1082 {:ok, false}
1083 else
1084 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1085 Fetcher.fetch_and_contain_remote_object_from_id(data["first"]) do
1086 {:ok, false}
1087 else
1088 {:error, {:ok, %{status: code}}} when code in [401, 403] ->
1089 {:ok, true}
1090
1091 {:error, _} = e ->
1092 e
1093
1094 e ->
1095 {:error, e}
1096 end
1097 end
1098 end
1099
1100 def user_data_from_user_object(data) do
1101 with {:ok, data} <- MRF.filter(data),
1102 {:ok, data} <- object_to_user_data(data) do
1103 {:ok, data}
1104 else
1105 e -> {:error, e}
1106 end
1107 end
1108
1109 def fetch_and_prepare_user_from_ap_id(ap_id) do
1110 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1111 {:ok, data} <- user_data_from_user_object(data),
1112 data <- maybe_update_follow_information(data) do
1113 {:ok, data}
1114 else
1115 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1116 end
1117 end
1118
1119 def make_user_from_ap_id(ap_id) do
1120 if _user = User.get_cached_by_ap_id(ap_id) do
1121 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1122 else
1123 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1124 User.insert_or_update_user(data)
1125 else
1126 e -> {:error, e}
1127 end
1128 end
1129 end
1130
1131 def make_user_from_nickname(nickname) do
1132 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1133 make_user_from_ap_id(ap_id)
1134 else
1135 _e -> {:error, "No AP id in WebFinger"}
1136 end
1137 end
1138
1139 # filter out broken threads
1140 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1141 entire_thread_visible_for_user?(activity, user)
1142 end
1143
1144 # do post-processing on a specific activity
1145 def contain_activity(%Activity{} = activity, %User{} = user) do
1146 contain_broken_threads(activity, user)
1147 end
1148
1149 def fetch_direct_messages_query do
1150 Activity
1151 |> restrict_type(%{"type" => "Create"})
1152 |> restrict_visibility(%{visibility: "direct"})
1153 |> order_by([activity], asc: activity.id)
1154 end
1155 end