X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fmigrators%2Fhashtags_table_migrator.ex;h=b84058e11d189e66b4b8855a6a032a7a34f0f47a;hb=a079ec3a3cdfd42d2cbd51c7698c2c87828e5778;hp=a7e3de542c78744191437807bc045c3854b1e5cf;hpb=3e4d84729a4ca8d9779d439a9aa2c8c23b3acd1d;p=akkoma diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index a7e3de542..b84058e11 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -4,192 +4,134 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defmodule State do - use Agent + use Pleroma.Migrators.Support.BaseMigratorState - @init_state %{} - - def start_link(_) do - Agent.start_link(fn -> @init_state end, name: __MODULE__) - end - - def get do - Agent.get(__MODULE__, & &1) - end - - def put(key, value) do - Agent.update(__MODULE__, fn state -> - Map.put(state, key, value) - end) - end - - def increment(key, increment \\ 1) do - Agent.update(__MODULE__, fn state -> - updated_value = (state[key] || 0) + increment - Map.put(state, key, updated_value) - end) - end + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table end - use GenServer - - require Logger - - import Ecto.Query + use Pleroma.Migrators.Support.BaseMigrator - alias Pleroma.Config - alias Pleroma.DataMigration alias Pleroma.Hashtag + alias Pleroma.Migrators.Support.BaseMigrator alias Pleroma.Object - alias Pleroma.Repo - - defdelegate state(), to: State, as: :get - defdelegate put_state(key, value), to: State, as: :put - defdelegate increment_state(key, increment), to: State, as: :increment - - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table - - def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) - end - - @impl true - def init(_) do - {:ok, nil, {:continue, :init_state}} - end - - @impl true - def handle_continue(:init_state, _state) do - {:ok, _} = State.start_link(nil) - - put_state(:status, :init) - - dm = data_migration() - cond do - Config.get(:env) == :test -> - put_state(:status, :noop) + @impl BaseMigrator + def feature_config_path, do: [:features, :improved_hashtag_timeline] - is_nil(dm) -> - put_state(:status, :halt) - put_state(:message, "Data migration does not exist.") + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) - dm.state == :manual -> - put_state(:status, :noop) - put_state(:message, "Data migration is in manual execution state.") + @impl BaseMigrator + def perform do + data_migration_id = data_migration_id() + max_processed_id = get_stat(:max_processed_id, 0) - dm.state == :complete -> - handle_success() + Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") - true -> - send(self(), :migrate_hashtags) - end - - {:noreply, nil} - end - - @impl true - def handle_info(:migrate_hashtags, state) do - data_migration = data_migration() - - {:ok, data_migration} = DataMigration.update_state(data_migration, :running) - put_state(:status, :running) - - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") - - max_processed_id = data_migration.data["max_processed_id"] || 0 - - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: object.id > ^max_processed_id, - where: is_nil(hashtag.id), - where: - fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), - select: %{ - id: object.id, - tag: fragment("(?)->'tag'", object.data) - } - ) + query() + |> where([object], object.id > ^max_processed_id) |> Repo.chunk_stream(100, :batches, timeout: :infinity) |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) + results = Enum.map(objects, &transfer_object_hashtags(&1)) + failed_ids = - objects - |> Enum.map(&transfer_object_hashtags(&1)) + results |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) + # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + for failed_id <- failed_ids do _ = Repo.query( "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> "VALUES ($1, $2) ON CONFLICT DO NOTHING;", - [data_migration.id, failed_id] + [data_migration_id, failed_id] ) end _ = Repo.query( - "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)", - [object_ids -- failed_ids] + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration_id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) - _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}}) - increment_state(:processed_count, length(object_ids)) - increment_state(:failed_count, length(failed_ids)) + put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) + put_stat(:records_per_second, records_per_second()) + persist_state() # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) Process.sleep(sleep_interval) end) |> Stream.run() + end + + @impl BaseMigrator + def query do + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up + from( + object in Object, + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"), + on: hashtags_objects.object_id == o.id + ) + |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) + end - with {:ok, %{rows: [[0]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration.id] - ) do - put_state(:status, :complete) - _ = DataMigration.update_state(data_migration, :complete) + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} + defp transfer_object_hashtags(object) do + embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] + hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) - handle_success() + if Enum.any?(hashtags) do + transfer_object_hashtags(object, hashtags) else - _ -> - put_state(:status, :failed) - put_state(:message, "Please check data_migration_failed_ids records.") + {:noop, object.id} end - - {:noreply, state} end - defp transfer_object_hashtags(object) do - hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) - + defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - for hashtag_record <- hashtag_records do - with {:ok, _} <- - Repo.query( - "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", - [hashtag_record.id, object.id] - ) do - nil - else - {:error, e} -> - error = - "ERROR: could not link object #{object.id} and hashtag " <> - "#{hashtag_record.id}: #{inspect(e)}" + maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" - Logger.error(error) + try do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do + object.id + else + e -> + Logger.error("#{base_error}: #{inspect(e)}") Repo.rollback(object.id) end + rescue + e -> + Logger.error("#{base_error}: #{inspect(e)}") + Repo.rollback(object.id) end - - object.id else e -> error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" @@ -199,13 +141,68 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end - defp handle_success do - put_state(:status, :complete) + @impl BaseMigrator + def retry_failed do + data_migration_id = data_migration_id() - unless Config.improved_hashtag_timeline() do - Config.put(Config.improved_hashtag_timeline_path(), true) - end + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {res, _} when res != :error <- transfer_object_hashtags(object) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration_id, object.id] + ) + end + end) + |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() + end + + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) + end - :ok + @doc """ + Service func to delete `hashtags_objects` for legacy objects not associated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + hashtags_objects_cleanup_query = """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + hashtags_cleanup_query = """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ + + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} end end