[#3213] HashtagsTableMigrator state management refactoring & improvements (proper...
author Ivan Tashkinov <ivantashkinov@gmail.com>
Tue, 16 Feb 2021 20:14:15 +0000 (23:14 +0300)
committer Ivan Tashkinov <ivantashkinov@gmail.com>
Tue, 16 Feb 2021 20:14:15 +0000 (23:14 +0300)
lib/pleroma/data_migration.ex
lib/pleroma/migrators/hashtags_table_migrator.ex
lib/pleroma/migrators/hashtags_table_migrator/state.ex

index 64fa155fff0ba4c9eefdac2a25fa9788eacd3417..1377af16e6dccc1788c7db4b307cda0e07775c62 100644 (file)
@@ -10,6 +10,7 @@ defmodule Pleroma.DataMigration do
   alias Pleroma.Repo
 
   import Ecto.Changeset
+  import Ecto.Query
 
   schema "data_migrations" do
     field(:name, :string)
@@ -28,14 +29,12 @@ defmodule Pleroma.DataMigration do
     |> unique_constraint(:name)
   end
 
-  def update(data_migration, params \\ %{}) do
-    data_migration
-    |> changeset(params)
-    |> Repo.update()
-  end
-
-  def update_state(data_migration, new_state) do
-    update(data_migration, %{state: new_state})
+  def update_one_by_id(id, params \\ %{}) do
+    with {1, _} <-
+           from(dm in DataMigration, where: dm.id == ^id)
+           |> Repo.update_all(set: params) do
+      :ok
+    end
   end
 
   def get_by_name(name) do
index 432c3401a1b4ccc72037fdb40658a591a9d0bc49..a226d9d29a3dfb854478b9862f1e5ece1e25581b 100644 (file)
@@ -11,16 +11,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
 
   alias __MODULE__.State
   alias Pleroma.Config
-  alias Pleroma.DataMigration
   alias Pleroma.Hashtag
   alias Pleroma.Object
   alias Pleroma.Repo
 
-  defdelegate state(), to: State, as: :get
-  defdelegate put_stat(key, value), to: State, as: :put
-  defdelegate increment_stat(key, increment), to: State, as: :increment
+  defdelegate data_migration(), to: State
 
-  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+  defdelegate state(), to: State
+  defdelegate get_stat(key, value), to: State, as: :get_data_key
+  defdelegate put_stat(key, value), to: State, as: :put_data_key
+  defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
 
   @reg_name {:global, __MODULE__}
 
@@ -45,7 +45,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_continue(:init_state, _state) do
     {:ok, _} = State.start_link(nil)
 
-    update_status(:init)
+    update_status(:pending)
 
     data_migration = data_migration()
     manual_migrations = Config.get([:instance, :manual_data_migrations], [])
@@ -55,13 +55,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
         update_status(:noop)
 
       is_nil(data_migration) ->
-        update_status(:halt, "Data migration does not exist.")
+        update_status(:failed, "Data migration does not exist.")
 
       data_migration.state == :manual or data_migration.name in manual_migrations ->
-        update_status(:noop, "Data migration is in manual execution state.")
+        update_status(:manual, "Data migration is in manual execution state.")
 
       data_migration.state == :complete ->
-        handle_success(data_migration)
+        on_complete(data_migration)
 
       true ->
         send(self(), :migrate_hashtags)
@@ -72,20 +72,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
 
   @impl true
   def handle_info(:migrate_hashtags, state) do
-    State.clear()
+    State.reinit()
 
     update_status(:running)
     put_stat(:started_at, NaiveDateTime.utc_now())
 
-    data_migration = data_migration()
-    persistent_data = Map.take(data_migration.data, ["max_processed_id"])
-
-    {:ok, data_migration} =
-      DataMigration.update(data_migration, %{state: :running, data: persistent_data})
-
-    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
+    %{id: data_migration_id} = data_migration()
+    max_processed_id = get_stat(:max_processed_id, 0)
 
-    max_processed_id = data_migration.data["max_processed_id"] || 0
+    Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
 
     query()
     |> where([object], object.id > ^max_processed_id)
@@ -104,7 +99,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
           Repo.query(
             "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
               "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
-            [data_migration.id, failed_id]
+            [data_migration_id, failed_id]
           )
       end
 
@@ -112,7 +107,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
         Repo.query(
           "DELETE FROM data_migration_failed_ids " <>
             "WHERE data_migration_id = $1 AND record_id = ANY($2)",
-          [data_migration.id, object_ids -- failed_ids]
+          [data_migration_id, object_ids -- failed_ids]
         )
 
       max_object_id = Enum.at(object_ids, -1)
@@ -120,14 +115,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
       put_stat(:max_processed_id, max_object_id)
       increment_stat(:processed_count, length(object_ids))
       increment_stat(:failed_count, length(failed_ids))
-
-      put_stat(
-        :records_per_second,
-        state()[:processed_count] /
-          Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
-      )
-
-      persist_stats(data_migration)
+      put_stat(:records_per_second, records_per_second())
+      _ = State.persist_to_db()
 
       # A quick and dirty approach to controlling the load this background migration imposes
       sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
@@ -135,22 +124,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end)
     |> Stream.run()
 
