X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fmigrators%2Fhashtags_table_migrator.ex;h=b84058e11d189e66b4b8855a6a032a7a34f0f47a;hb=f930e83fa2a7e2184ca6bd09773d81568e7c755c;hp=ac17f91ccc2dd34d2de4d2fab5aea3fd47b4f14b;hpb=854ea1aefb5ff4e03e9e9af6e8dd50f66c61c913;p=akkoma diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index ac17f91cc..b84058e11 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -3,82 +3,28 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Migrators.HashtagsTableMigrator do - use GenServer + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState - require Logger + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + end - import Ecto.Query + use Pleroma.Migrators.Support.BaseMigrator - alias __MODULE__.State - alias Pleroma.Config alias Pleroma.Hashtag + alias Pleroma.Migrators.Support.BaseMigrator alias Pleroma.Object - alias Pleroma.Repo - - defdelegate data_migration(), to: State - - defdelegate state(), to: State - defdelegate persist_state(), to: State, as: :persist_to_db - defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key - defdelegate put_stat(key, value), to: State, as: :put_data_key - defdelegate increment_stat(key, increment), to: State, as: :increment_data_key - - @reg_name {:global, __MODULE__} - - def whereis, do: GenServer.whereis(@reg_name) - - def start_link(_) do - case whereis() do - nil -> - GenServer.start_link(__MODULE__, nil, name: @reg_name) - - pid -> - {:ok, pid} - end - end - - @impl true - def init(_) do - {:ok, nil, {:continue, :init_state}} - end - - @impl true - def handle_continue(:init_state, _state) do - {:ok, _} = State.start_link(nil) - update_status(:pending) + @impl BaseMigrator + def feature_config_path, do: [:features, :improved_hashtag_timeline] - data_migration = data_migration() - manual_migrations = Config.get([:instance, :manual_data_migrations], []) + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) - cond do - Config.get(:env) == :test -> - update_status(:noop) - - is_nil(data_migration) -> - update_status(:failed, "Data migration does not exist.") - - data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:manual, "Data migration is in manual execution state.") - - data_migration.state == :complete -> - on_complete(data_migration) - - true -> - send(self(), :migrate_hashtags) - end - - {:noreply, nil} - end - - @impl true - def handle_info(:migrate_hashtags, state) do - State.reinit() - - update_status(:running) - put_stat(:started_at, NaiveDateTime.utc_now()) - - %{id: data_migration_id} = data_migration() + @impl BaseMigrator + def perform do + data_migration_id = data_migration_id() max_processed_id = get_stat(:max_processed_id, 0) Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") @@ -89,12 +35,19 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) + results = Enum.map(objects, &transfer_object_hashtags(&1)) + failed_ids = - objects - |> Enum.map(&transfer_object_hashtags(&1)) + results |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) + # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + for failed_id <- failed_ids do _ = Repo.query( @@ -114,8 +67,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do max_object_id = Enum.at(object_ids, -1) put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) put_stat(:records_per_second, records_per_second()) persist_state() @@ -124,59 +79,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Process.sleep(sleep_interval) end) |> Stream.run() - - with 0 <- failures_count(data_migration_id) do - _ = delete_non_create_activities_hashtags() - set_complete() - else - _ -> - update_status(:failed, "Please check data_migration_failed_ids records.") - end - - {:noreply, state} - end - - defp records_per_second do - get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) - end - - defp running_time do - NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) end - @hashtags_objects_cleanup_query """ - DELETE FROM hashtags_objects WHERE object_id IN - (SELECT DISTINCT objects.id FROM objects - JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities - ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = - (objects.data->>'id') - AND activities.data->>'type' = 'Create' - WHERE activities.id IS NULL); - """ - - @hashtags_cleanup_query """ - DELETE FROM hashtags WHERE id IN - (SELECT hashtags.id FROM hashtags - LEFT OUTER JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id - WHERE hashtags_objects.hashtag_id IS NULL); - """ - - @doc """ - Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. - Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). - """ - def delete_non_create_activities_hashtags do - {:ok, %{num_rows: hashtags_objects_count}} = - Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) - - {:ok, %{num_rows: hashtags_count}} = - Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) - - {:ok, hashtags_objects_count, hashtags_count} - end - - defp query do + @impl BaseMigrator + def query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up from( @@ -194,6 +100,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) end + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} defp transfer_object_hashtags(object) do embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) @@ -201,7 +108,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do if Enum.any?(hashtags) do transfer_object_hashtags(object, hashtags) else - {:ok, object.id} + {:noop, object.id} end end @@ -209,13 +116,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) - expected_rows = length(hashtag_records) - - base_error = - "ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}" + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" try do - with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do object.id else e -> @@ -236,95 +141,68 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end - @doc "Approximate count for current iteration (including processed records count)" - def count(force \\ false, timeout \\ :infinity) do - stored_count = get_stat(:count) - - if stored_count && !force do - stored_count - else - processed_count = get_stat(:processed_count, 0) - max_processed_id = get_stat(:max_processed_id, 0) - query = where(query(), [object], object.id > ^max_processed_id) - - count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count - put_stat(:count, count) - persist_state() - - count - end - end - - defp on_complete(data_migration) do - cond do - data_migration.feature_lock -> - :noop - - not is_nil(Config.get([:database, :improved_hashtag_timeline])) -> - :noop - - true -> - Config.put([:database, :improved_hashtag_timeline], true) - :ok - end - end - - def failed_objects_query do - from(o in Object) - |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), - on: dmf.record_id == o.id - ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) - |> order_by([o], asc: o.id) - end - - def failures_count(data_migration_id \\ nil) do - data_migration_id = data_migration_id || data_migration().id - - with {:ok, %{rows: [[count]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id] - ) do - count - end - end - + @impl BaseMigrator def retry_failed do - data_migration = data_migration() + data_migration_id = data_migration_id() failed_objects_query() |> Repo.chunk_stream(100, :one) |> Stream.each(fn object -> - with {:ok, _} <- transfer_object_hashtags(object) do + with {res, _} when res != :error <- transfer_object_hashtags(object) do _ = Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = $2", - [data_migration.id, object.id] + [data_migration_id, object.id] ) end end) |> Stream.run() - end - def force_continue do - send(whereis(), :migrate_hashtags) - end + put_stat(:failed_count, failures_count()) + persist_state() - def force_restart do - :ok = State.reset() force_continue() end - def set_complete do - update_status(:complete) - persist_state() - on_complete(data_migration()) + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) end - defp update_status(status, message \\ nil) do - put_stat(:state, status) - put_stat(:message, message) + @doc """ + Service func to delete `hashtags_objects` for legacy objects not associated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + hashtags_objects_cleanup_query = """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + hashtags_cleanup_query = """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ + + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} end end