Streamer rework
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
17 alias Pleroma.Repo
18 alias Pleroma.Upload
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Computes the addressing of an activity and returns `{recipients, to, cc}`, where
# `recipients` is built from the "to", "cc" and "bcc" fields (each defaulting to `[]`).
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  # Addresses that do not resolve to a cached user are kept untouched; resolved users
  # are kept only when `User.following?/2` holds for them and the announcing actor.
  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address the actor itself; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` maps a missing actor to `[]`, so no placeholder value (previously an
  # empty list) is added to the recipients when the activity has no "actor".
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
69
# Returns `true` when no actor is given, or when the actor resolves to a cached user whose
# `deactivated` flag is `false`. Returns `false` in every other case, including when the
# lookup finds no user (the previous implementation raised on `nil.deactivated`).
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _ -> false
  end
end
82
# Compares the length of an embedded object's "content" with the configured
# `[:instance, :remote_limit]`. Data without a non-nil embedded "content" always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_activity_data) do
  true
end
89
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?/1` holds for `object`.

Returns that call's result, or `{:ok, actor}` when the object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
93
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?/1` holds for `object`.

Returns that call's result, or `{:ok, actor}` when the object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
97
@doc """
Increases the replies count of the object referenced by "inReplyTo".

Only applies to "Create" data whose embedded object has an "inReplyTo" and for which
`is_public?/1` holds; returns `nil` when the object is not public and `:noop` for any
other input.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data) do
  :noop
end
108
@doc """
Decreases the replies count of the object referenced by "inReplyTo".

Only applies to an `Object` whose data has an "inReplyTo" and for which `is_public?/1`
holds; returns `nil` when the data is not public and `:noop` for any other input.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object) do
  :noop
end
118
@doc """
Registers a vote through `Object.increase_vote_count/3`.

Applies to "Create" data with an "actor" whose embedded object has both "inReplyTo" and
"name"; returns `:noop` for any other input.
"""
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "actor" => voter,
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

def increase_poll_votes_if_vote(_create_data) do
  :noop
end
128
@doc """
Inserts `object` as an `Activity` row and passes `meta` through.

`meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise). Recipients are
derived from the addressing fields of `object`.

Returns `{:ok, activity, meta}` on success; a failing `Repo.insert/1` result is returned
unchanged.
"""
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _, _} = get_recipients(object)

  activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  with {:ok, activity} <- Repo.insert(activity) do
    {:ok, activity, meta}
  end
end
143
@doc """
Inserts the activity described by `map`.

When `Activity.normalize/1` already resolves `map` to an activity, that activity is
returned. Otherwise the data passes through the defaults, actor, remote-limit, MRF and
containment steps before the child object and the activity itself are inserted.

