minor-changes (#313)
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`. `to`, `cc` and `bcc` each default to `[]`
# when the key is absent. For "Create" activities the actor is added to the
# recipients and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 turns a missing (or nil) actor into `[]`, so no bogus `[]` or
  # `nil` entry ends up among the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation of to/cc/bcc, duplicates kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the activity's actor may insert it.
# "Delete" and "Undo" activities are always accepted.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

# A string actor id must resolve (via the user cache) to an active user.
defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor),
  do: match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))

# No actor, or a non-string actor: accepted.
defp check_actor_can_insert(_data), do: true
69
# True when the embedded object's "content" is at most
# `[:instance, :remote_limit]` long, as measured by String.length/1.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

# No embedded content to measure: passes.
defp check_remote_limit(_data), do: true
76
# Calls User.increase_note_count/1 for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
80
# Calls User.decrease_note_count/1 for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
84
# Calls User.update_last_status_at/1 for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` untouched.
def update_last_status_at_if_public(actor, object) do
  cond do
    is_public?(object) -> User.update_last_status_at(actor)
    true -> {:ok, actor}
  end
end
88
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# count of the replied-to object, but only when the new object is public
# (returns nil otherwise).
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = child
     }) do
  if is_public?(child), do: Object.increase_replies_count(parent_ap_id)
end

# Not a reply-creating activity.
defp increase_replies_count_if_reply(_create_data), do: :noop
99
@object_types ~w[Question Answer Audio Video Event Article Note Page]

# Persisting callback. Maps whose "type" is one of @object_types are stored
# through Object.create/1; a failure from it is returned unchanged.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    failure -> failure
  end
end

# Everything else is inserted as an Activity row. `meta` must carry `:local`
# (Keyword.fetch!/2 raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  row = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(row),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
124
# Inserts an activity map.
#
# Steps, in order: return an already-known activity (Activity.normalize/1),
# fill in defaults, check the actor (skipped when `bypass_actor_check` is
# truthy), check the remote content limit, run MRF, compute recipients, run
# containment, insert the child object, then insert the activity itself.
#
# With `fake` set, nothing is written: an in-memory %Activity{} with the id
# "pleroma:fakeid" is returned after the MRF step.
#
# Note: a `{:containment, _}` mismatch is returned as-is, which is not covered
# by the @spec below.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich-media fetching runs in a separate task, under the concurrency limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local, do: Pleroma.Search.add_to_index(activity)

    {:ok, activity}
  else
    # Activity.normalize/1 found an existing activity: return it unchanged.
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    # Fake insert: build the struct in memory only.
    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    # A `{:reject, _}` tuple is wrapped whole into the error.
    {:reject, _} = e ->
      {:error, e}
  end
end
180
# Inserts the Activity row and, on success, hands it to
# maybe_create_activity_expiration/1. An insert failure is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
193
# Creates notifications for `activity`, creates or bumps its conversation, then
# streams the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  # For an "Update", the conversation is derived from the Create activity of the
  # updated object rather than from the Update itself.
  # NOTE(review): if that lookup returns nil, `original_activity.actor` below
  # would raise — confirm a Create always exists for updated objects.
  original_activity =
    case activity do
      %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
        Activity.get_create_by_object_ap_id_with_object(id)

      _ ->
        activity
    end

  conversation = create_or_bump_conversation(original_activity, original_activity.actor)
  participations = get_participations(conversation)
  stream_out(activity)
  stream_out_participations(participations)
end
211
# When the activity data carries a DateTime under "expires_at", enqueues a
# PurgeExpiredActivity job for it. Returns `{:ok, activity}` on success, or
# whatever the enqueue call returned otherwise.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

# No expiry set: nothing to schedule.
defp maybe_create_activity_expiration(activity), do: {:ok, activity}
225
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Any non-matching intermediate result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
233
# Returns the force-reloaded participations of a conversation result, or []
# when the argument is not an `{:ok, conversation}` tuple.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_other), do: []
241
# Preloads each participation's user and pushes them to the "participation"
# stream topic.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
249
# Streaming callback. Looks up the conversation for the object's context and
# streams its participations, but only if `user` still has a latest direct
# activity in that context.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      filters = %{user: user, blocking_user: user}
      latest_id = fetch_latest_direct_activity_id_for_context(conversation.ap_id, filters)

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