-    with 0 <- failures_count(data_migration.id) do
+    with 0 <- failures_count(data_migration_id) do
       _ = delete_non_create_activities_hashtags()
-
-      {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
-
-      handle_success(data_migration)
+      set_complete()
     else
       _ ->
-        _ = DataMigration.update_state(data_migration, :failed)
-
         update_status(:failed, "Please check data_migration_failed_ids records.")
     end
 
     {:noreply, state}
   end
 
+  defp records_per_second do
+    get_stat(:processed_count, 0) / Enum.max([running_time(), 1])
+  end
+
+  defp running_time do
+    NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now()))
+  end
+
   @hashtags_objects_cleanup_query """
   DELETE FROM hashtags_objects WHERE object_id IN
     (SELECT DISTINCT objects.id FROM objects
@@ -169,6 +161,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
       WHERE hashtags_objects.hashtag_id IS NULL);
   """
 
+  @doc """
+  Deletes `hashtags_objects` for legacy objects not associated with Create activity.
+  Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
+  """
   def delete_non_create_activities_hashtags do
     {:ok, %{num_rows: hashtags_objects_count}} =
       Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
@@ -256,14 +252,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end
   end
 
-  defp persist_stats(data_migration) do
-    runner_state = Map.drop(state(), [:status])
-    _ = DataMigration.update(data_migration, %{data: runner_state})
-  end
-
-  defp handle_success(data_migration) do
-    update_status(:complete)
-
+  defp on_complete(data_migration) do
     cond do
       data_migration.feature_lock ->
         :noop
@@ -321,18 +310,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   end
 
   def force_restart do
-    {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
+    :ok = State.reset()
     force_continue()
   end
 
-  def force_complete do
-    {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)
-
-    handle_success(data_migration)
+  def set_complete do
+    update_status(:complete)
+    _ = State.persist_to_db()
+    on_complete(data_migration())
   end
 
   defp update_status(status, message \\ nil) do
-    put_stat(:status, status)
+    put_stat(:state, status)
     put_stat(:message, message)
   end
 end
index 901563426876dfb52923a1fe6eeab7e5760dc684..ed9848824b39c475dbbdb5e86df89725c976c76e 100644 (file)
@@ -5,31 +5,98 @@
 defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
   use Agent
 
-  @init_state %{}
+  alias Pleroma.DataMigration
+
+  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+
   @reg_name {:global, __MODULE__}
 
   def start_link(_) do
-    Agent.start_link(fn -> @init_state end, name: @reg_name)
+    Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
+  end
+
+  defp load_state_from_db do
+    data_migration = data_migration()
+
+    data =
+      if data_migration do
+        Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
+      else
+        %{}
+      end
+
+    %{
+      data_migration_id: data_migration && data_migration.id,
+      data: data
+    }
   end
 
-  def clear do
-    Agent.update(@reg_name, fn _state -> @init_state end)
+  def persist_to_db do
+    %{data_migration_id: data_migration_id, data: data} = state()
+
+    if data_migration_id do
+      DataMigration.update_one_by_id(data_migration_id, data: data)
+    else
+      {:error, :nil_data_migration_id}
+    end
+  end
+
+  def reset do
+    %{data_migration_id: data_migration_id} = state()
+
+    with false <- is_nil(data_migration_id),
+         :ok <-
+           DataMigration.update_one_by_id(data_migration_id,
+             state: :pending,
+             data: %{}
+           ) do
+      reinit()
+    else
+      true -> {:error, :nil_data_migration_id}
+      e -> e
+    end
   end
 
-  def get do
+  def reinit do
+    Agent.update(@reg_name, fn _state -> load_state_from_db() end)
+  end
+
+  def state do
     Agent.get(@reg_name, & &1)
   end
 
-  def put(key, value) do
+  def get_data_key(key, default \\ nil) do
+    get_in(state(), [:data, key]) || default
+  end
+
+  def put_data_key(key, value) do
+    _ = persist_non_data_change(key, value)
+
     Agent.update(@reg_name, fn state ->
-      Map.put(state, key, value)
+      put_in(state, [:data, key], value)
     end)
   end
 
-  def increment(key, increment \\ 1) do
+  def increment_data_key(key, increment \\ 1) do
     Agent.update(@reg_name, fn state ->
-      updated_value = (state[key] || 0) + increment
-      Map.put(state, key, updated_value)
+      initial_value = get_in(state, [:data, key]) || 0
+      updated_value = initial_value + increment
+      put_in(state, [:data, key], updated_value)
     end)
   end
+
+  defp persist_non_data_change(:state, value) do
+    with true <- get_data_key(:state) != value,
+         true <- value in Pleroma.DataMigration.State.__valid_values__(),
+         %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do
+      DataMigration.update_one_by_id(data_migration_id, state: value)
+    else
+      false -> :ok
+      _ -> {:error, :nil_data_migration_id}
+    end
+  end
+
+  defp persist_non_data_change(_, _) do
+    nil
+  end
 end