With `fake` set, nothing is inserted and an `Activity` struct with the id
"pleroma:fakeid" is returned instead. Any other non-matching step is returned wrapped as
`{:error, value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map = lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct without touching the database.
    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
194
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and then streams
out the activity followed by the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
203
# Creates or bumps the conversation for `activity` and marks it as read for the user
# cached under `actor`. Returns `{:ok, conversation}` when both lookups succeed; otherwise
# the first non-matching result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
211
# Returns the freshly preloaded participations of an `{:ok, conversation}` result, or an
# empty list for anything else.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_other) do
  []
end
219
@doc """
Preloads the `:user` association of `participations` and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
@doc """
Streams the participations of the conversation matching the object's "context".

Nothing is streamed when no conversation is found for the context, or when no latest
activity id can be fetched for `user` in that conversation. Returns `:noop` for arguments
that are not an `Object` with a "context".
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      last_activity_id =
        fetch_latest_activity_id_for_context(conversation.ap_id, %{
          "user" => user,
          "blocking_user" => user
        })

      if last_activity_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _) do
  :noop
end
243
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Returns `:noop` for anything else.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
254
@doc """
Runs `do_create/2` inside `Repo.transaction/1` and returns the value produced in the
transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
261
# Builds the "Create" data from `params`, inserts it and runs the follow-up steps
# (reply/vote counters, note count, notifications and streaming, federation).
#
# Fake inserts stop right after `insert/3`; in the `:benchmark` env the steps after the
# reply/vote counters are skipped. `{:error, _}` results roll the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ = increase_replies_count_if_reply(create_data),
       _ = increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, message} ->
      Repo.rollback(message)
  end
end
294
@doc """
Builds listen data from `params`, inserts it, then notifies, streams and federates the
resulting activity.

`params[:local]` counts as local unless it is exactly `false`. A failing `insert/2` or
`maybe_federate/1` result is returned unchanged.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
313
@doc """
Inserts an "Accept" activity; see `accept_or_reject/2`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
318
@doc """
Inserts a "Reject" activity; see `accept_or_reject/2`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
323
@doc """
Inserts an activity of the given `type` addressed to `to`, with `actor.ap_id` as actor and
`object` as object, then notifies, streams and federates it.

`params[:local]` defaults to `true`; `params[:activity_id]`, when present, is passed to
`Utils.maybe_put/3` as the "id".
"""
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
338
@doc """
Inserts an "Update" activity built from `to`, `cc`, `actor` and `object`, then notifies,
streams and federates it.

`params[:local]` counts as local unless it is exactly `false`; `params[:activity_id]` is
passed to `Utils.maybe_put/3` as the "id".
"""
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false
  activity_id = params[:activity_id]

  data =
    Utils.maybe_put(
      %{
        "to" => to,
        "cc" => cc,
        "type" => "Update",
        "actor" => actor,
        "object" => object
      },
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
358
@doc """
Runs `do_react_with_emoji/4` inside `Repo.transaction/1` and returns the value produced in
the transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
        {:ok, Activity.t(), Object.t()} | {:error, any()}
def react_with_emoji(user, object, emoji, options \\ []) do
  case Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
    {:ok, result} -> result
    other -> other
  end
end
367
# Inserts an emoji reaction activity for `object` and adds the reaction to the object.
# Reads `:local` (default `true`) and `:activity_id` (default `nil`) from `options`.
# Returns `{:error, false}` when `emoji` is not a unicode emoji; `{:error, _}` results
# roll the transaction back.
defp do_react_with_emoji(user, object, emoji, options) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)

  with true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
       reaction_data = make_emoji_reaction_data(user, object, emoji, activity_id),
       {:ok, activity} <- insert(reaction_data, local),
       {:ok, updated_object} <- add_emoji_reaction_to_object(activity, object),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity, updated_object}
  else
    false -> {:error, false}
    {:error, error} -> Repo.rollback(error)
  end
end
383
@doc """
Runs `do_unreact_with_emoji/3` inside `Repo.transaction/1` and returns the value produced
in the transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
        {:ok, Activity.t(), Object.t()} | {:error, any()}
def unreact_with_emoji(user, reaction_id, options \\ []) do
  case Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
    {:ok, result} -> result
    other -> other
  end
end
392
# Undoes the reaction activity identified by `reaction_id`, which must have `user` as its
# actor, and removes the reaction from the reacted-to object.
# Reads `:local` (default `true`) and `:activity_id` (default `nil`) from `options`.
#
# Returns `{:ok, undo_activity, object}` on success and `{:error, :not_found}` when the
# reaction cannot be found or belongs to another actor (this previously raised
# `WithClauseError`). `{:error, _}` results roll the transaction back.
defp do_unreact_with_emoji(user, reaction_id, options) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  user_ap_id = user.ap_id

  with %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
       object = Object.normalize(reaction_activity),
       unreact_data = make_undo_data(user, reaction_activity, activity_id),
       {:ok, activity} <- insert(unreact_data, local),
       {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    {:error, error} ->
      Repo.rollback(error)

    # No activity with that id.
    nil ->
      {:error, :not_found}

    # The activity exists but its actor is not `user`; nothing was written before this
    # point, so a plain error tuple is enough.
    %Activity{} ->
      {:error, :not_found}
  end
end
409
@doc """
Runs `do_unlike/4` inside `Repo.transaction/1` and returns the value produced in the
transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
418
# Inserts the unlike activity for the actor's existing like of `object`, deletes the like
# activity and removes the like from the object. Returns `{:ok, object}` untouched when
# there is no existing like; `{:error, _}` results roll the transaction back.
defp do_unlike(actor, object, activity_id, local) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       _ = notify_and_stream(unlike_activity),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    nil ->
      {:ok, object}

    {:error, error} ->
      Repo.rollback(error)
  end
end
433
@doc """
Runs `do_announce/5` inside `Repo.transaction/1` and returns the value produced in the
transaction. A non-`{:ok, _}` transaction result is returned unchanged.

Requires a `User` with an `ap_id` and an `Object` whose data has an "id".
"""
@spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
        {:ok, Activity.t(), Object.t()} | {:error, any()}
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  case Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
    {:ok, result} -> result
    other -> other
  end
end
448
# Inserts an announce activity for `object` (re-read by id first) and adds the announce to
# the object. Returns `{:error, false}` when `is_announceable?/3` is `false`;
# `{:error, _}` results roll the transaction back.
defp do_announce(user, object, activity_id, local, public) do
  with true <- is_announceable?(object, user, public),
       object = Object.get_by_id(object.id),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, updated_object} <- add_announce_to_object(activity, object),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity, updated_object}
  else
    false ->
      {:error, false}

    {:error, error} ->
      Repo.rollback(error)
  end
end
463
@doc """
Runs `do_unannounce/4` inside `Repo.transaction/1` and returns the value produced in the
transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  case Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
477
# Inserts the unannounce activity for the actor's existing announce of `object`, notifies,
# streams and federates it, then deletes the announce activity and removes the announce
# from the object. Returns `{:ok, object}` untouched when there is no existing announce;
# `{:error, _}` results roll the transaction back.
defp do_unannounce(actor, object, activity_id, local) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       _ = notify_and_stream(unannounce_activity),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    nil ->
      {:ok, object}

    {:error, error} ->
      Repo.rollback(error)
  end
end
492
@doc """
Runs `do_follow/4` inside `Repo.transaction/1` and returns the value produced in the
transaction. A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
501
# Inserts the follow activity, then notifies, streams and federates it.
# `{:error, _}` results roll the transaction back.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} ->
      Repo.rollback(error)
  end
end
512
@doc """
Runs `do_unfollow/4` inside `Repo.transaction/1` and returns the value produced in the
transaction (`nil` when there was no follow activity to undo). A non-`{:ok, _}`
transaction result is returned unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
521
# Sets the latest follow activity to the "cancelled" state, inserts the matching unfollow
# activity, then notifies, streams and federates it. Returns `nil` when no follow activity
# exists; `{:error, _}` results roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, error} ->
      Repo.rollback(error)
  end
end
535
@doc """
Blocks `blocked` on behalf of `blocker` inside a `Repo.transaction/1`.

When `[:activitypub, :unfollow_blocked]` is set and a follow activity exists, the blocker
unfollows the blocked user first. The block activity itself is only inserted when
`[:activitypub, :outgoing_blocks]` is `true`; otherwise `{:ok, nil}` is returned.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t() | nil} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  with {:ok, result} <-
         Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    result
  end
end

defp do_block(blocker, blocked, activity_id, local) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked do
    follow_activity = fetch_latest_follow(blocker, blocked)
    if follow_activity, do: unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    # Outgoing blocks are switched off (or not configured): there is no activity to
    # insert. Without this clause the unmatched value raised `WithClauseError`.
    disabled when disabled in [false, nil] ->
      {:ok, nil}

    {:error, error} ->
      Repo.rollback(error)
  end
end
564
@doc """
Runs `do_unblock/4` inside `Repo.transaction/1` and returns the value produced in the
transaction (`nil` when there was no block activity to undo). A non-`{:ok, _}`
transaction result is returned unchanged.
"""
@spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()} | nil
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
573
# Inserts the unblock activity for the latest block activity, then notifies, streams and
# federates it. Returns `nil` when no block activity exists; `{:error, _}` results roll
# the transaction back.
defp do_unblock(blocker, blocked, activity_id, local) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, error} ->
      Repo.rollback(error)
  end
end
586
@doc """
Inserts a flag (report) activity built from `params` and mails it to every superuser that
has an email address.

The activity is addressed with an empty "to"; "cc" contains the reported account unless
`params[:forward]` is exactly `false`. `params[:local]` counts as local unless it is
exactly `false`. The federated copy is the result of `strip_report_status_data/1`.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  base_additional = params[:additional] || %{}
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(base_additional, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(), not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
626
@doc """
Inserts a "Move" activity from `origin` to `target`, federates it and enqueues the
"move_following" background job.

Fails with an error tuple when `origin.ap_id` is not listed in `target.also_known_as`.
A failing `insert/2` result is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(params, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
652
@doc """
Builds the query for "Create" activities in `context`, newest first.

Recipients are the public address plus, when `opts["user"]` is set, that user's `ap_id`
and followings. The preload, thread-mute, block, poll-vote and id-exclusion helpers are
applied according to `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
681
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
688
@doc """
Returns the id of the first activity matched by `fetch_activities_for_context_query/2`
(which orders by descending id), or `nil` when there is none.

"skip_preload" defaults to `true` unless `opts` overrides it.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
698
@doc """
Fetches a page of activities addressed to the public address, with unlisted ones
filtered out by `restrict_unlisted/1`. The "user" option is dropped before querying.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
end
708
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` matches the requested
# `:visibility`, given either as one value or as a list of values.
#
# An invalid value is logged and the query is returned unrestricted, mirroring
# `exclude_visibility/2`. Previously these branches returned the result of `Logger.error/1`
# instead of a query. `inspect/1` is used in the log message so that values that cannot be
# interpolated do not raise while reporting the error.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
748
# Removes activities whose `activity_visibility(...)` matches "exclude_visibilities",
# given either as one value or as a list of values. Invalid values are logged and the
# query is returned unchanged.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility) do
  query
end
791
# Applies the `thread_visibility(...)` SQL check for the reading user's `ap_id`. The query
# is left untouched when the third argument or the user has `skip_thread_containment` set
# to `true`, or when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}) do
  query
end

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _) do
  query
end

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config) do
  query
end
810
@doc """
Fetches the activities of `user` as seen by `reading_user`, without restricting the
activity type, and returns them in reversed order.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{"user" => reading_user, "actor_id" => user.ap_id})

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
826
@doc """
Fetches the "Create" and "Announce" activities of `user` as seen by `reading_user` and
returns them in reversed order.

Block and mute filtering for `reading_user` is only requested when `reading_user` does
not block `user`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "pinned_activity_ids" => user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{"blocking_user" => reading_user, "muting_user" => reading_user})
    end

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
853
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using `:offset`
pagination and returns them in reversed order.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, "type", ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
868
# Builds the recipient list used to look up a user's activities: empty in "godmode",
# otherwise the public address plus, for a present reading user, their own `ap_id` and
# followings.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  if reading_user do
    [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
  else
    [Constants.as_public()]
  end
end
880
# Keeps only activities with an id greater than "since_id"; an empty or missing
# "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}) do
  query
end

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts) do
  query
end
888
# Drops activities whose object has any of the tags in a non-empty "tag_reject" list.
# Raises when combined with "skip_preload", because the filter needs the joined object.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected_tags})
     when is_list(rejected_tags) and rejected_tags != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts) do
  query
end
902
# Keeps activities whose object has all of the tags in a non-empty "tag_all" list.
# Raises when combined with "skip_preload", because the filter needs the joined object.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required_tags})
     when is_list(required_tags) and required_tags != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts) do
  query
end
916
# Keeps activities whose object has the given "tag" (a single tag, or any tag of a list).
# Raises when combined with "skip_preload", because the filter needs the joined object.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _opts) do
  query
end
936
# Keeps activities whose recipients overlap with `recipients`. With a user given,
# activities authored by that user are also accepted via `or_where`. An empty recipient
# list leaves the query untouched.
defp restrict_recipients(query, [], _user) do
  query
end

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
950
# Keeps only local activities when "local_only" is `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts) do
  query