# Anything that is not an Object with a context is ignored.
@impl true
def stream_out_participations(_, _), do: :noop
269
# Streaming callback. Create/Announce/Delete/Update activities are streamed to
# the topics computed by Topics.get_activity_topics/1.
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w[Create Announce Delete Update] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

# Other activity types are not streamed.
@impl true
def stream_out(_activity), do: :noop
282
# Runs do_create/2 inside a database transaction and unwraps the result.
# A failed transaction result is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
289
# Transaction body of create/2: builds the Create data, inserts it, then updates
# counters, notifies/streams, schedules poll notifications and federates.
#
# Stops early with `{:ok, activity}` for fake inserts and, after the replies
# count update, when the `:env` config is `:benchmark`. Any `{:error, message}`
# rolls the transaction back with `message`.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
324
# Asks PollWorker to schedule the poll-end job for `activity`; the worker's
# return value is deliberately ignored and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
329
# Runs do_unfollow/4 inside a database transaction and unwraps the result.
# A failed transaction result is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
338
# Transaction body of unfollow/4. Returns nil when no follow activity exists
# between the two users; an `{:error, _}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local)

# Local unfollow: insert the unfollow activity, delete the original follow,
# then notify/stream and federate.
defp do_unfollow(follower, followed, activity_id, local) when local == true do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       {:ok, _activity} <- Repo.delete(follow_activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end

# Remote unfollow: the follow is deleted and the unfollow activity is only
# built in memory (make_unfollow_activity/2), not inserted.
defp do_unfollow(follower, followed, activity_id, false) do
  # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
  # uses deterministic ids for follows.
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, _activity} <- Repo.delete(follow_activity),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       unfollow_activity <- make_unfollow_activity(unfollow_data, false),
       _ <- notify_and_stream(unfollow_activity) do
    {:ok, unfollow_activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
369
# Builds (without inserting) an %Activity{} struct for the given unfollow data.
defp make_unfollow_activity(data, local) do
  recipients = data |> get_recipients() |> elem(0)

  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
end
380
# Runs do_flag/1 inside a database transaction and unwraps the result.
# A failed transaction result is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
387
# Transaction body of flag/1: inserts the report activity, notifies/streams it,
# federates a stripped copy and e-mails the report to every superuser who has
# an address and is not the reporting actor. An `{:error, _}` rolls back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # Only an explicit `false` switches these off.
  local = params[:local] != false
  forward = params[:forward] != false

  # A forwarded report is cc'ed to the reported account; "to" is always empty.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
430
# Inserts a "Move" activity from `origin` to `target`, addressed to the origin's
# followers, then notifies/streams, federates and enqueues the "move_following"
# background job. Requires the origin's ap_id in the target's `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  if origin.ap_id in target.also_known_as do
    case insert(params, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
457
# Builds the query for the "Create" activities of a context, newest first.
# Recipients are the public address plus, when `opts[:user]` is set, that
# user's ap_id and followings.
def fetch_activities_for_context_query(context, opts) do
  user = opts[:user]
  public = [Constants.as_public()]
  recipients = if user, do: [user.ap_id | User.following(user)] ++ public, else: public

  visible =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_recipients(recipients, user)
    |> restrict_filtered(opts)

  visible
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
488
# Returns every activity matched by fetch_activities_for_context_query/2.
#
# `opts` may be a map or a keyword list. It is normalised to a map first,
# because the restriction helpers in this module pattern-match on maps and
# would otherwise silently skip their filters for keyword input.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.new(opts))
  |> Repo.all()
end
495
# Returns the id of the newest direct-visibility activity in `context`, or nil.
#
# `opts` may be a map or a keyword list; `skip_preload: true` is the default and
# can be overridden by the caller.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Map.merge/2 raises BadMapError on a keyword list, which the spec allows,
  # so normalise with Map.new/1 first.
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
506
# Paginates `query` with `:skip_extra_order` forced on.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Tag-filtering functions may already apply "ORDER BY objects.id DESC"; an
  # extra sort on "activities.id DESC NULLS LAST" would worsen the query plan.
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
514
# Fetches a page of activities addressed to `recipients` or to any list
# membership of `opts[:user]`. The paginated result is reversed and then passed
# through maybe_update_cc/3.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  query = fetch_activities_query(recipients ++ list_memberships, opts)

  query
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
523
# Fetches a page of activities addressed to the public collection and, when
# `:includes_local_public` is set, the local-public one. `:user` is dropped
# from `opts` before querying.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  intended_recipients =
    if Map.get(opts, :includes_local_public, false),
      do: [Constants.as_public(), as_local_public()],
      else: [Constants.as_public()]

  opts = Map.delete(opts, :user)
  query = fetch_activities_query(intended_recipients, opts)

  query
  |> restrict_unlisted(opts)
  |> fetch_paginated_optimized(opts, pagination)
end
542
# Same as fetch_public_or_unlisted_activities/2 with `:restrict_unlisted`
# forced to true.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
549
550 @valid_visibilities ~w[direct unlisted public private]
551
# Narrows `query` to activities whose computed visibility (the SQL function
# `activity_visibility`) equals the given value, or is one of the given list.
#
# An invalid visibility is logged and the query is returned unchanged, as
# exclude_visibility/2 does. Returning the result of Logger.error/1 instead
# would put `:ok` into the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1: the list may hold terms that cannot be interpolated.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# A nil visibility means "no restriction" and falls through to the last clause.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
586
# Drops activities whose computed visibility (the SQL function
# `activity_visibility`) equals the given value, or is one of the given list.
# Invalid values are logged and leave the query unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_known? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_known? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

# Neither nil nor a known visibility: log it and leave the query alone.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _opts), do: query
629
# Applies the SQL `thread_visibility` check for the requesting user, unless
# thread containment is skipped (via the third argument or the user's own
# setting) or no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _opts, _), do: query
646
# Fetches the activities authored by `user`, as visible to `reading_user`,
# without restricting the activity type. The fetch_activities/2 result is
# reversed before being returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
661
# Fetches `user`'s activities as visible to `reading_user`, reversing the order
# returned by fetch_activities_for_user/3.
def fetch_user_activities(user, reading_user, params \\ %{})

# With `total: true` the result is a keyword list; only its `:items` entry is
# reversed.
def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
675
# Shared fetch for fetch_user_activities/3: Create/Announce activities by `user`
# including pinned object ids. Block and mute filtering for the reader is only
# added when the reader does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  base =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user),
      do: base,
      else: Map.merge(base, %{blocking_user: reading_user, muting_user: reading_user})

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
702
# Fetches Create/Announce activities visible to `reading_user`, reversing the
# order returned by fetch_activities_for_reading_user/2. With `total: true` the
# result is a keyword list and only its `:items` entry is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
713
# Shared fetch for fetch_statuses/2: Create/Announce activities with offset
# pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
724
# Recipient list used to scope activity lookups: empty in godmode; public,
# local-public, the reader and their followings for a local reader; the public
# address only otherwise.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user != nil and reading_user.local do
    [Constants.as_public(), as_local_public(), reading_user.ap_id] ++
      User.following(reading_user)
  else
    [Constants.as_public()]
  end
end
738
# Hides Announce activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
758
# Keeps activities with an id greater than `:since_id`; an empty string or a
# missing key leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
766
# Keeps activities whose object's embedded "tag" field contains ALL of the
# given tags (`?&` operator). A single binary tag is handled through
# restrict_embedded_tag_any/2. Raises when preloading is skipped.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: tag})

defp restrict_embedded_tag_all(query, _opts), do: query
783
# Keeps activities whose object's embedded "tag" field contains ANY of the
# given tags (`?|` operator). A single binary tag is wrapped into a list.
# Raises when preloading is skipped.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: [tag]})

defp restrict_embedded_tag_any(query, _opts), do: query
800
# Drops activities whose object's embedded "tag" field contains ANY of the
# rejected tags. A single binary tag is wrapped into a list. Raises when
# preloading is skipped.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject),
     do: restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_embedded_tag_reject_any(query, _opts), do: query
818
# Query selecting the distinct object ids linked (through "hashtags_objects")
# to any hashtag named in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
826
# Keeps activities whose object is linked (via the hashtags tables) to ALL of
# the given tags. Raises when preloading is skipped, since the joined object is
# required.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

# A single required tag is the same as "any of [tag]".
defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

# Several tags: aggregate the object's matching hashtag names and require that
# array to contain (`@>`) every requested tag.
defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  from(
    [_activity, object] in query,
    where:
      fragment(
        """
        (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
        ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
        AND hashtags_objects.object_id = ?) @> ?
        """,
        ^tags,
        object.id,
        ^tags
      )
  )
end

# A bare binary tag is wrapped into a list and re-dispatched.
defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
857
# Keeps activities whose object is linked (via "hashtags_objects") to ANY of the
# given tags. The hashtag ids are resolved with a separate query first. Raises
# when preloading is skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

# A bare binary tag is wrapped into a list and re-dispatched.
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_hashtag_any(query, %{tag: [tag]})

defp restrict_hashtag_any(query, _opts), do: query
883
# Drops activities whose object is linked to ANY of the rejected tags, using
# object_ids_query_for_tags/1 as a subquery. Raises when preloading is skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

# A bare binary tag is wrapped into a list and re-dispatched.
defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_hashtag_reject_any(query, _opts), do: query
900
# Raised by filters that need the joined child object while `skip_preload` is set.
defp raise_on_missing_preload,
  do: raise(RuntimeError, "Can't use the child object without preloading!")
904
# Keeps activities whose recipients overlap (`&&`) the given list. With a user,
# activities authored by that user are OR-ed in. An empty list means no
# restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
918
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
924
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}),
  do: where(query, [activity], activity.local == false)

