[#3213] Reorganized hashtags cleanup. Transaction-wrapped Hashtag.get_or_create_by_na...
[akkoma] / lib / pleroma / migrators / hashtags_table_migrator.ex
index 9a036e0b298506d8d0652184a1cad2c1888c6a0a..c53f6be127188104a1d8a510186a98bcbf317f9c 100644 (file)
@@ -74,16 +74,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_info(:migrate_hashtags, state) do
     State.clear()
 
-    data_migration = data_migration()
+    update_status(:running)
+    put_stat(:started_at, NaiveDateTime.utc_now())
 
+    data_migration = data_migration()
     persistent_data = Map.take(data_migration.data, ["max_processed_id"])
 
     {:ok, data_migration} =
       DataMigration.update(data_migration, %{state: :running, data: persistent_data})
 
-    update_status(:running)
-    put_stat(:started_at, NaiveDateTime.utc_now())
-
     Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
 
     max_processed_id = data_migration.data["max_processed_id"] || 0
@@ -137,6 +136,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> Stream.run()
 
     with 0 <- failures_count(data_migration.id) do
+      _ = delete_non_create_activities_hashtags()
+
       {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
 
       handle_success(data_migration)
@@ -150,9 +151,37 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     {:noreply, state}
   end
 
+  @hashtags_objects_cleanup_query """
+  DELETE FROM hashtags_objects WHERE object_id IN
+    (SELECT DISTINCT objects.id FROM objects
+      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+          (objects.data->>'id')
+        AND activities.data->>'type' = 'Create'
+      WHERE activities.id IS NULL);
+  """
+
+  @hashtags_cleanup_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL);
+  """
+
+  def delete_non_create_activities_hashtags do
+    {:ok, %{num_rows: hashtags_objects_count}} =
+      Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
+
+    {:ok, %{num_rows: hashtags_count}} =
+      Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)
+
+    {:ok, hashtags_objects_count, hashtags_count}
+  end
+
   defp query do
     # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
-    # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+    # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
     from(
       object in Object,
       where:
@@ -182,25 +211,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   defp transfer_object_hashtags(object, hashtags) do
     Repo.transaction(fn ->
       with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
-        for hashtag_record <- hashtag_records do
-          with {:ok, _} <-
-                 Repo.query(
-                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
-                   [hashtag_record.id, object.id]
-                 ) do
-            nil
-          else
-            {:error, e} ->
-              error =
-                "ERROR: could not link object #{object.id} and hashtag " <>
-                  "#{hashtag_record.id}: #{inspect(e)}"
-
-              Logger.error(error)
-              Repo.rollback(object.id)
-          end
+        maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
+        expected_rows = length(hashtag_records)
+
+        with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
+          object.id
+        else
+          e ->
+            error =
+              "ERROR when inserting #{expected_rows} hashtags_objects " <>
+                "for object #{object.id}: #{inspect(e)}"
+
+            Logger.error(error)
+            Repo.rollback(object.id)
         end
-
-        object.id
       else
         e ->
           error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"