end
956
# Keeps only activities whose actor equals "actor_id", when that option is present.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts) do
  query
end
962
# Keeps only activities whose data "type" equals the given "type" (a binary) or is one of
# the given values (any other term, passed to `ANY(?)`).
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts) do
  query
end
972
# Keeps only activities whose data "state" equals the given "state", when present.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts) do
  query
end
978
# Keeps only activities whose object lists the "favorited_by" `ap_id` in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts) do
  query
end
987
# Keeps only activities whose object "attachment" is not equal to an empty list, when
# "only_media" is one of `true`, "true" or "1". Raises when combined with "skip_preload",
# because the filter needs the joined object.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts) do
  query
end
1000
# Filters replies according to the options:
#   * "exclude_replies" in `true`/"true"/"1": only objects without "inReplyTo";
#   * "reply_visibility" "self": additionally keeps activities that have the filtering
#     user among their recipients;
#   * "reply_visibility" "following": additionally keeps activities whose recipients
#     (minus the actor) overlap with the filtering user and their cached friends, or
#     whose actor is the filtering user.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{"reply_filtering_user" => user, "reply_visibility" => "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts) do
  query
end
1044
# Drops `Announce` activities when `"exclude_reblogs"` is `true`, "true" or "1";
# otherwise the query is returned as is.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in [true, "true", "1"] do
  where(query, [a], fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
1050
# Mute filtering.
#
# * A truthy `"with_muted"` disables the filter entirely.
# * With a `"muting_user"`, activities authored by, or addressed (`to`) to, a muted
#   AP id are dropped. The muted ids come from `"muted_users_ap_ids"` when supplied,
#   otherwise from `User.muted_users_ap_ids/1`. Unless `"skip_preload"` is set, rows
#   with a non-nil `user_id` on the `:thread_mute` binding are dropped as well.
# * Anything else leaves the query untouched.
defp restrict_muted(query, %{"with_muted" => flag}) when flag in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = muter} = opts) do
  muted_ap_ids = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(muter)

  without_muted_users =
    query
    |> where([a], fragment("not (? = ANY(?))", a.actor, ^muted_ap_ids))
    |> where([a], fragment("not (?->'to' \\?| ?)", a.data, ^muted_ap_ids))

  if opts["skip_preload"] do
    # The :thread_mute binding is only referenced when preloading was not skipped.
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1070
# Block filtering for a `"blocking_user"`.
#
# Drops activities that
#   * are authored by a blocked AP id,
#   * have a blocked AP id among their recipients,
#   * are `Announce`s whose `to` contains a blocked AP id,
#   * come from an actor whose third '/'-separated URL segment is a blocked domain,
#     unless that actor is in the user's friends list,
#   * wrap an object whose actor fails that same domain test.
#
# Blocked ids come from `"blocked_users_ap_ids"` when supplied, otherwise from
# `User.blocked_users_ap_ids/1`. The object join is added if the query lacks an
# `:object` named binding.
defp restrict_blocked(query, %{"blocking_user" => %User{} = blocker} = opts) do
  blocked = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(blocker)
  blocked_domains = blocker.domain_blocks || []
  followed = User.get_friends_ap_ids(blocker)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([a], fragment("not (? = ANY(?))", a.actor, ^blocked))
  |> where([a], fragment("not (? && ?)", a.recipients, ^blocked))
  |> where(
    [a],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      a.data,
      a.data,
      ^blocked
    )
  )
  |> where(
    [a],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      a.actor,
      ^blocked_domains,
      a.actor,
      ^followed
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^blocked_domains,
      o.data,
      ^followed
    )
  )
