[#3213] HashtagsTableMigrator: fault rate allowance to enable the feature (defaults...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 18 Feb 2021 17:40:10 +0000 (20:40 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 18 Feb 2021 17:40:10 +0000 (20:40 +0300)
config/config.exs
config/description.exs
docs/configuration/cheatsheet.md
lib/pleroma/migrators/hashtags_table_migrator.ex
lib/pleroma/migrators/hashtags_table_migrator/state.ex

index 0fbca06f3f8f8d2cd35c0cdef8a39c0fb378efea..c371c397ce966bf46f7f498f03a741d04996c82a 100644 (file)
@@ -657,6 +657,8 @@ config :pleroma, :oauth2,
 
 config :pleroma, :database, rum_enabled: false
 
+config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01
+
 config :pleroma, :env, Mix.env()
 
 config :http_signatures,
index 29fc5fbd48d88cdaa1c26162cbf56c9b2abe9d40..6ffc712784989571a38bc6df9a2e18d74a230afc 100644 (file)
@@ -479,6 +479,13 @@ config :pleroma, :config_description, [
     type: :group,
     description: "`populate_hashtags_table` background migration settings",
     children: [
+      %{
+        key: :fault_rate_allowance,
+        type: :float,
+        description:
+          "Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).",
+        suggestions: [0.01]
+      },
       %{
         key: :sleep_interval_ms,
         type: :integer,
index 68a5a3c7fcc7229cbc75dd00dca28be5bb605c59..6a1031f15077869b78893c656d43f1f451c072c4 100644 (file)
@@ -70,6 +70,7 @@ To add configuration to your config file, you can copy it from the base config.
 
 ## Background migrations
 * `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be keep default on most instances).
+* `populate_hashtags_table/fault_rate_allowance`: Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).
 
 ## Welcome
 * `direct_message`: - welcome message sent as a direct message.
index ac17f91ccc2dd34d2de4d2fab5aea3fd47b4f14b..45dab8470fca76f827716094f1b157cfa9d563c3 100644 (file)
@@ -15,7 +15,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   alias Pleroma.Object
   alias Pleroma.Repo
 
-  defdelegate data_migration(), to: State
+  defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
+  defdelegate data_migration_id(), to: State
 
   defdelegate state(), to: State
   defdelegate persist_state(), to: State, as: :persist_to_db
@@ -23,10 +24,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   defdelegate put_stat(key, value), to: State, as: :put_data_key
   defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
 
+  @feature_config_path [:database, :improved_hashtag_timeline]
   @reg_name {:global, __MODULE__}
 
   def whereis, do: GenServer.whereis(@reg_name)
 
+  def feature_state, do: Config.get(@feature_config_path)
+
   def start_link(_) do
     case whereis() do
       nil ->
@@ -46,8 +50,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_continue(:init_state, _state) do
     {:ok, _} = State.start_link(nil)
 
-    update_status(:pending)
-
     data_migration = data_migration()
     manual_migrations = Config.get([:instance, :manual_data_migrations], [])
 
@@ -56,10 +58,14 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
         update_status(:noop)
 
       is_nil(data_migration) ->
-        update_status(:failed, "Data migration does not exist.")
+        message = "Data migration does not exist."
+        update_status(:failed, message)
+        Logger.error("#{__MODULE__}: #{message}")
 
       data_migration.state == :manual or data_migration.name in manual_migrations ->
-        update_status(:manual, "Data migration is in manual execution state.")
+        message = "Data migration is in manual execution or manual fix mode."
+        update_status(:manual, message)
+        Logger.warn("#{__MODULE__}: #{message}")
 
       data_migration.state == :complete ->
         on_complete(data_migration)
@@ -78,7 +84,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     update_status(:running)
     put_stat(:started_at, NaiveDateTime.utc_now())
 
-    %{id: data_migration_id} = data_migration()
+    data_migration_id = data_migration_id()
     max_processed_id = get_stat(:max_processed_id, 0)
 
     Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
@@ -89,12 +95,19 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> Stream.each(fn objects ->
       object_ids = Enum.map(objects, & &1.id)
 
+      results = Enum.map(objects, &transfer_object_hashtags(&1))
+
       failed_ids =
-        objects
-        |> Enum.map(&transfer_object_hashtags(&1))
+        results
         |> Enum.filter(&(elem(&1, 0) == :error))
         |> Enum.map(&elem(&1, 1))
 
+      # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
+      chunk_affected_count =
+        results
+        |> Enum.filter(&(elem(&1, 0) == :ok))
+        |> length()
+
       for failed_id <- failed_ids do
         _ =
           Repo.query(
@@ -116,6 +129,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
       put_stat(:max_processed_id, max_object_id)
       increment_stat(:processed_count, length(object_ids))
       increment_stat(:failed_count, length(failed_ids))
+      increment_stat(:affected_count, chunk_affected_count)
       put_stat(:records_per_second, records_per_second())
       persist_state()
 
@@ -125,17 +139,42 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end)
     |> Stream.run()
 
-    with 0 <- failures_count(data_migration_id) do
-      _ = delete_non_create_activities_hashtags()
-      set_complete()
-    else
-      _ ->
-        update_status(:failed, "Please check data_migration_failed_ids records.")
+    fault_rate = fault_rate()
+    put_stat(:fault_rate, fault_rate)
+    fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
+
+    cond do
+      fault_rate == 0 ->
+        set_complete()
+
+      is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
+        message = """
+        Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
+        Putting data migration to manual fix mode. Check `retry_failed/0`.
+        """
+
+        Logger.warn("#{__MODULE__}: #{message}")
+        update_status(:manual, message)
+        on_complete(data_migration())
+
+      true ->
+        message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
+        Logger.error("#{__MODULE__}: #{message}")
+        update_status(:failed, message)
     end
 
+    persist_state()
     {:noreply, state}
   end
 
+  def fault_rate do
+    with failures_count when is_integer(failures_count) <- failures_count() do
+      failures_count / Enum.max([get_stat(:affected_count, 0), 1])
+    else
+      _ -> :error
+    end
+  end
+
   defp records_per_second do
     get_stat(:processed_count, 0) / Enum.max([running_time(), 1])
   end
@@ -194,6 +233,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
   end
 
+  @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()}
   defp transfer_object_hashtags(object) do
     embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
     hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
@@ -201,7 +241,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     if Enum.any?(hashtags) do
       transfer_object_hashtags(object, hashtags)
     else
-      {:ok, object.id}
+      {:noop, object.id}
     end
   end
 
@@ -209,13 +249,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     Repo.transaction(fn ->
       with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
         maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
-        expected_rows = length(hashtag_records)
-
-        base_error =
-          "ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}"
+        base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}"
 
         try do
-          with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
+          with {rows_count, _} when is_integer(rows_count) <-
+                 Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do
             object.id
           else
             e ->
@@ -260,11 +298,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
       data_migration.feature_lock ->
         :noop
 
-      not is_nil(Config.get([:database, :improved_hashtag_timeline])) ->
+      not is_nil(feature_state()) ->
         :noop
 
       true ->
-        Config.put([:database, :improved_hashtag_timeline], true)
+        Config.put(@feature_config_path, true)
         :ok
     end
   end
@@ -274,38 +312,41 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
       on: dmf.record_id == o.id
     )
-    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
     |> order_by([o], asc: o.id)
   end
 
-  def failures_count(data_migration_id \\ nil) do
-    data_migration_id = data_migration_id || data_migration().id
-
+  def failures_count do
     with {:ok, %{rows: [[count]]}} <-
            Repo.query(
              "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
-             [data_migration_id]
+             [data_migration_id()]
            ) do
       count
     end
   end
 
   def retry_failed do
-    data_migration = data_migration()
+    data_migration_id = data_migration_id()
 
     failed_objects_query()
     |> Repo.chunk_stream(100, :one)
     |> Stream.each(fn object ->
-      with {:ok, _} <- transfer_object_hashtags(object) do
+      with {res, _} when res != :error <- transfer_object_hashtags(object) do
         _ =
           Repo.query(
             "DELETE FROM data_migration_failed_ids " <>
               "WHERE data_migration_id = $1 AND record_id = $2",
-            [data_migration.id, object.id]
+            [data_migration_id, object.id]
           )
       end
     end)
     |> Stream.run()
+
+    put_stat(:failed_count, failures_count())
+    persist_state()
+
+    force_continue()
   end
 
   def force_continue do
index ed9848824b39c475dbbdb5e86df89725c976c76e..ee0009b2e26bf74353c30df9e5a77928f40e4e3a 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
 
   alias Pleroma.DataMigration
 
-  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+  defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator
 
   @reg_name {:global, __MODULE__}
 
@@ -99,4 +99,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
   defp persist_non_data_change(_, _) do
     nil
   end
+
+  def data_migration_id, do: Map.get(state(), :data_migration_id)
 end