defp restrict_remote(query, _opts), do: query
930
# With an `:actor_id`, keeps only activities whose actor column equals it.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
936
# Keeps activities whose data "type" equals `:type` (binary) or is one of the
# values in `:type` (any other value, compared with ANY).
defp restrict_type(query, %{type: type}) when is_binary(type),
  do: where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

defp restrict_type(query, %{type: type}),
  do: where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))

defp restrict_type(query, _opts), do: query
946
# With a `:state`, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
952
# With `:favorited_by`, keeps activities whose object's "likes" contains the
# given ap_id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
961
# With `only_media: true`, keeps Create activities whose object "attachment" is
# not an empty list. Raises when preloading is skipped, since the joined object
# is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
975
# Reply filtering, selected by the options given:
#   * `exclude_replies: true`        — only objects without an "inReplyTo";
#   * `reply_visibility: "self"`     — non-replies, or replies that have the
#                                      filtering user among their recipients;
#   * `reply_visibility: "following"`— see the commented SQL below.
# Any other options leave the query unchanged.
defp restrict_replies(query, %{exclude_replies: true}) do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The fragment arguments below are positional: they fill the `?` placeholders
  # in order of appearance.
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create'     -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
                                   -- unless they are the author (because authors
                                   -- are also part of the recipients). This leads
                                   -- to a bug that self-replies by friends won't
                                   -- show up.
        OR ? = ?                   -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