end

defp restrict_blocked(query, _opts), do: query
1111
# Drops activities whose `cc` contains the public address (`Constants.as_public/0`).
# A missing `cc` is coalesced to an empty jsonb value so the test stays false for it.
defp restrict_unlisted(query) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [a],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      a.data,
      ^public_addresses
    )
  )
end
1123
# TODO: once all endpoints are migrated to OpenAPI, compare `pinned` with `true` (boolean) only;
# the same applies to `restrict_media/2`, `restrict_replies/2`, `restrict_reblogs/2`
# and `restrict_muted/2`
1127
# When `"pinned"` is truthy (`true`, "true" or "1") and `"pinned_activity_ids"` is
# supplied, keeps only activities whose id is in that list.
defp restrict_pinned(query, %{"pinned" => flag, "pinned_activity_ids" => pinned_ids})
     when flag in [true, "true", "1"] do
  where(query, [a], a.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
1134
# For a `"muting_user"`, drops `Announce` activities authored by AP ids whose reblogs
# are muted. The ids come from `"reblog_muted_users_ap_ids"` when supplied, otherwise
# from `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = muter} = opts) do
  reblog_muted_ap_ids =
    opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(muter)

  where(
    query,
    [a],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      a.data,
      a.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1151
# Keeps activities authored by users whose nickname ends in `@<instance>`.
# The matching AP ids are loaded eagerly with a separate query and then bound as an
# `IN` list.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  instance_actor_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [a], a.actor in ^instance_actor_ids)
end

defp restrict_instance(query, _opts), do: query
1165
# Drops rows whose object type is "Answer" unless `"include_poll_votes"` is `true`.
# The filter is only applied when the query already has an `:object` named binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1177
# Drops the activity with the given binary id when `"exclude_id"` is supplied.
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id) do
  where(query, [a], a.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1183
# Applies `Activity.with_preloaded_object/1` unless `"skip_preload"` is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1190
# Applies `Activity.with_preloaded_bookmark/2` for `opts["user"]` unless
# `"skip_preload"` is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
1197
# Applies `Activity.with_preloaded_report_notes/1` only when `"preload_report_notes"`
# is `true`.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1204
# Applies `Activity.with_set_thread_muted_field/2` unless `"skip_preload"` is `true`.
# The user passed along is `opts["muting_user"]`, falling back to `opts["user"]`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1211
# Orders by `id` when the options carry an atom-keyed `:order` of `:desc` or `:asc`;
# any other options leave the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1223
# Loads the muting user's outgoing relationship AP ids in one call and returns three
# option maps (for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2`), each being `opts` plus the matching preloaded id list.
#
# `:mute` and `:reblog_mute` are requested whenever `"muting_user"` is set; `:block`
# is requested only when `"blocking_user"` is that same user. Keys already present in
# `opts` are never overwritten.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts["muting_user"]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  block_relationships =
    if opts["blocking_user"] && opts["blocking_user"] == source_user do
      [:block]
    else
      []
    end

  preloaded =
    User.outgoing_relationships_ap_ids(source_user, mute_relationships ++ block_relationships)

  {
    Map.put_new(opts, "blocked_users_ap_ids", preloaded[:block]),
    Map.put_new(opts, "muted_users_ap_ids", preloaded[:mute]),
    Map.put_new(opts, "reblog_muted_users_ap_ids", preloaded[:reblog_mute])
  }
end
1246
@doc """
Builds the activity query for `recipients`, shaped by `opts`.

Starting from `Activity`, the optional preloads and ordering are applied first,
followed by every `restrict_*` / `exclude_*` step in a fixed order. Block, mute and
reblog-mute steps receive option maps enriched with preloaded AP id lists. The result
is an unexecuted query.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  preloaded_and_ordered =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  preloaded_and_ordered
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1285
@doc """
Runs `fetch_activities_query/2` for `recipients` plus the list memberships of
`opts["user"]`, paginates the result, reverses the page, and finally passes it through
the `cc` adjustment for list-addressed activities.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts["user"]
  memberships = Pleroma.List.memberships(viewer)

  (recipients ++ memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(memberships, viewer)
end
1294
@doc """
Fetches the activities `user` has favourited, ordered by the id of the `Like`
activity, newest first.

The query starts from the user's `Like` activities, joins the liked object and that
object's activity, and selects the latter with its object attached. Pagination is
called with `"skip_order" => true` merged into `params` and the `:object_activity`
table binding.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc: like.id)

  paging_params = Map.put(params, "skip_order", true)

  Pagination.fetch_paginated(likes_query, paging_params, pagination, :object_activity)
end
1313
# Makes list-addressed activities look like they were cc'd to the viewing user.
#
# For every activity whose non-empty `bcc` contains one of `list_memberships`, the
# user's AP id is prepended to `data["cc"]`. Activities without such a `bcc` are
# returned unchanged, as is the whole list when there are no memberships or no
# `%User{}` is given.
#
# A missing (nil) or scalar `cc` is wrapped into a list first, so the result is always
# a proper list rather than `[ap_id | nil]`. Non-emptiness is checked by pattern
# (`[_ | _]`) instead of walking the lists with `length/1`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1330
@doc """
Restricts `query` to activities whose recipients overlap `recipients`, or whose
recipients overlap `recipients_with_public` while also containing the public address
(`Constants.as_public/0`).
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [a],
    fragment("? && ?", a.recipients, ^recipients) or
      (fragment("? && ?", a.recipients, ^recipients_with_public) and
         ^Constants.as_public() in a.recipients)
  )
end
1339
@doc """
Fetches a page of activities bounded by `recipients` / `recipients_with_public`.

The base query is `fetch_activities_query/2` with an empty recipient list; the bound
is added by `fetch_activities_bounded_query/3`. The fetched page is reversed before
being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1351
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the resulting
data. When `opts[:actor]` is set it is recorded under the `"actor"` key of that data.

Any non-`{:ok, _}` result of `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  with {:ok, stored} <- Upload.store(file, opts) do
    object_data =
      case opts[:actor] do
        missing when missing in [nil, false] -> stored
        actor -> Map.put(stored, "actor", actor)
      end

    Repo.insert(%Object{data: object_data})
  end
end
1365
# Extracts a URL string from the `url` member of an actor document.
#
# Accepts a plain binary, a map with a binary `"href"`, or a list whose first element
# is one of those (resolved recursively). Everything else, including an empty list,
# yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1377
# Converts an actor document (a string-keyed map) into the atom-keyed attribute map
# used to create or update a user, returned as `{:ok, user_data}`.
#
# The document is remote input, so list-shaped members are read defensively:
# `attachment` and `tag` may be missing, nil or a single value, and entries that do not
# have the expected shape are skipped instead of raising `FunctionClauseError`.
#
# `locked` is read from the raw document; the remaining members are read after
# `Transmogrifier.maybe_fix_user_object/1` has been applied.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # Only PropertyValue attachments become profile fields; anything else, including
  # entries without a "type", is ignored.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Emoji tags need both a name and an icon URL to be usable; incomplete ones are
  # skipped. Later duplicates of a name win, as with repeated `Map.put/3`.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox
  }

  # nickname can be nil because of virtual actors
  user_data =
    if data["preferredUsername"] do
      Map.put(
        user_data,
        :nickname,
        "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
      )
    else
      Map.put(user_data, :nickname, nil)
    end

  {:ok, user_data}
end
1466
@doc """
Fetches the remote `following` and `followers` collections of `user` and summarises
them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`; the counts come from each collection's
`totalItems` (0 when that is not an integer). Failures are returned as `{:error, _}`:
error tuples pass through, any other value is wrapped.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    summary = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, summary}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1486
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1489
@doc """
Merges freshly fetched follow information into `user_data[:info]` when all of the
following hold: `[:instance, :external_user_synchronization]` is `true`,
`user_data[:type]` is "Person" or "Service", and both collection addresses are set.

When one of those checks yields `false` the input is returned unchanged. Any other
failure is logged as an error and the input is likewise returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the type check reads `:type`, whereas the map built by
  # `object_to_user_data/1` in this module carries `:actor_type` — confirm which key
  # callers are expected to provide.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, fetched_info))
  else
    {failed_check, false}
    when failed_check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1520
# Decides whether a collection should be treated as private.
#
# * An embedded `first` page of type CollectionPage / OrderedCollectionPage: not
#   private.
# * Any other `first` value is fetched: a page of those types means not private, a
#   401/403 response means private, and other failures are returned as `{:error, _}`.
# * A collection without `first`: private.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first_page_ref}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page_ref) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_collection), do: {:ok, true}
1537
@doc """
Passes an actor document through `MRF.filter/1` and converts the result into user
attributes.

Returns `{:ok, user_data}`; any value that is not `{:ok, _}` along the way is
returned wrapped as `{:error, value}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
1546
@doc """
Fetches the remote object at `ap_id`, converts it into user attributes and lets
`maybe_update_follow_information/1` enrich them.

Returns `{:ok, user_data}`. Failures are logged (at debug level for a deleted object,
at error level otherwise) and returned as `{:error, failure}`, where `failure` is the
value that did not match.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = e ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
1562
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the user data is fetched; an
existing user is updated through `User.remote_user_changeset/2`, a new one is
inserted and cached. Fetch failures are returned as `{:error, failure}`.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if cached_user do
            cached_user
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          {:error, failure}
      end
  end
end
1585
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
returned `"ap_id"`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a non-nil
`"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1593
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2` and
# returns its result for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1598
# Post-processing hook for a single activity; at present it only applies
# `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1603
@doc """
Returns a query for `Create` activities restricted to "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{"type" => "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1610 end