X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fmigrators%2Fhashtags_table_migrator.ex;h=07b42a7f4d946ede09fc0813c884c34846009ece;hb=1b49b8efe57256b3f64b4b7e8a1de805ab030814;hp=b40578d507d69f778810940f9a06f08cdeb6254d;hpb=48b399cedb7d46ea0f08181cfbe4df222861f65b;p=akkoma diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index b40578d50..07b42a7f4 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -72,6 +72,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @impl true def handle_info(:migrate_hashtags, state) do + State.clear() + data_migration = data_migration() persistent_data = Map.take(data_migration.data, ["max_processed_id"]) @@ -80,24 +82,14 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do DataMigration.update(data_migration, %{state: :running, data: persistent_data}) update_status(:running) + put_stat(:started_at, NaiveDateTime.utc_now()) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") max_processed_id = data_migration.data["max_processed_id"] || 0 - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: object.id > ^max_processed_id, - where: is_nil(hashtag.id), - where: - fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), - select: %{ - id: object.id, - tag: fragment("(?)->'tag'", object.data) - } - ) + query() + |> where([object], object.id > ^max_processed_id) |> Repo.chunk_stream(100, :batches, timeout: :infinity) |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) @@ -119,8 +111,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do _ = Repo.query( - "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)", - [object_ids -- failed_ids] + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration.id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) @@ -129,6 +122,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) + put_stat( + :records_per_second, + state()[:processed_count] / + Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) + ) + persist_stats(data_migration) # A quick and dirty approach to controlling the load this background migration imposes @@ -137,12 +136,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) |> Stream.run() - with {:ok, %{rows: [[0]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration.id] - ) do - _ = DataMigration.update_state(data_migration, :complete) + with 0 <- failures_count(data_migration.id) do + {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) handle_success(data_migration) else @@ -155,9 +150,36 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do {:noreply, state} end + defp query do + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later + from( + object in Object, + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"), + on: hashtags_objects.object_id == o.id + ) + |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) + end + defp transfer_object_hashtags(object) do - hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) + embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] + hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) + + if Enum.any?(hashtags) do + transfer_object_hashtags(object, hashtags) + else + {:ok, object.id} + end + end + defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do for hashtag_record <- hashtag_records do @@ -188,6 +210,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end + @doc "Approximate count for current iteration (including processed records count)" + def count(force \\ false, timeout \\ :infinity) do + stored_count = state()[:count] + + if stored_count && !force do + stored_count + else + processed_count = state()[:processed_count] || 0 + max_processed_id = data_migration().data["max_processed_id"] || 0 + query = where(query(), [object], object.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count + put_stat(:count, count) + count + end + end + defp persist_stats(data_migration) do runner_state = Map.drop(state(), [:status]) _ = DataMigration.update(data_migration, %{data: runner_state}) @@ -196,11 +235,17 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defp handle_success(data_migration) do update_status(:complete) - unless data_migration.feature_lock || Config.improved_hashtag_timeline() do - Config.put(Config.improved_hashtag_timeline_path(), true) - end + cond do + data_migration.feature_lock -> + :noop - :ok + not is_nil(Config.improved_hashtag_timeline()) -> + :noop + + true -> + Config.put(Config.improved_hashtag_timeline_path(), true) + :ok + end end def failed_objects_query do @@ -212,6 +257,36 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> order_by([o], asc: o.id) end + def failures_count(data_migration_id \\ nil) do + data_migration_id = data_migration_id || data_migration().id + + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id] + ) do + count + end + end + + def retry_failed do + data_migration = data_migration() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {:ok, _} <- transfer_object_hashtags(object) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration.id, object.id] + ) + end + end) + |> Stream.run() + end + def force_continue do send(whereis(), :migrate_hashtags) end @@ -221,6 +296,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do force_continue() end + def force_complete do + {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) + + handle_success(data_migration) + end + defp update_status(status, message \\ nil) do put_stat(:status, status) put_stat(:message, message)