1029
# With `exclude_reblogs: true`, drops activities of type "Announce".
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _opts), do: query
1035
# Hides activities authored by, or addressed ("to") to, users muted by
# `:muting_user`; the user's own activities are exempt from the "to" check.
# Unless preloading is skipped, rows with a thread-mute binding are hidden as
# well. `with_muted: true` disables the filter entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  # The :thread_mute named binding is only referenced when preloading is on.
  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1062
# Hides activities that `:blocking_user` should not see because of their user
# blocks or domain blocks. The blocked ap_ids may be passed in through
# `opts[:blocked_users_ap_ids]`; otherwise they are looked up. The object is
# joined first if the query has no `:object` binding yet. Without a
# `:blocking_user` the query is returned unchanged.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  # Followed accounts are exempt from the domain-block checks below.
  following_ap_ids = User.get_friends_ap_ids(user)

  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # You don't block the author
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),

    # You don't block any recipients, and didn't author the post
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),

    # You don't block the domain of any recipients, and didn't author the post
    where:
      fragment(
        "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
        activity.recipients,
        ^domain_blocks,
        activity.actor,
        ^user.ap_id
      ),

    # It's not a boost of a user you block
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),

    # You don't block the author's domain, and also don't follow the author
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),

    # Same as above, but checks the Object
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1129
# When `[:activitypub, :blockers_visible]` is not exactly `true`, hides
# activities authored by accounts that block `blocking_user`, as well as
# Announces whose "to" field contains such an account.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blockers = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blockers))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blockers
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
1154
# With `restrict_unlisted: true`, drops activities whose "cc" field contains
# the public collection address; a missing "cc" is treated as empty.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1168
# With `pinned: true`, keeps only Create activities whose object id (either
# embedded as `object.id` or given as a plain string) is one of
# `pinned_object_ids`.
#
# The predicate only reads the activity row, so no object binding is
# requested here: asking for a named :object binding that is never used would
# make the query fail whenever the object has not been joined.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  from(
    activity in query,
    where:
      fragment(
        "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
        activity.data,
        activity.data,
        activity.data,
        ^ids
      )
  )
