make bulk user creation from admin works as a transaction
[akkoma] / lib / pleroma / user.ex
index 3eb684c3ac9593ff67476000ceca12515f4a67d1..722e8ff6b0736d94bf6b364bd47336f7dee3100c 100644 (file)
@@ -276,7 +276,13 @@ defmodule Pleroma.User do
   @doc "Inserts provided changeset, performs post-registration actions (confirmation email sending etc.)"
   def register(%Ecto.Changeset{} = changeset) do
     with {:ok, user} <- Repo.insert(changeset),
-         {:ok, user} <- autofollow_users(user),
+         {:ok, user} <- post_register_action(user) do
+      {:ok, user}
+    end
+  end
+
+  def post_register_action(%User{} = user) do
+    with {:ok, user} <- autofollow_users(user),
          {:ok, user} <- set_cache(user),
          {:ok, _} <- Pleroma.User.WelcomeMessage.post_welcome_message_to_user(user),
          {:ok, _} <- try_send_confirmation_email(user) do
@@ -425,24 +431,6 @@ defmodule Pleroma.User do
     Enum.member?(follower.following, followed.follower_address)
   end
 
-  def follow_import(%User{} = follower, followed_identifiers)
-      when is_list(followed_identifiers) do
-    Enum.map(
-      followed_identifiers,
-      fn followed_identifier ->
-        with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier),
-             {:ok, follower} <- maybe_direct_follow(follower, followed),
-             {:ok, _} <- ActivityPub.follow(follower, followed) do
-          followed
-        else
-          err ->
-            Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}")
-            err
-        end
-      end
-    )
-  end
-
   def locked?(%User{} = user) do
     user.info.locked || false
   end
@@ -564,8 +552,7 @@ defmodule Pleroma.User do
         with [_nick, _domain] <- String.split(nickname, "@"),
              {:ok, user} <- fetch_by_nickname(nickname) do
           if Pleroma.Config.get([:fetch_initial_posts, :enabled]) do
-            # TODO turn into job
-            {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user])
+            fetch_initial_posts(user)
           end
 
           {:ok, user}
@@ -576,15 +563,8 @@ defmodule Pleroma.User do
   end
 
   @doc "Fetch some posts when the user has just been federated with"
-  def fetch_initial_posts(user) do
-    pages = Pleroma.Config.get!([:fetch_initial_posts, :pages])
-
-    Enum.each(
-      # Insert all the posts in reverse order, so they're in the right order on the timeline
-      Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)),
-      &Pleroma.Web.Federator.incoming_ap_doc/1
-    )
-  end
+  def fetch_initial_posts(user),
+    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
 
   @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
   def get_followers_query(%User{} = user, nil) do
@@ -866,23 +846,6 @@ defmodule Pleroma.User do
     |> restrict_deactivated()
   end
 
-  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
-    Enum.map(
-      blocked_identifiers,
-      fn blocked_identifier ->
-        with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier),
-             {:ok, blocker} <- block(blocker, blocked),
-             {:ok, _} <- ActivityPub.block(blocker, blocked) do
-          blocked
-        else
-          err ->
-            Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}")
-            err
-        end
-      end
-    )
-  end
-
   def mute(muter, %User{ap_id: ap_id}) do
     info_cng =
       muter.info
@@ -1057,8 +1020,6 @@ defmodule Pleroma.User do
     PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
   end
 
-  def perform(:deactivate_async, user, status), do: deactivate(user, status)
-
   def deactivate(%User{} = user, status \\ true) do
     info_cng = User.Info.set_activation_status(user.info, status)
 
@@ -1104,6 +1065,75 @@ defmodule Pleroma.User do
     delete_user_activities(user)
   end
 
+  @spec perform(atom(), User.t()) :: {:ok, User.t()}
+  def perform(:fetch_initial_posts, %User{} = user) do
+    pages = Pleroma.Config.get!([:fetch_initial_posts, :pages])
+
+    Enum.each(
+      # Insert all the posts in reverse order, so they're in the right order on the timeline
+      Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)),
+      &Pleroma.Web.Federator.incoming_ap_doc/1
+    )
+
+    {:ok, user}
+  end
+
+  def perform(:deactivate_async, user, status), do: deactivate(user, status)
+
+  @spec perform(atom(), User.t(), list()) :: list() | {:error, any()}
+  def perform(:blocks_import, %User{} = blocker, blocked_identifiers)
+      when is_list(blocked_identifiers) do
+    Enum.map(
+      blocked_identifiers,
+      fn blocked_identifier ->
+        with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier),
+             {:ok, blocker} <- block(blocker, blocked),
+             {:ok, _} <- ActivityPub.block(blocker, blocked) do
+          blocked
+        else
+          err ->
+            Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}")
+            err
+        end
+      end
+    )
+  end
+
+  @spec perform(atom(), User.t(), list()) :: list() | {:error, any()}
+  def perform(:follow_import, %User{} = follower, followed_identifiers)
+      when is_list(followed_identifiers) do
+    Enum.map(
+      followed_identifiers,
+      fn followed_identifier ->
+        with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier),
+             {:ok, follower} <- maybe_direct_follow(follower, followed),
+             {:ok, _} <- ActivityPub.follow(follower, followed) do
+          followed
+        else
+          err ->
+            Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}")
+            err
+        end
+      end
+    )
+  end
+
+  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
+    do:
+      PleromaJobQueue.enqueue(:background, __MODULE__, [
+        :blocks_import,
+        blocker,
+        blocked_identifiers
+      ])
+
+  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
+    do:
+      PleromaJobQueue.enqueue(:background, __MODULE__, [
+        :follow_import,
+        follower,
+        followed_identifiers
+      ])
+
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     stream =
       ap_id
@@ -1157,8 +1187,8 @@ defmodule Pleroma.User do
       resp = fetch_by_ap_id(ap_id)
 
       if should_fetch_initial do
-        with {:ok, %User{} = user} = resp do
-          {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user])
+        with {:ok, %User{} = user} <- resp do
+          fetch_initial_posts(user)
         end
       end