X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fmigrators%2Fhashtags_table_migrator.ex;h=07bb9aeb292f2f8de1e82d31dc50443d5912746b;hb=6531eddf361fa52db3906ab011a4e33c7a5f9552;hp=8ad2c8c73a8930c2cc7e50b43b58c19a613032af;hpb=218c51960606454577a12fb6e417d66ab46f3965;p=akkoma diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 8ad2c8c73..07bb9aeb2 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -11,21 +11,26 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias __MODULE__.State alias Pleroma.Config - alias Pleroma.DataMigration alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Repo - defdelegate state(), to: State, as: :get - defdelegate put_stat(key, value), to: State, as: :put - defdelegate increment_stat(key, increment), to: State, as: :increment + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + defdelegate data_migration_id(), to: State - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + defdelegate state(), to: State + defdelegate persist_state(), to: State, as: :persist_to_db + defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + @feature_config_path [:database, :improved_hashtag_timeline] @reg_name {:global, __MODULE__} def whereis, do: GenServer.whereis(@reg_name) + def feature_state, do: Config.get(@feature_config_path) + def start_link(_) do case whereis() do nil -> @@ -45,8 +50,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - update_status(:init) - data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) @@ -55,13 +58,17 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do update_status(:noop) is_nil(data_migration) -> - update_status(:halt, "Data migration does not exist.") + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:noop, "Data migration is in manual execution state.") + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") data_migration.state == :complete -> - handle_success(data_migration) + on_complete(data_migration) true -> send(self(), :migrate_hashtags) @@ -72,19 +79,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @impl true def handle_info(:migrate_hashtags, state) do - data_migration = data_migration() - - persistent_data = Map.take(data_migration.data, ["max_processed_id"]) - - {:ok, data_migration} = - DataMigration.update(data_migration, %{state: :running, data: persistent_data}) + State.reinit() update_status(:running) + put_stat(:iteration_processed_count, 0) put_stat(:started_at, NaiveDateTime.utc_now()) - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + data_migration_id = data_migration_id() + max_processed_id = get_stat(:max_processed_id, 0) - max_processed_id = data_migration.data["max_processed_id"] || 0 + Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") query() |> where([object], object.id > ^max_processed_id) @@ -92,18 +96,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) + results = Enum.map(objects, &transfer_object_hashtags(&1)) + failed_ids = - objects - |> Enum.map(&transfer_object_hashtags(&1)) + results |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) + # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags) + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + for failed_id <- failed_ids do _ = Repo.query( "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> "VALUES ($1, $2) ON CONFLICT DO NOTHING;", - [data_migration.id, failed_id] + [data_migration_id, failed_id] ) end @@ -111,22 +122,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = ANY($2)", - [data_migration.id, object_ids -- failed_ids] + [data_migration_id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) - - put_stat( - :records_per_second, - state()[:processed_count] / - Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) - ) - - persist_stats(data_migration) + increment_stat(:affected_count, chunk_affected_count) + put_stat(:records_per_second, records_per_second()) + persist_state() # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -134,26 +141,87 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) |> Stream.run() - with 0 <- failures_count(data_migration.id) do - {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) - handle_success(data_migration) - else - _ -> - _ = DataMigration.update_state(data_migration, :failed) + cond do + fault_rate == 0 -> + set_complete() - update_status(:failed, "Please check data_migration_failed_ids records.") + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Check `retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) end + persist_state() {:noreply, state} end + def fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + + defp records_per_second do + get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) + end + + @hashtags_objects_cleanup_query """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + @hashtags_cleanup_query """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ + + @doc """ + Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} + end + defp query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up from( object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: is_nil(hashtag.id), where: fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), select: %{ @@ -161,33 +229,44 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do tag: fragment("(?)->'tag'", object.data) } ) + |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"), + on: hashtags_objects.object_id == o.id + ) + |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) end + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} defp transfer_object_hashtags(object) do - embedded_tags = (Map.has_key?(object, :tag) && object.tag) || object.data["tag"] + embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) + if Enum.any?(hashtags) do + transfer_object_hashtags(object, hashtags) + else + {:noop, object.id} + end + end + + defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - for hashtag_record <- hashtag_records do - with {:ok, _} <- - Repo.query( - "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", - [hashtag_record.id, object.id] - ) do - nil - else - {:error, e} -> - error = - "ERROR: could not link object #{object.id} and hashtag " <> - "#{hashtag_record.id}: #{inspect(e)}" + maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" - Logger.error(error) + try do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do + object.id + else + e -> + Logger.error("#{base_error}: #{inspect(e)}") Repo.rollback(object.id) end + rescue + e -> + Logger.error("#{base_error}: #{inspect(e)}") + Repo.rollback(object.id) end - - object.id else e -> error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" @@ -199,38 +278,33 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @doc "Approximate count for current iteration (including processed records count)" def count(force \\ false, timeout \\ :infinity) do - stored_count = state()[:count] + stored_count = get_stat(:count) if stored_count && !force do stored_count else - processed_count = state()[:processed_count] || 0 - max_processed_id = data_migration().data["max_processed_id"] || 0 + processed_count = get_stat(:processed_count, 0) + max_processed_id = get_stat(:max_processed_id, 0) query = where(query(), [object], object.id > ^max_processed_id) count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count put_stat(:count, count) + persist_state() + count end end - defp persist_stats(data_migration) do - runner_state = Map.drop(state(), [:status]) - _ = DataMigration.update(data_migration, %{data: runner_state}) - end - - defp handle_success(data_migration) do - update_status(:complete) - + defp on_complete(data_migration) do cond do data_migration.feature_lock -> :noop - not is_nil(Config.improved_hashtag_timeline()) -> + not is_nil(feature_state()) -> :noop true -> - Config.put(Config.improved_hashtag_timeline_path(), true) + Config.put(@feature_config_path, true) :ok end end @@ -240,38 +314,41 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), on: dmf.record_id == o.id ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) |> order_by([o], asc: o.id) end - def failures_count(data_migration_id \\ nil) do - data_migration_id = data_migration_id || data_migration().id - + def failures_count do with {:ok, %{rows: [[count]]}} <- Repo.query( "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id] + [data_migration_id()] ) do count end end def retry_failed do - data_migration = data_migration() + data_migration_id = data_migration_id() failed_objects_query() |> Repo.chunk_stream(100, :one) |> Stream.each(fn object -> - with {:ok, _} <- transfer_object_hashtags(object) do + with {res, _} when res != :error <- transfer_object_hashtags(object) do _ = Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = $2", - [data_migration.id, object.id] + [data_migration_id, object.id] ) end end) |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() end def force_continue do @@ -279,18 +356,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end def force_restart do - {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + :ok = State.reset() force_continue() end - def force_complete do - {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) - - handle_success(data_migration) + def set_complete do + update_status(:complete) + persist_state() + on_complete(data_migration()) end defp update_status(status, message \\ nil) do - put_stat(:status, status) + put_stat(:state, status) put_stat(:message, message) end end