end

defp restrict_pinned(query, _), do: query
1184
# Hides Announce activities made by accounts whose reblogs `muting_user` has
# muted. The list is taken from `opts[:reblog_muted_users_ap_ids]` when
# provided, otherwise from User.reblog_muted_users_ap_ids/1.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1201
# Restricts the query to activities whose actor lives on the given instance.
# `instance` is either a single host name or a list of host names; the host is
# taken as the third "/"-separated segment of the actor's AP id.
#
# The actor column is passed through the activity binding (as the other
# fragments in this module do) rather than named bare inside the SQL string,
# so the reference stays unambiguous regardless of which tables are joined.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  from(
    activity in query,
    where: fragment("split_part(?::text, '/'::text, 3) = ?", activity.actor, ^instance)
  )
end

defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
  from(
    activity in query,
    where: fragment("split_part(?::text, '/'::text, 3) = ANY(?)", activity.actor, ^instance)
  )
end

defp restrict_instance(query, _), do: query
1217
# Applies the user's content filters: when Filter.compose_regex/1 yields a
# regex, activities whose object content matches it (case-insensitively) are
# dropped, except for activities authored by the user. A nil regex leaves the
# query unchanged.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

# Without a `:user`, fall back to the `:blocking_user` for the filters.
defp restrict_filtered(query, %{blocking_user: %User{} = blocking_user}) do
  restrict_filtered(query, %{user: blocking_user})
end

defp restrict_filtered(query, _opts), do: query
1237
# Poll votes (objects of type "Answer") are hidden unless the caller passes
# `include_poll_votes: true`. The filter needs the :object binding and is
# skipped when the query does not have one.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1249
# Flag queries and callers passing `invisible_actors: true` are left alone.
defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

# Everything else is inner-joined against users (binding :u) so that only
# activities whose actor exists with `invisible == false` remain.
defp exclude_invisible_actors(query, _opts) do
  from(activity in query,
    join: u in User,
    as: :u,
    on: activity.actor == u.ap_id and u.invisible == false
  )
end
1260
# Removes the activity with the given (binary) id from the results.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1266
# Preloads the activity's object unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1273
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1280
# Preloads report notes only when explicitly requested through
# `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1287
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1294
# Orders by activity id when `order: :desc` or `order: :asc` is given; any
# other options leave the ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1306
# Normalizes the hashtag options (:tag, :tag_all, :tag_reject) with
# Hashtag.normalize_name/1. A single value is normalized in place, a list is
# normalized element-wise and de-duplicated, anything else is left as is.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, acc ->
    raw = acc[key]

    cond do
      is_bitstring(raw) ->
        Map.put(acc, key, Hashtag.normalize_name(raw))

      is_list(raw) ->
        names = raw |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
        Map.put(acc, key, names)

      true ->
        acc
    end
  end)
end
1326
# Loads the AP ids of the muting user's outgoing relationships in a single
# User.outgoing_relationships_ap_ids/2 call and returns three option maps
# (for restrict_blocked, restrict_muted and restrict_muted_reblogs) carrying
# the preloaded lists. Block relationships are only included when the
# blocking user is the same as the muting user. Values already present in
# `opts` take precedence over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_kinds = if muting_user, do: [:mute, :reblog_mute], else: []

  kinds =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  preloaded = User.outgoing_relationships_ap_ids(muting_user, kinds)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1348
@doc """
Builds the query used to fetch activities addressed to `recipients`.

`opts` is a map of filter options. Hashtag options are normalized first and
the mute/block relationship lists of the requesting user are preloaded once.
The base `Activity` query is then passed through the preload, ordering,
restriction and exclusion helpers of this module. Hashtag filtering uses the
hashtag-table helpers when the `:improved_hashtag_timeline` feature is
enabled and the embedded-tag helpers otherwise.

Returns the composed query; nothing is executed here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # The content filter is applied exactly once; adding it a second time
    # only repeats the same regex predicate in the WHERE clause.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1405
@doc """
Fetches the activities favourited by `user`, most recent favourite first.

