Don't persist local undone follow (#194)
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
55 {recipients, to, cc}
56 end
57
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
64 _ -> false
65 end
66 end
67
68 defp check_actor_can_insert(_), do: true
69
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
73 end
74
75 defp check_remote_limit(_), do: true
76
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
79 end
80
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
83 end
84
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
87 end
88
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
91 "type" => "Create"
92 }) do
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
95 end
96 end
97
98 defp increase_replies_count_if_reply(_create_data), do: :noop
99
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
101 @impl true
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
104 {:ok, object, meta}
105 end
106 end
107
108 @impl true
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
112 {:ok, activity} <-
113 Repo.insert(%Activity{
114 data: object,
115 local: local,
116 recipients: recipients,
117 actor: object["actor"]
118 }),
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
122 end
123 end
124
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
139
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
142 end)
143
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
146
147 {:ok, activity}
148 else
149 %Activity{} = activity ->
150 {:ok, activity}
151
152 {:actor_check, _} ->
153 {:error, false}
154
155 {:containment, _} = error ->
156 error
157
158 {:error, _} = error ->
159 error
160
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
163 data: map,
164 local: local,
165 actor: map["actor"],
166 recipients: recipients,
167 id: "pleroma:fakeid"
168 }
169
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
171 {:ok, activity}
172
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
175
176 {:reject, _} = e ->
177 {:error, e}
178 end
179 end
180
181 defp insert_activity_with_expiration(data, local, recipients) do
182 struct = %Activity{
183 data: data,
184 local: local,
185 actor: data["actor"],
186 recipients: recipients
187 }
188
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
191 end
192 end
193
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
196
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
199 stream_out(activity)
200 stream_out_participations(participations)
201 end
202
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
205 ) do
206 with {:ok, _job} <-
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
210 }) do
211 {:ok, activity}
212 end
213 end
214
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
221 {:ok, conversation}
222 end
223 end
224
225 defp get_participations({:ok, conversation}) do
226 conversation
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
229 end
230
231 defp get_participations(_), do: []
232
233 def stream_out_participations(participations) do
234 participations =
235 participations
236 |> Repo.preload(:user)
237
238 Streamer.stream("participation", participations)
239 end
240
241 @impl true
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
245
246 last_activity_id =
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
248 user: user,
249 blocking_user: user
250 })
251
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
254 end
255 end
256 end
257
258 @impl true
259 def stream_out_participations(_, _), do: :noop
260
261 @impl true
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
264 activity
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
267 end
268
269 @impl true
270 def stream_out(_activity) do
271 :noop
272 end
273
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
277 result
278 end
279 end
280
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
287
288 create_data =
289 make_create_data(
290 %{to: to, actor: actor, published: published, context: context, object: object},
291 additional
292 )
293
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
303 {:ok, activity}
304 else
305 {:quick_insert, true, activity} ->
306 {:ok, activity}
307
308 {:fake, true, activity} ->
309 {:ok, activity}
310
311 {:error, message} ->
312 Repo.rollback(message)
313 end
314 end
315
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
318 :ok
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local)
331
332 defp do_unfollow(follower, followed, activity_id, local) when local == true do
333 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
334 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
335 {:ok, activity} <- insert(unfollow_data, local),
336 {:ok, _activity} <- Repo.delete(follow_activity),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(activity) do
339 {:ok, activity}
340 else
341 nil -> nil
342 {:error, error} -> Repo.rollback(error)
343 end
344 end
345
346 defp do_unfollow(follower, followed, activity_id, false) do
347 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
348 # uses deterministic ids for follows.
349 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
350 {:ok, _activity} <- Repo.delete(follow_activity),
351 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
352 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
353 _ <- notify_and_stream(unfollow_activity) do
354 {:ok, unfollow_activity}
355 else
356 nil -> nil
357 {:error, error} -> Repo.rollback(error)
358 end
359 end
360
361 defp make_unfollow_activity(data, local) do
362 {recipients, _, _} = get_recipients(data)
363
364 %Activity{
365 data: data,
366 local: local,
367 actor: data["actor"],
368 recipients: recipients
369 }
370 end
371
372 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
373 def flag(params) do
374 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
375 result
376 end
377 end
378
379 defp do_flag(
380 %{
381 actor: actor,
382 context: _context,
383 account: account,
384 statuses: statuses,
385 content: content
386 } = params
387 ) do
388 # only accept false as false value
389 local = !(params[:local] == false)
390 forward = !(params[:forward] == false)
391
392 additional = params[:additional] || %{}
393
394 additional =
395 if forward do
396 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
397 else
398 Map.merge(additional, %{"to" => [], "cc" => []})
399 end
400
401 with flag_data <- make_flag_data(params, additional),
402 {:ok, activity} <- insert(flag_data, local),
403 {:ok, stripped_activity} <- strip_report_status_data(activity),
404 _ <- notify_and_stream(activity),
405 :ok <-
406 maybe_federate(stripped_activity) do
407 User.all_superusers()
408 |> Enum.filter(fn user -> user.ap_id != actor end)
409 |> Enum.filter(fn user -> not is_nil(user.email) end)
410 |> Enum.each(fn superuser ->
411 superuser
412 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
413 |> Pleroma.Emails.Mailer.deliver_async()
414 end)
415
416 {:ok, activity}
417 else
418 {:error, error} -> Repo.rollback(error)
419 end
420 end
421
422 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
423 def move(%User{} = origin, %User{} = target, local \\ true) do
424 params = %{
425 "type" => "Move",
426 "actor" => origin.ap_id,
427 "object" => origin.ap_id,
428 "target" => target.ap_id,
429 "to" => [origin.follower_address]
430 }
431
432 with true <- origin.ap_id in target.also_known_as,
433 {:ok, activity} <- insert(params, local),
434 _ <- notify_and_stream(activity) do
435 maybe_federate(activity)
436
437 BackgroundWorker.enqueue("move_following", %{
438 "origin_id" => origin.id,
439 "target_id" => target.id
440 })
441
442 {:ok, activity}
443 else
444 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
445 err -> err
446 end
447 end
448
449 def fetch_activities_for_context_query(context, opts) do
450 public = [Constants.as_public()]
451
452 recipients =
453 if opts[:user],
454 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
455 else: public
456
457 from(activity in Activity)
458 |> maybe_preload_objects(opts)
459 |> maybe_preload_bookmarks(opts)
460 |> maybe_set_thread_muted_field(opts)
461 |> restrict_blocked(opts)
462 |> restrict_blockers_visibility(opts)
463 |> restrict_recipients(recipients, opts[:user])
464 |> restrict_filtered(opts)
465 |> where(
466 [activity],
467 fragment(
468 "?->>'type' = ? and ?->>'context' = ?",
469 activity.data,
470 "Create",
471 activity.data,
472 ^context
473 )
474 )
475 |> exclude_poll_votes(opts)
476 |> exclude_id(opts)
477 |> order_by([activity], desc: activity.id)
478 end
479
480 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
481 def fetch_activities_for_context(context, opts \\ %{}) do
482 context
483 |> fetch_activities_for_context_query(opts)
484 |> Repo.all()
485 end
486
487 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
488 FlakeId.Ecto.CompatType.t() | nil
489 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
490 context
491 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
492 |> restrict_visibility(%{visibility: "direct"})
493 |> limit(1)
494 |> select([a], a.id)
495 |> Repo.one()
496 end
497
498 defp fetch_paginated_optimized(query, opts, pagination) do
499 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
500 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
501 opts = Map.put(opts, :skip_extra_order, true)
502
503 Pagination.fetch_paginated(query, opts, pagination)
504 end
505
506 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
507 list_memberships = Pleroma.List.memberships(opts[:user])
508
509 fetch_activities_query(recipients ++ list_memberships, opts)
510 |> fetch_paginated_optimized(opts, pagination)
511 |> Enum.reverse()
512 |> maybe_update_cc(list_memberships, opts[:user])
513 end
514
515 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
516 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
517 includes_local_public = Map.get(opts, :includes_local_public, false)
518
519 opts = Map.delete(opts, :user)
520
521 intended_recipients =
522 if includes_local_public do
523 [Constants.as_public(), as_local_public()]
524 else
525 [Constants.as_public()]
526 end
527
528 intended_recipients
529 |> fetch_activities_query(opts)
530 |> restrict_unlisted(opts)
531 |> fetch_paginated_optimized(opts, pagination)
532 end
533
534 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
535 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
536 opts
537 |> Map.put(:restrict_unlisted, true)
538 |> fetch_public_or_unlisted_activities(pagination)
539 end
540
541 @valid_visibilities ~w[direct unlisted public private]
542
543 defp restrict_visibility(query, %{visibility: visibility})
544 when is_list(visibility) do
545 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
546 from(
547 a in query,
548 where:
549 fragment(
550 "activity_visibility(?, ?, ?) = ANY (?)",
551 a.actor,
552 a.recipients,
553 a.data,
554 ^visibility
555 )
556 )
557 else
558 Logger.error("Could not restrict visibility to #{visibility}")
559 end
560 end
561
562 defp restrict_visibility(query, %{visibility: visibility})
563 when visibility in @valid_visibilities do
564 from(
565 a in query,
566 where:
567 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
568 )
569 end
570
571 defp restrict_visibility(_query, %{visibility: visibility})
572 when visibility not in @valid_visibilities do
573 Logger.error("Could not restrict visibility to #{visibility}")
574 end
575
576 defp restrict_visibility(query, _visibility), do: query
577
578 defp exclude_visibility(query, %{exclude_visibilities: visibility})
579 when is_list(visibility) do
580 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
581 from(
582 a in query,
583 where:
584 not fragment(
585 "activity_visibility(?, ?, ?) = ANY (?)",
586 a.actor,
587 a.recipients,
588 a.data,
589 ^visibility
590 )
591 )
592 else
593 Logger.error("Could not exclude visibility to #{visibility}")
594 query
595 end
596 end
597
598 defp exclude_visibility(query, %{exclude_visibilities: visibility})
599 when visibility in @valid_visibilities do
600 from(
601 a in query,
602 where:
603 not fragment(
604 "activity_visibility(?, ?, ?) = ?",
605 a.actor,
606 a.recipients,
607 a.data,
608 ^visibility
609 )
610 )
611 end
612
613 defp exclude_visibility(query, %{exclude_visibilities: visibility})
614 when visibility not in [nil | @valid_visibilities] do
615 Logger.error("Could not exclude visibility to #{visibility}")
616 query
617 end
618
619 defp exclude_visibility(query, _visibility), do: query
620
621 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
622 do: query
623
624 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
625 do: query
626
627 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
628 local_public = as_local_public()
629
630 from(
631 a in query,
632 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
633 )
634 end
635
636 defp restrict_thread_visibility(query, _, _), do: query
637
638 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
639 params =
640 params
641 |> Map.put(:user, reading_user)
642 |> Map.put(:actor_id, user.ap_id)
643
644 %{
645 godmode: params[:godmode],
646 reading_user: reading_user
647 }
648 |> user_activities_recipients()
649 |> fetch_activities(params)
650 |> Enum.reverse()
651 end
652
653 def fetch_user_activities(user, reading_user, params \\ %{})
654
655 def fetch_user_activities(user, reading_user, %{total: true} = params) do
656 result = fetch_activities_for_user(user, reading_user, params)
657
658 Keyword.put(result, :items, Enum.reverse(result[:items]))
659 end
660
661 def fetch_user_activities(user, reading_user, params) do
662 user
663 |> fetch_activities_for_user(reading_user, params)
664 |> Enum.reverse()
665 end
666
667 defp fetch_activities_for_user(user, reading_user, params) do
668 params =
669 params
670 |> Map.put(:type, ["Create", "Announce"])
671 |> Map.put(:user, reading_user)
672 |> Map.put(:actor_id, user.ap_id)
673 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
674
675 params =
676 if User.blocks?(reading_user, user) do
677 params
678 else
679 params
680 |> Map.put(:blocking_user, reading_user)
681 |> Map.put(:muting_user, reading_user)
682 end
683
684 pagination_type = Map.get(params, :pagination_type) || :keyset
685
686 %{
687 godmode: params[:godmode],
688 reading_user: reading_user
689 }
690 |> user_activities_recipients()
691 |> fetch_activities(params, pagination_type)
692 end
693
694 def fetch_statuses(reading_user, %{total: true} = params) do
695 result = fetch_activities_for_reading_user(reading_user, params)
696 Keyword.put(result, :items, Enum.reverse(result[:items]))
697 end
698
699 def fetch_statuses(reading_user, params) do
700 reading_user
701 |> fetch_activities_for_reading_user(params)
702 |> Enum.reverse()
703 end
704
705 defp fetch_activities_for_reading_user(reading_user, params) do
706 params = Map.put(params, :type, ["Create", "Announce"])
707
708 %{
709 godmode: params[:godmode],
710 reading_user: reading_user
711 }
712 |> user_activities_recipients()
713 |> fetch_activities(params, :offset)
714 end
715
716 defp user_activities_recipients(%{godmode: true}), do: []
717
718 defp user_activities_recipients(%{reading_user: reading_user}) do
719 if not is_nil(reading_user) and reading_user.local do
720 [
721 Constants.as_public(),
722 as_local_public(),
723 reading_user.ap_id | User.following(reading_user)
724 ]
725 else
726 [Constants.as_public()]
727 end
728 end
729
730 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
731 raise "Can't use the child object without preloading!"
732 end
733
734 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
735 from(
736 [activity, object] in query,
737 where:
738 fragment(
739 "?->>'type' != ? or ?->>'actor' != ?",
740 activity.data,
741 "Announce",
742 object.data,
743 ^actor
744 )
745 )
746 end
747
748 defp restrict_announce_object_actor(query, _), do: query
749
750 defp restrict_since(query, %{since_id: ""}), do: query
751
752 defp restrict_since(query, %{since_id: since_id}) do
753 from(activity in query, where: activity.id > ^since_id)
754 end
755
756 defp restrict_since(query, _), do: query
757
758 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
759 raise_on_missing_preload()
760 end
761
762 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
763 from(
764 [_activity, object] in query,
765 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
766 )
767 end
768
769 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
770 restrict_embedded_tag_any(query, %{tag: tag})
771 end
772
773 defp restrict_embedded_tag_all(query, _), do: query
774
775 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
777 end
778
779 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
780 from(
781 [_activity, object] in query,
782 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
783 )
784 end
785
786 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
787 restrict_embedded_tag_any(query, %{tag: [tag]})
788 end
789
790 defp restrict_embedded_tag_any(query, _), do: query
791
792 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
793 raise_on_missing_preload()
794 end
795
796 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
797 from(
798 [_activity, object] in query,
799 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
800 )
801 end
802
803 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
804 when is_binary(tag_reject) do
805 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
806 end
807
808 defp restrict_embedded_tag_reject_any(query, _), do: query
809
810 defp object_ids_query_for_tags(tags) do
811 from(hto in "hashtags_objects")
812 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
813 |> where([hto, ht], ht.name in ^tags)
814 |> select([hto], hto.object_id)
815 |> distinct([hto], true)
816 end
817
818 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
819 raise_on_missing_preload()
820 end
821
822 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
823 restrict_hashtag_any(query, %{tag: single_tag})
824 end
825
826 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
827 from(
828 [_activity, object] in query,
829 where:
830 fragment(
831 """
832 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
833 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
834 AND hashtags_objects.object_id = ?) @> ?
835 """,
836 ^tags,
837 object.id,
838 ^tags
839 )
840 )
841 end
842
843 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
844 restrict_hashtag_all(query, %{tag_all: [tag]})
845 end
846
847 defp restrict_hashtag_all(query, _), do: query
848
849 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
850 raise_on_missing_preload()
851 end
852
853 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
854 hashtag_ids =
855 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
856 |> Repo.all()
857
858 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
859 from(
860 [_activity, object] in query,
861 join: hto in "hashtags_objects",
862 on: hto.object_id == object.id,
863 where: hto.hashtag_id in ^hashtag_ids,
864 distinct: [desc: object.id],
865 order_by: [desc: object.id]
866 )
867 end
868
869 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
870 restrict_hashtag_any(query, %{tag: [tag]})
871 end
872
873 defp restrict_hashtag_any(query, _), do: query
874
875 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
876 raise_on_missing_preload()
877 end
878
879 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
880 from(
881 [_activity, object] in query,
882 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
883 )
884 end
885
886 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
887 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
888 end
889
890 defp restrict_hashtag_reject_any(query, _), do: query
891
892 defp raise_on_missing_preload do
893 raise "Can't use the child object without preloading!"
894 end
895
896 defp restrict_recipients(query, [], _user), do: query
897
898 defp restrict_recipients(query, recipients, nil) do
899 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
900 end
901
902 defp restrict_recipients(query, recipients, user) do
903 from(
904 activity in query,
905 where: fragment("? && ?", ^recipients, activity.recipients),
906 or_where: activity.actor == ^user.ap_id
907 )
908 end
909
910 defp restrict_local(query, %{local_only: true}) do
911 from(activity in query, where: activity.local == true)
912 end
913
914 defp restrict_local(query, _), do: query
915
916 defp restrict_remote(query, %{remote: true}) do
917 from(activity in query, where: activity.local == false)
918 end
919
920 defp restrict_remote(query, _), do: query
921
922 defp restrict_actor(query, %{actor_id: actor_id}) do
923 from(activity in query, where: activity.actor == ^actor_id)
924 end
925
926 defp restrict_actor(query, _), do: query
927
928 defp restrict_type(query, %{type: type}) when is_binary(type) do
929 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
930 end
931
932 defp restrict_type(query, %{type: type}) do
933 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
934 end
935
936 defp restrict_type(query, _), do: query
937
938 defp restrict_state(query, %{state: state}) do
939 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
940 end
941
942 defp restrict_state(query, _), do: query
943
944 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
945 from(
946 [_activity, object] in query,
947 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
948 )
949 end
950
951 defp restrict_favorited_by(query, _), do: query
952
953 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
954 raise "Can't use the child object without preloading!"
955 end
956
957 defp restrict_media(query, %{only_media: true}) do
958 from(
959 [activity, object] in query,
960 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
961 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
962 )
963 end
964
965 defp restrict_media(query, _), do: query
966
967 defp restrict_replies(query, %{exclude_replies: true}) do
968 from(
969 [_activity, object] in query,
970 where: fragment("?->>'inReplyTo' is null", object.data)
971 )
972 end
973
974 defp restrict_replies(query, %{
975 reply_filtering_user: %User{} = user,
976 reply_visibility: "self"
977 }) do
978 from(
979 [activity, object] in query,
980 where:
981 fragment(
982 "?->>'inReplyTo' is null OR ? = ANY(?)",
983 object.data,
984 ^user.ap_id,
985 activity.recipients
986 )
987 )
988 end
989
990 defp restrict_replies(query, %{
991 reply_filtering_user: %User{} = user,
992 reply_visibility: "following"
993 }) do
994 from(
995 [activity, object] in query,
996 where:
997 fragment(
998 """
999 ?->>'type' != 'Create' -- This isn't a Create
1000 OR ?->>'inReplyTo' is null -- this isn't a reply
1001 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1002 -- unless they are the author (because authors
1003 -- are also part of the recipients). This leads
1004 -- to a bug that self-replies by friends won't
1005 -- show up.
1006 OR ? = ? -- The actor is us
1007 """,
1008 activity.data,
1009 object.data,
1010 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1011 activity.recipients,
1012 activity.actor,
1013 activity.actor,
1014 ^user.ap_id
1015 )
1016 )
1017 end
1018
1019 defp restrict_replies(query, _), do: query
1020
1021 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1022 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1023 end
1024
1025 defp restrict_reblogs(query, _), do: query
1026
1027 defp restrict_muted(query, %{with_muted: true}), do: query
1028
1029 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1030 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1031
1032 query =
1033 from([activity] in query,
1034 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1035 where:
1036 fragment(
1037 "not (?->'to' \\?| ?) or ? = ?",
1038 activity.data,
1039 ^mutes,
1040 activity.actor,
1041 ^user.ap_id
1042 )
1043 )
1044
1045 unless opts[:skip_preload] do
1046 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1047 else
1048 query
1049 end
1050 end
1051
1052 defp restrict_muted(query, _), do: query
1053
1054 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1055 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1056 domain_blocks = user.domain_blocks || []
1057
1058 following_ap_ids = User.get_friends_ap_ids(user)
1059
1060 query =
1061 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1062
1063 from(
1064 [activity, object: o] in query,
1065 # You don't block the author
1066 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1067
1068 # You don't block any recipients, and didn't author the post
1069 where:
1070 fragment(
1071 "((not (? && ?)) or ? = ?)",
1072 activity.recipients,
1073 ^blocked_ap_ids,
1074 activity.actor,
1075 ^user.ap_id
1076 ),
1077
1078 # You don't block the domain of any recipients, and didn't author the post
1079 where:
1080 fragment(
1081 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1082 activity.recipients,
1083 ^domain_blocks,
1084 activity.actor,
1085 ^user.ap_id
1086 ),
1087
1088 # It's not a boost of a user you block
1089 where:
1090 fragment(
1091 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1092 activity.data,
1093 activity.data,
1094 ^blocked_ap_ids
1095 ),
1096
1097 # You don't block the author's domain, and also don't follow the author
1098 where:
1099 fragment(
1100 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1101 activity.actor,
1102 ^domain_blocks,
1103 activity.actor,
1104 ^following_ap_ids
1105 ),
1106
1107 # Same as above, but checks the Object
1108 where:
1109 fragment(
1110 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1111 o.data,
1112 ^domain_blocks,
1113 o.data,
1114 ^following_ap_ids
1115 )
1116 )
1117 end
1118
1119 defp restrict_blocked(query, _), do: query
1120
1121 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1122 if Config.get([:activitypub, :blockers_visible]) == true do
1123 query
1124 else
1125 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1126
1127 from(
1128 activity in query,
1129 # The author doesn't block you
1130 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1131
1132 # It's not a boost of a user that blocks you
1133 where:
1134 fragment(
1135 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1136 activity.data,
1137 activity.data,
1138 ^blocker_ap_ids
1139 )
1140 )
1141 end
1142 end
1143
1144 defp restrict_blockers_visibility(query, _), do: query
1145
1146 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1147 from(
1148 activity in query,
1149 where:
1150 fragment(
1151 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1152 activity.data,
1153 ^[Constants.as_public()]
1154 )
1155 )
1156 end
1157
1158 defp restrict_unlisted(query, _), do: query
1159
1160 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1161 from(
1162 [activity, object: o] in query,
1163 where:
1164 fragment(
1165 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1166 activity.data,
1167 activity.data,
1168 activity.data,
1169 ^ids
1170 )
1171 )
1172 end
1173
1174 defp restrict_pinned(query, _), do: query
1175
1176 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1177 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1178
1179 from(
1180 activity in query,
1181 where:
1182 fragment(
1183 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1184 activity.data,
1185 activity.actor,
1186 ^muted_reblogs
1187 )
1188 )
1189 end
1190
1191 defp restrict_muted_reblogs(query, _), do: query
1192
1193 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1194 from(
1195 activity in query,
1196 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1197 )
1198 end
1199
1200 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1201 from(
1202 activity in query,
1203 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1204 )
1205 end
1206
1207 defp restrict_instance(query, _), do: query
1208
1209 defp restrict_filtered(query, %{user: %User{} = user}) do
1210 case Filter.compose_regex(user) do
1211 nil ->
1212 query
1213
1214 regex ->
1215 from([activity, object] in query,
1216 where:
1217 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1218 activity.actor == ^user.ap_id
1219 )
1220 end
1221 end
1222
1223 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1224 restrict_filtered(query, %{user: user})
1225 end
1226
1227 defp restrict_filtered(query, _), do: query
1228
1229 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1230
1231 defp exclude_poll_votes(query, _) do
1232 if has_named_binding?(query, :object) do
1233 from([activity, object: o] in query,
1234 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1235 )
1236 else
1237 query
1238 end
1239 end
1240
1241 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1242
1243 defp exclude_invisible_actors(query, _opts) do
1244 invisible_ap_ids =
1245 User.Query.build(%{invisible: true, select: [:ap_id]})
1246 |> Repo.all()
1247 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1248
1249 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1250 end
1251
1252 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1253 from(activity in query, where: activity.id != ^id)
1254 end
1255
1256 defp exclude_id(query, _), do: query
1257
1258 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1259
1260 defp maybe_preload_objects(query, _) do
1261 query
1262 |> Activity.with_preloaded_object()
1263 end
1264
1265 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1266
1267 defp maybe_preload_bookmarks(query, opts) do
1268 query
1269 |> Activity.with_preloaded_bookmark(opts[:user])
1270 end
1271
1272 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1273 query
1274 |> Activity.with_preloaded_report_notes()
1275 end
1276
1277 defp maybe_preload_report_notes(query, _), do: query
1278
1279 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1280
1281 defp maybe_set_thread_muted_field(query, opts) do
1282 query
1283 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1284 end
1285
1286 defp maybe_order(query, %{order: :desc}) do
1287 query
1288 |> order_by(desc: :id)
1289 end
1290
1291 defp maybe_order(query, %{order: :asc}) do
1292 query
1293 |> order_by(asc: :id)
1294 end
1295
1296 defp maybe_order(query, _), do: query
1297
1298 defp normalize_fetch_activities_query_opts(opts) do
1299 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1300 case opts[key] do
1301 value when is_bitstring(value) ->
1302 Map.put(opts, key, Hashtag.normalize_name(value))
1303
1304 value when is_list(value) ->
1305 normalized_value =
1306 value
1307 |> Enum.map(&Hashtag.normalize_name/1)
1308 |> Enum.uniq()
1309
1310 Map.put(opts, key, normalized_value)
1311
1312 _ ->
1313 opts
1314 end
1315 end)
1316 end
1317
1318 defp fetch_activities_query_ap_ids_ops(opts) do
1319 source_user = opts[:muting_user]
1320 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1321
1322 ap_id_relationships =
1323 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1324 [:block | ap_id_relationships]
1325 else
1326 ap_id_relationships
1327 end
1328
1329 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1330
1331 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1332 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1333
1334 restrict_muted_reblogs_opts =
1335 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1336
1337 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1338 end
1339
1340 def fetch_activities_query(recipients, opts \\ %{}) do
1341 opts = normalize_fetch_activities_query_opts(opts)
1342
1343 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1344 fetch_activities_query_ap_ids_ops(opts)
1345
1346 config = %{
1347 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1348 }
1349
1350 query =
1351 Activity
1352 |> maybe_preload_objects(opts)
1353 |> maybe_preload_bookmarks(opts)
1354 |> maybe_preload_report_notes(opts)
1355 |> maybe_set_thread_muted_field(opts)
1356 |> maybe_order(opts)
1357 |> restrict_recipients(recipients, opts[:user])
1358 |> restrict_replies(opts)
1359 |> restrict_since(opts)
1360 |> restrict_local(opts)
1361 |> restrict_remote(opts)
1362 |> restrict_actor(opts)
1363 |> restrict_type(opts)
1364 |> restrict_state(opts)
1365 |> restrict_favorited_by(opts)
1366 |> restrict_blocked(restrict_blocked_opts)
1367 |> restrict_blockers_visibility(opts)
1368 |> restrict_muted(restrict_muted_opts)
1369 |> restrict_filtered(opts)
1370 |> restrict_media(opts)
1371 |> restrict_visibility(opts)
1372 |> restrict_thread_visibility(opts, config)
1373 |> restrict_reblogs(opts)
1374 |> restrict_pinned(opts)
1375 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1376 |> restrict_instance(opts)
1377 |> restrict_announce_object_actor(opts)
1378 |> restrict_filtered(opts)
1379 |> Activity.restrict_deactivated_users()
1380 |> exclude_poll_votes(opts)
1381 |> exclude_invisible_actors(opts)
1382 |> exclude_visibility(opts)
1383
1384 if Config.feature_enabled?(:improved_hashtag_timeline) do
1385 query
1386 |> restrict_hashtag_any(opts)
1387 |> restrict_hashtag_all(opts)
1388 |> restrict_hashtag_reject_any(opts)
1389 else
1390 query
1391 |> restrict_embedded_tag_any(opts)
1392 |> restrict_embedded_tag_all(opts)
1393 |> restrict_embedded_tag_reject_any(opts)
1394 end
1395 end
1396
1397 @doc """
1398 Fetch favorites activities of user with order by sort adds to favorites
1399 """
1400 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1401 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1402 user.ap_id
1403 |> Activity.Queries.by_actor()
1404 |> Activity.Queries.by_type("Like")
1405 |> Activity.with_joined_object()
1406 |> Object.with_joined_activity()
1407 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1408 |> order_by([like, _, _], desc_nulls_last: like.id)
1409 |> Pagination.fetch_paginated(
1410 Map.merge(params, %{skip_order: true}),
1411 pagination
1412 )
1413 end
1414
1415 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1416 Enum.map(activities, fn
1417 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1418 if Enum.any?(bcc, &(&1 in list_memberships)) do
1419 update_in(activity.data["cc"], &[user_ap_id | &1])
1420 else
1421 activity
1422 end
1423
1424 activity ->
1425 activity
1426 end)
1427 end
1428
1429 defp maybe_update_cc(activities, _, _), do: activities
1430
1431 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1432 from(activity in query,
1433 where:
1434 fragment("? && ?", activity.recipients, ^recipients) or
1435 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1436 ^Constants.as_public() in activity.recipients)
1437 )
1438 end
1439
1440 def fetch_activities_bounded(
1441 recipients,
1442 recipients_with_public,
1443 opts \\ %{},
1444 pagination \\ :keyset
1445 ) do
1446 fetch_activities_query([], opts)
1447 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1448 |> Pagination.fetch_paginated(opts, pagination)
1449 |> Enum.reverse()
1450 end
1451
1452 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1453 def upload(file, opts \\ []) do
1454 with {:ok, data} <- Upload.store(file, opts) do
1455 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1456
1457 Repo.insert(%Object{data: obj_data})
1458 end
1459 end
1460
1461 @spec get_actor_url(any()) :: binary() | nil
1462 defp get_actor_url(url) when is_binary(url), do: url
1463 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1464
1465 defp get_actor_url(url) when is_list(url) do
1466 url
1467 |> List.first()
1468 |> get_actor_url()
1469 end
1470
1471 defp get_actor_url(_url), do: nil
1472
1473 defp normalize_image(%{"url" => url}) do
1474 %{
1475 "type" => "Image",
1476 "url" => [%{"href" => url}]
1477 }
1478 end
1479
1480 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1481 defp normalize_image(_), do: nil
1482
1483 defp object_to_user_data(data, additional) do
1484 fields =
1485 data
1486 |> Map.get("attachment", [])
1487 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1488 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1489
1490 emojis =
1491 data
1492 |> Map.get("tag", [])
1493 |> Enum.filter(fn
1494 %{"type" => "Emoji"} -> true
1495 _ -> false
1496 end)
1497 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1498 {String.trim(name, ":"), url}
1499 end)
1500
1501 is_locked = data["manuallyApprovesFollowers"] || false
1502 data = Transmogrifier.maybe_fix_user_object(data)
1503 is_discoverable = data["discoverable"] || false
1504 invisible = data["invisible"] || false
1505 actor_type = data["type"] || "Person"
1506
1507 featured_address = data["featured"]
1508 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1509
1510 public_key =
1511 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1512 data["publicKey"]["publicKeyPem"]
1513 end
1514
1515 shared_inbox =
1516 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1517 data["endpoints"]["sharedInbox"]
1518 end
1519
1520 # if WebFinger request was already done, we probably have acct, otherwise
1521 # we request WebFinger here
1522 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1523
1524 %{
1525 ap_id: data["id"],
1526 uri: get_actor_url(data["url"]),
1527 ap_enabled: true,
1528 banner: normalize_image(data["image"]),
1529 fields: fields,
1530 emoji: emojis,
1531 is_locked: is_locked,
1532 is_discoverable: is_discoverable,
1533 invisible: invisible,
1534 avatar: normalize_image(data["icon"]),
1535 name: data["name"],
1536 follower_address: data["followers"],
1537 following_address: data["following"],
1538 featured_address: featured_address,
1539 bio: data["summary"] || "",
1540 actor_type: actor_type,
1541 also_known_as: Map.get(data, "alsoKnownAs", []),
1542 public_key: public_key,
1543 inbox: data["inbox"],
1544 shared_inbox: shared_inbox,
1545 pinned_objects: pinned_objects,
1546 nickname: nickname
1547 }
1548 end
1549
1550 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1551 generated = "#{username}@#{URI.parse(data["id"]).host}"
1552
1553 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1554 case WebFinger.finger(generated) do
1555 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1556 _ -> generated
1557 end
1558 else
1559 generated
1560 end
1561 end
1562
1563 # nickname can be nil because of virtual actors
1564 defp generate_nickname(_), do: nil
1565
1566 def fetch_follow_information_for_user(user) do
1567 with {:ok, following_data} <-
1568 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1569 {:ok, hide_follows} <- collection_private(following_data),
1570 {:ok, followers_data} <-
1571 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1572 {:ok, hide_followers} <- collection_private(followers_data) do
1573 {:ok,
1574 %{
1575 hide_follows: hide_follows,
1576 follower_count: normalize_counter(followers_data["totalItems"]),
1577 following_count: normalize_counter(following_data["totalItems"]),
1578 hide_followers: hide_followers
1579 }}
1580 else
1581 {:error, _} = e -> e
1582 e -> {:error, e}
1583 end
1584 end
1585
1586 defp normalize_counter(counter) when is_integer(counter), do: counter
1587 defp normalize_counter(_), do: 0
1588
1589 def maybe_update_follow_information(user_data) do
1590 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1591 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1592 {_, true} <-
1593 {:collections_available,
1594 !!(user_data[:following_address] && user_data[:follower_address])},
1595 {:ok, info} <-
1596 fetch_follow_information_for_user(user_data) do
1597 info = Map.merge(user_data[:info] || %{}, info)
1598
1599 user_data
1600 |> Map.put(:info, info)
1601 else
1602 {:user_type_check, false} ->
1603 user_data
1604
1605 {:collections_available, false} ->
1606 user_data
1607
1608 {:enabled, false} ->
1609 user_data
1610
1611 e ->
1612 Logger.error(
1613 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1614 )
1615
1616 user_data
1617 end
1618 end
1619
1620 defp collection_private(%{"first" => %{"type" => type}})
1621 when type in ["CollectionPage", "OrderedCollectionPage"],
1622 do: {:ok, false}
1623
1624 defp collection_private(%{"first" => first}) do
1625 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1626 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1627 {:ok, false}
1628 else
1629 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1630 {:error, _} = e -> e
1631 e -> {:error, e}
1632 end
1633 end
1634
1635 defp collection_private(_data), do: {:ok, true}
1636
1637 def user_data_from_user_object(data, additional \\ []) do
1638 with {:ok, data} <- MRF.filter(data) do
1639 {:ok, object_to_user_data(data, additional)}
1640 else
1641 e -> {:error, e}
1642 end
1643 end
1644
1645 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1646 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1647 {:ok, data} <- user_data_from_user_object(data, additional) do
1648 {:ok, maybe_update_follow_information(data)}
1649 else
1650 # If this has been deleted, only log a debug and not an error
1651 {:error, "Object has been deleted" = e} ->
1652 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1653 {:error, e}
1654
1655 {:error, {:reject, reason} = e} ->
1656 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1657 {:error, e}
1658
1659 {:error, e} ->
1660 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1661 {:error, e}
1662 end
1663 end
1664
1665 def maybe_handle_clashing_nickname(data) do
1666 with nickname when is_binary(nickname) <- data[:nickname],
1667 %User{} = old_user <- User.get_by_nickname(nickname),
1668 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1669 Logger.info(
1670 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1671 )
1672
1673 old_user
1674 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1675 |> User.update_and_set_cache()
1676 else
1677 {:ap_id_comparison, true} ->
1678 Logger.info(
1679 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1680 )
1681
1682 _ ->
1683 nil
1684 end
1685 end
1686
1687 def pin_data_from_featured_collection(%{
1688 "type" => "OrderedCollection",
1689 "first" => first
1690 }) do
1691 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1692 page
1693 |> Map.get("orderedItems")
1694 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1695 else
1696 e ->
1697 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1698 {:ok, %{}}
1699 end
1700 end
1701
1702 def pin_data_from_featured_collection(
1703 %{
1704 "type" => type
1705 } = collection
1706 )
1707 when type in ["OrderedCollection", "Collection"] do
1708 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1709
1710 # Items can either be a map _or_ a string
1711 objects
1712 |> Map.new(fn
1713 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1714 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1715 end)
1716 end
1717
1718 def fetch_and_prepare_featured_from_ap_id(nil) do
1719 {:ok, %{}}
1720 end
1721
1722 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1723 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1724 {:ok, pin_data_from_featured_collection(data)}
1725 else
1726 e ->
1727 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1728 {:ok, %{}}
1729 end
1730 end
1731
1732 def pinned_fetch_task(nil), do: nil
1733
1734 def pinned_fetch_task(%{pinned_objects: pins}) do
1735 if Enum.all?(pins, fn {ap_id, _} ->
1736 Object.get_cached_by_ap_id(ap_id) ||
1737 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1738 end) do
1739 :ok
1740 else
1741 :error
1742 end
1743 end
1744
1745 def make_user_from_ap_id(ap_id, additional \\ []) do
1746 user = User.get_cached_by_ap_id(ap_id)
1747
1748 if user && !User.ap_enabled?(user) do
1749 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1750 else
1751 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1752 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1753
1754 if user do
1755 user
1756 |> User.remote_user_changeset(data)
1757 |> User.update_and_set_cache()
1758 else
1759 maybe_handle_clashing_nickname(data)
1760
1761 data
1762 |> User.remote_user_changeset()
1763 |> Repo.insert()
1764 |> User.set_cache()
1765 end
1766 end
1767 end
1768 end
1769
1770 def make_user_from_nickname(nickname) do
1771 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1772 WebFinger.finger(nickname) do
1773 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1774 else
1775 _e -> {:error, "No AP id in WebFinger"}
1776 end
1777 end
1778
1779 # filter out broken threads
1780 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1781 entire_thread_visible_for_user?(activity, user)
1782 end
1783
1784 # do post-processing on a specific activity
1785 def contain_activity(%Activity{} = activity, %User{} = user) do
1786 contain_broken_threads(activity, user)
1787 end
1788
1789 def fetch_direct_messages_query do
1790 Activity
1791 |> restrict_type(%{type: "Create"})
1792 |> restrict_visibility(%{visibility: "direct"})
1793 |> order_by([activity], asc: activity.id)
1794 end
1795 end