[#3213] Improved `database.transfer_hashtags` mix task: proper rollback, speedup.
authorIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 30 Dec 2020 11:35:19 +0000 (14:35 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 30 Dec 2020 11:35:19 +0000 (14:35 +0300)
lib/mix/tasks/pleroma/database.ex

index 093c7dd30de565c61fafcdf5839497daafa33820..d44bd34784b24b2c2bbd9d727c4464fb6d7fac9d 100644 (file)
@@ -137,6 +137,8 @@ defmodule Mix.Tasks.Pleroma.Database do
 
     start_pleroma()
 
+    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
+
     from(
       object in Object,
       left_join: hashtag in assoc(object, :hashtags),
@@ -144,21 +146,12 @@ defmodule Mix.Tasks.Pleroma.Database do
       where: fragment("(?)->>'tag' != '[]'", object.data),
       select: %{
         id: object.id,
-        inserted_at: object.inserted_at,
         tag: fragment("(?)->>'tag'", object.data)
-      },
-      order_by: [desc: object.id]
+      }
     )
     |> Pleroma.Repo.chunk_stream(100, :batches)
     |> Stream.each(fn objects ->
-      chunk_start = List.first(objects)
-      chunk_end = List.last(objects)
-
-      Logger.info(
-        "transfer_hashtags: " <>
-          "#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <>
-          "#{chunk_end.id} (#{chunk_end.inserted_at})"
-      )
+      Logger.info("Processing #{length(objects)} objects...")
 
       Enum.map(
         objects,
@@ -168,28 +161,39 @@ defmodule Mix.Tasks.Pleroma.Database do
             |> Jason.decode!()
             |> Enum.filter(&is_bitstring(&1))
 
-          with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
-            Repo.transaction(fn ->
+          Repo.transaction(fn ->
+            with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
               for hashtag_record <- hashtag_records do
-                with {:error, _} <-
+                with {:ok, _} <-
                        Ecto.Adapters.SQL.query(
                          Repo,
                          "insert into hashtags_objects(hashtag_id, object_id) values " <>
                            "(#{hashtag_record.id}, #{object.id});"
                        ) do
-                  Logger.warn(
-                    "ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}"
-                  )
+                  :noop
+                else
+                  {:error, e} ->
+                    error =
+                      "ERROR: could not link object #{object.id} and hashtag " <>
+                        "#{hashtag_record.id}: #{inspect(e)}"
+
+                    Logger.error(error)
+                    Repo.rollback(error)
                 end
               end
-            end)
-          else
-            e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}")
-          end
+            else
+              e ->
+                error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
+                Logger.error(error)
+                Repo.rollback(error)
+            end
+          end)
         end
       )
     end)
     |> Stream.run()
+
+    Logger.info("Done transferring hashtags. Please check logs to ensure no errors.")
   end
 
   def run(["vacuum", args]) do