The user's Like activities are joined with their object and the activity
belonging to that object. Each returned activity has its object set and its
`pagination_id` set to the id of the Like, so pagination follows the order in
which favourites were added. `params` and `pagination` are handed to
`Pagination.fetch_paginated/3` with `skip_order: true`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  pagination_params = Map.put(params, :skip_order, true)

  likes_with_targets
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1423
# For every activity whose "bcc" contains one of the given list memberships,
# prepends the user's AP id to the activity's "cc". Activities without a
# non-empty "bcc" are returned unchanged, and nothing is done at all when
# `list_memberships` is empty or no user is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a single string; wrap it so the result is
        # always a proper list instead of e.g. `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1439
# Keeps activities that are addressed to any of `recipients`, or that are
# public and addressed to any of `recipients_with_public`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1448
@doc """
Fetches a page of activities addressed to `recipients`, plus public
activities addressed to `recipients_with_public`.

The query is built by `fetch_activities_query/2` with an empty recipient list
and `opts`, bounded by the two recipient lists, paginated through
`Pagination.fetch_paginated/3`, and the resulting page is reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1460
# Stores `file` through Upload.store/2 and inserts an Object holding the
# resulting data, with "actor" added when `opts[:actor]` is present. Any
# non-`{:ok, data}` result of the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1469
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a link map with a string "href", or a list of those (only the first
# element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1481
# Converts an actor image value into the internal Image representation. A map
# with a "url" becomes an "Image" with a single href entry; for a list only
# the first element is used; anything else (including an empty list) gives nil.
defp normalize_image(%{"url" => url}) do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_other), do: nil
1491
# Builds the attribute map for a remote user from a fetched actor document.
#
# `data` is the actor's ActivityPub JSON (string keys); `additional` may carry
# `:nickname_from_acct` when a WebFinger lookup has already been done. The
# document comes from a remote server, so the collection-valued fields
# ("attachment", "tag", "alsoKnownAs") are read defensively: nil or scalar
# values are wrapped into lists and malformed entries are skipped instead of
# crashing the whole user fetch.
defp object_to_user_data(data, additional) do
  # Profile fields are the PropertyValue attachments, reduced to name/value.
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Custom emoji: shortcode (without surrounding colons) => icon URL.
  # Emoji tags lacking an icon URL or a string name are ignored.
  emojis =
    data
    |> Map.get("tag", [])
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      _ ->
        []
    end)
    |> Map.new()

  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  # also_known_as must be a URL; non-string entries are dropped
  also_known_as =
    data
    |> Map.get("alsoKnownAs", [])
    |> List.wrap()
    |> Enum.filter(fn
      url when is_binary(url) ->
        case URI.parse(url) do
          %URI{scheme: "http"} -> true
          %URI{scheme: "https"} -> true
          _ -> false
        end

      _ ->
        false
    end)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: also_known_as,
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1570
# Derives a nickname of the form "preferredUsername@host" from the actor
# document. When `[WebFinger, :update_nickname_on_user_fetch]` is enabled the
# derived name is looked up via WebFinger and the returned acct is used
# instead; if the lookup does not yield an "acct:" subject the derived name is
# kept.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  derived = "#{username}@#{URI.parse(data["id"]).host}"
  lookup_enabled = Config.get([WebFinger, :update_nickname_on_user_fetch])

  with true <- !!lookup_enabled,
       {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(derived) do
    acct
  else
    _ -> derived
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_data), do: nil
1586
@doc """
Fetches the remote following and followers collections of `user` and derives
counter and privacy information from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:following_count` and `:follower_count` (counts fall back to 0 when
"totalItems" is not an integer). Failures are returned as `{:error, reason}`;
results that are not already an error tuple are wrapped in one.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1606
# Returns the counter when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1609
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

This only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is "Person" or "Service", and both the following
and follower addresses are present. When one of those checks is false the
data is returned unchanged; any other failure is logged as an error and the
data is returned unchanged as well.
"""
# NOTE(review): the map built by object_to_user_data/2 in this module carries
# :actor_type rather than :type, so the type check looks like it cannot pass
# for that caller - confirm whether this is intended.
def maybe_update_follow_information(user_data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1640
# Decides whether a follow collection is hidden.
#
# An embedded first page of a page type means the collection is visible. A
# referenced first page is fetched: getting a page back means visible, a 401
# or 403 response means hidden, and any other failure is returned as an error
# tuple. A collection without "first" is treated as hidden.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1657
@doc """
Runs the actor document through `MRF.filter/1` and converts the filtered
document into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` wrapping whatever the
filter returned when it did not return `{:ok, data}`.
"""
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    rejected -> {:error, rejected}
  end
end
1665
@doc """
Fetches the actor at `ap_id` and turns it into user attributes, including
follow information when that synchronization applies.

