diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
index a7e3de542..9a036e0b2 100644
--- a/lib/pleroma/migrators/hashtags_table_migrator.ex
+++ b/lib/pleroma/migrators/hashtags_table_migrator.ex
@@ -3,39 +3,13 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Migrators.HashtagsTableMigrator do
-  defmodule State do
-    use Agent
-
-    @init_state %{}
-
-    def start_link(_) do
-      Agent.start_link(fn -> @init_state end, name: __MODULE__)
-    end
-
-    def get do
-      Agent.get(__MODULE__, & &1)
-    end
-
-    def put(key, value) do
-      Agent.update(__MODULE__, fn state ->
-        Map.put(state, key, value)
-      end)
-    end
-
-    def increment(key, increment \\ 1) do
-      Agent.update(__MODULE__, fn state ->
-        updated_value = (state[key] || 0) + increment
-        Map.put(state, key, updated_value)
-      end)
-    end
-  end
-
   use GenServer
 
   require Logger
 
   import Ecto.Query
 
+  alias __MODULE__.State
   alias Pleroma.Config
   alias Pleroma.DataMigration
   alias Pleroma.Hashtag
@@ -43,13 +17,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   alias Pleroma.Repo
 
   defdelegate state(), to: State, as: :get
-  defdelegate put_state(key, value), to: State, as: :put
-  defdelegate increment_state(key, increment), to: State, as: :increment
+  defdelegate put_stat(key, value), to: State, as: :put
+  defdelegate increment_stat(key, increment), to: State, as: :increment
 
   defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
 
+  @reg_name {:global, __MODULE__}
+
+  def whereis, do: GenServer.whereis(@reg_name)
+
   def start_link(_) do
-    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+    case whereis() do
+      nil ->
+        GenServer.start_link(__MODULE__, nil, name: @reg_name)
+
+      pid ->
+        {:ok, pid}
+    end
   end
 
   @impl true
@@ -61,24 +45,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_continue(:init_state, _state) do
     {:ok, _} = State.start_link(nil)
 
-    put_state(:status, :init)
+    update_status(:init)
 
-    dm = data_migration()
+    data_migration = data_migration()
+    manual_migrations = Config.get([:instance, :manual_data_migrations], [])
 
     cond do
       Config.get(:env) == :test ->
-        put_state(:status, :noop)
+        update_status(:noop)
 
-      is_nil(dm) ->
-        put_state(:status, :halt)
-        put_state(:message, "Data migration does not exist.")
+      is_nil(data_migration) ->
+        update_status(:halt, "Data migration does not exist.")
 
-      dm.state == :manual ->
-        put_state(:status, :noop)
-        put_state(:message, "Data migration is in manual execution state.")
+      data_migration.state == :manual or data_migration.name in manual_migrations ->
+        update_status(:noop, "Data migration is in manual execution state.")
 
-      dm.state == :complete ->
-        handle_success()
+      data_migration.state == :complete ->
+        handle_success(data_migration)
 
       true ->
         send(self(), :migrate_hashtags)
@@ -89,28 +72,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
 
   @impl true
   def handle_info(:migrate_hashtags, state) do
+    State.clear()
+
     data_migration = data_migration()
 
-    {:ok, data_migration} = DataMigration.update_state(data_migration, :running)
-    put_state(:status, :running)
+    persistent_data = Map.take(data_migration.data, ["max_processed_id"])
+
+    {:ok, data_migration} =
+      DataMigration.update(data_migration, %{state: :running, data: persistent_data})
+
+    update_status(:running)
+    put_stat(:started_at, NaiveDateTime.utc_now())
 
     Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
 
     max_processed_id = data_migration.data["max_processed_id"] || 0
 
-    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
-    from(
-      object in Object,
-      left_join: hashtag in assoc(object, :hashtags),
-      where: object.id > ^max_processed_id,
-      where: is_nil(hashtag.id),
-      where:
-        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
-      select: %{
-        id: object.id,
-        tag: fragment("(?)->'tag'", object.data)
-      }
-    )
+    query()
+    |> where([object], object.id > ^max_processed_id)
     |> Repo.chunk_stream(100, :batches, timeout: :infinity)
     |> Stream.each(fn objects ->
       object_ids = Enum.map(objects, & &1.id)
@@ -132,15 +111,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
 
       _ =
         Repo.query(
-          "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)",
-          [object_ids -- failed_ids]
+          "DELETE FROM data_migration_failed_ids " <>
+            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
+          [data_migration.id, object_ids -- failed_ids]
         )
 
       max_object_id = Enum.at(object_ids, -1)
 
-      _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}})
-      increment_state(:processed_count, length(object_ids))
-      increment_state(:failed_count, length(failed_ids))
+      put_stat(:max_processed_id, max_object_id)
+      increment_stat(:processed_count, length(object_ids))
+      increment_stat(:failed_count, length(failed_ids))
+
+      put_stat(
+        :records_per_second,
+        state()[:processed_count] /
+          Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
+      )
+
+      persist_stats(data_migration)
 
       # A quick and dirty approach to controlling the load this background migration imposes
       sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
@@ -148,27 +136,50 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end)
     |> Stream.run()
 
-    with {:ok, %{rows: [[0]]}} <-
-           Repo.query(
-             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
-             [data_migration.id]
-           ) do
-      put_state(:status, :complete)
-      _ = DataMigration.update_state(data_migration, :complete)
+    with 0 <- failures_count(data_migration.id) do
+      {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
 
-      handle_success()
+      handle_success(data_migration)
     else
       _ ->
-        put_state(:status, :failed)
-        put_state(:message, "Please check data_migration_failed_ids records.")
+        _ = DataMigration.update_state(data_migration, :failed)
+
+        update_status(:failed, "Please check data_migration_failed_ids records.")
     end
 
     {:noreply, state}
   end
 
+  defp query do
+    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
+    # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+    from(
+      object in Object,
+      where:
+        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
+      select: %{
+        id: object.id,
+        tag: fragment("(?)->'tag'", object.data)
+      }
+    )
+    |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"),
+      on: hashtags_objects.object_id == o.id
+    )
+    |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
+  end
+
   defp transfer_object_hashtags(object) do
-    hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
+    embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
+    hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
+
+    if Enum.any?(hashtags) do
+      transfer_object_hashtags(object, hashtags)
+    else
+      {:ok, object.id}
+    end
+  end
 
+  defp transfer_object_hashtags(object, hashtags) do
     Repo.transaction(fn ->
       with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
         for hashtag_record <- hashtag_records do
@@ -199,13 +210,100 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end)
   end
 
-  defp handle_success do
-    put_state(:status, :complete)
+  @doc "Approximate count for current iteration (including processed records count)"
+  def count(force \\ false, timeout \\ :infinity) do
+    stored_count = state()[:count]
+
+    if stored_count && !force do
+      stored_count
+    else
+      processed_count = state()[:processed_count] || 0
+      max_processed_id = data_migration().data["max_processed_id"] || 0
+      query = where(query(), [object], object.id > ^max_processed_id)
+
+      count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
+      put_stat(:count, count)
+      count
+    end
+  end
+
+  defp persist_stats(data_migration) do
+    runner_state = Map.drop(state(), [:status])
+    _ = DataMigration.update(data_migration, %{data: runner_state})
+  end
+
+  defp handle_success(data_migration) do
+    update_status(:complete)
+
+    cond do
+      data_migration.feature_lock ->
+        :noop
+
+      not is_nil(Config.get([:database, :improved_hashtag_timeline])) ->
+        :noop
+
+      true ->
+        Config.put([:database, :improved_hashtag_timeline], true)
+        :ok
+    end
+  end
+
+  def failed_objects_query do
+    from(o in Object)
+    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+      on: dmf.record_id == o.id
+    )
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+    |> order_by([o], asc: o.id)
+  end
+
+  def failures_count(data_migration_id \\ nil) do
+    data_migration_id = data_migration_id || data_migration().id
 
-    unless Config.improved_hashtag_timeline() do
-      Config.put(Config.improved_hashtag_timeline_path(), true)
+    with {:ok, %{rows: [[count]]}} <-
+           Repo.query(
+             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+             [data_migration_id]
+           ) do
+      count
     end
+  end
+
+  def retry_failed do
+    data_migration = data_migration()
+
+    failed_objects_query()
+    |> Repo.chunk_stream(100, :one)
+    |> Stream.each(fn object ->
+      with {:ok, _} <- transfer_object_hashtags(object) do
+        _ =
+          Repo.query(
+            "DELETE FROM data_migration_failed_ids " <>
+              "WHERE data_migration_id = $1 AND record_id = $2",
+            [data_migration.id, object.id]
+          )
+      end
+    end)
+    |> Stream.run()
+  end
+
+  def force_continue do
+    send(whereis(), :migrate_hashtags)
+  end
+
+  def force_restart do
+    {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
+    force_continue()
+  end
+
+  def force_complete do
+    {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)
+
+    handle_success(data_migration)
+  end
 
-    :ok
+  defp update_status(status, message \\ nil) do
+    put_stat(:status, status)
+    put_stat(:message, message)
   end
 end
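
For reference, a minimal sketch of how the migrator added by this patch might be driven from an IEx session. The module, function names, and the sleep_interval_ms config key all come from the patch itself; the session flow and the config line syntax are illustrative assumptions, not part of the change:

    # Illustrative IEx session; assumes the instance is running and the
    # populate_hashtags_table data migration record exists.
    alias Pleroma.Migrators.HashtagsTableMigrator, as: Migrator

    Migrator.state()           # runner stats: :status, :processed_count, :failed_count, ...
    Migrator.count()           # approximate records left in the current iteration
    Migrator.failures_count()  # rows in data_migration_failed_ids for this migration

    # Re-run objects that previously failed to transfer:
    Migrator.retry_failed()

    # Kick a stalled run, or discard progress and start over:
    Migrator.force_continue()
    Migrator.force_restart()

    # Assumed config knob (read between batches by the patch) to throttle load:
    # config :pleroma, :populate_hashtags_table, sleep_interval_ms: 100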