Returns `{:ok, user_data}` or `{:error, reason}`. Deleted objects and
rejections are logged at debug level, every other error at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, actor} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, e} ->
      case e do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

        {:reject, reason} ->
          Logger.debug("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      end

      {:error, e}
  end
end
1685
@doc """
Frees up `data[:nickname]` when it is already taken by a user with a
different AP id, by renaming that existing user to "<id>.<nickname>".

When the existing user has the same AP id only an info message is logged.
Returns nil when there is no string nickname or no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  existing = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case existing do
    %User{} = old_user ->
      if data[:ap_id] == old_user.ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        old_user
        |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
1707
@doc """
Extracts pin data from a featured collection document.

Returns a map of object AP id => timestamp (the current UTC time). An
`OrderedCollection` with a "first" reference has that page fetched and its
"orderedItems" used; other `OrderedCollection`/`Collection` documents are
resolved through `Collections.Fetcher.fetch_collection/1`.

Always returns a plain map: fetch failures and documents of any other shape
are logged and yield `%{}`, so callers can wrap the result in `{:ok, _}`
without ending up with a nested tuple.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
    page
    |> Map.get("orderedItems")
    |> featured_items_to_pin_data()
  else
    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      featured_items_to_pin_data(objects)

    e ->
      Logger.error(
        "Could not fetch featured collection #{inspect(collection["id"])}, #{inspect(e)}"
      )

      %{}
  end
end

def pin_data_from_featured_collection(other) do
  Logger.error("Could not parse featured collection #{inspect(other)}")
  %{}
end

# Items can either be a map _or_ a string; entries of any other shape are
# skipped, and a missing/non-list item collection yields no pins.
defp featured_items_to_pin_data(items) when is_list(items) do
  items
  |> Enum.flat_map(fn
    ap_id when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    %{"id" => ap_id} when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    _ -> []
  end)
  |> Map.new()
end

defp featured_items_to_pin_data(_items), do: %{}
1738
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pin_data}`.

A nil address yields `{:ok, %{}}`. Fetch failures are logged and also yield
`{:ok, %{}}`, so this function never returns an error tuple.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
1752
@doc """
Makes sure the pinned objects of the given user data are available locally.

Each pin is looked up in the object cache and fetched when it is not cached.
Returns `:ok` when every pin is available and `:error` otherwise; checking
stops at the first pin that cannot be obtained. `nil` input returns `nil`.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  available? = fn {ap_id, _pinned_at} ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  case Enum.all?(pins, available?) do
    true -> :ok
    false -> :error
  end
end
1765
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not yet AP-enabled is upgraded through the
Transmogrifier. Otherwise the actor is fetched; on success a background task
is started to fetch its pinned objects, and the user is either updated (when
already cached) or inserted after resolving a possible nickname clash. A
failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id, additional \\ []) do
  cached = User.get_cached_by_ap_id(ap_id)

  if cached && !User.ap_enabled?(cached) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id, additional) do
      {:ok, user_data} ->
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(user_data) end)

        if cached do
          cached
          |> User.remote_user_changeset(user_data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1790
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user found
at the returned AP id, passing the WebFinger acct along as the nickname.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not provide
a non-nil AP id together with an "acct:" subject.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1799
# Filters out broken threads: the result is whatever
# entire_thread_visible_for_user?/2 reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1804
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1809
@doc """
Returns a query for Create activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{type: "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1816
# Flag queries keep activities of deactivated users; every other query is
# passed through Activity.restrict_deactivated_users/1.
defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity

defp maybe_restrict_deactivated_users(activity, _opts) do
  Activity.restrict_deactivated_users(activity)
end
1821 end