# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Migrators.HashtagsTableMigrator do
- use GenServer
+ defmodule State do
+ use Pleroma.Migrators.Support.BaseMigratorState
- require Logger
+ @impl Pleroma.Migrators.Support.BaseMigratorState
+ defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
+ end
- import Ecto.Query
+ use Pleroma.Migrators.Support.BaseMigrator
- alias __MODULE__.State
- alias Pleroma.Config
alias Pleroma.Hashtag
+ alias Pleroma.Migrators.Support.BaseMigrator
alias Pleroma.Object
- alias Pleroma.Repo
-
- defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
- defdelegate data_migration_id(), to: State
-
- defdelegate state(), to: State
- defdelegate persist_state(), to: State, as: :persist_to_db
- defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
- defdelegate put_stat(key, value), to: State, as: :put_data_key
- defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
-
- @feature_config_path [:database, :improved_hashtag_timeline]
- @reg_name {:global, __MODULE__}
-
- def whereis, do: GenServer.whereis(@reg_name)
-
- def feature_state, do: Config.get(@feature_config_path)
-
- def start_link(_) do
- case whereis() do
- nil ->
- GenServer.start_link(__MODULE__, nil, name: @reg_name)
-
- pid ->
- {:ok, pid}
- end
- end
-
- @impl true
- def init(_) do
- {:ok, nil, {:continue, :init_state}}
- end
- @impl true
- def handle_continue(:init_state, _state) do
- {:ok, _} = State.start_link(nil)
+ @impl BaseMigrator
+ def feature_config_path, do: [:features, :improved_hashtag_timeline]
- data_migration = data_migration()
- manual_migrations = Config.get([:instance, :manual_data_migrations], [])
-
- cond do
- Config.get(:env) == :test ->
- update_status(:noop)
-
- is_nil(data_migration) ->
- message = "Data migration does not exist."
- update_status(:failed, message)
- Logger.error("#{__MODULE__}: #{message}")
-
- data_migration.state == :manual or data_migration.name in manual_migrations ->
- message = "Data migration is in manual execution or manual fix mode."
- update_status(:manual, message)
- Logger.warn("#{__MODULE__}: #{message}")
-
- data_migration.state == :complete ->
- on_complete(data_migration)
-
- true ->
- send(self(), :migrate_hashtags)
- end
-
- {:noreply, nil}
- end
-
- @impl true
- def handle_info(:migrate_hashtags, state) do
- State.reinit()
-
- update_status(:running)
- put_stat(:iteration_processed_count, 0)
- put_stat(:started_at, NaiveDateTime.utc_now())
+ @impl BaseMigrator
+ def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
+ @impl BaseMigrator
+ def perform do
data_migration_id = data_migration_id()
max_processed_id = get_stat(:max_processed_id, 0)
|> Enum.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1))
- # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
+ # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags
chunk_affected_count =
results
|> Enum.filter(&(elem(&1, 0) == :ok))
Process.sleep(sleep_interval)
end)
|> Stream.run()
-
- fault_rate = fault_rate()
- put_stat(:fault_rate, fault_rate)
- fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
-
- cond do
- fault_rate == 0 ->
- set_complete()
-
- is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
- message = """
- Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
- Putting data migration to manual fix mode. Check `retry_failed/0`.
- """
-
- Logger.warn("#{__MODULE__}: #{message}")
- update_status(:manual, message)
- on_complete(data_migration())
-
- true ->
- message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
- Logger.error("#{__MODULE__}: #{message}")
- update_status(:failed, message)
- end
-
- persist_state()
- {:noreply, state}
- end
-
- def fault_rate do
- with failures_count when is_integer(failures_count) <- failures_count() do
- failures_count / Enum.max([get_stat(:affected_count, 0), 1])
- else
- _ -> :error
- end
- end
-
- defp records_per_second do
- get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
end
- defp running_time do
- NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now()))
- end
-
- @hashtags_objects_cleanup_query """
- DELETE FROM hashtags_objects WHERE object_id IN
- (SELECT DISTINCT objects.id FROM objects
- JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
- ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
- (objects.data->>'id')
- AND activities.data->>'type' = 'Create'
- WHERE activities.id IS NULL);
- """
-
- @hashtags_cleanup_query """
- DELETE FROM hashtags WHERE id IN
- (SELECT hashtags.id FROM hashtags
- LEFT OUTER JOIN hashtags_objects
- ON hashtags_objects.hashtag_id = hashtags.id
- WHERE hashtags_objects.hashtag_id IS NULL);
- """
-
- @doc """
- Deletes `hashtags_objects` for legacy objects not asoociated with Create activity.
- Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
- """
- def delete_non_create_activities_hashtags do
- {:ok, %{num_rows: hashtags_objects_count}} =
- Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
-
- {:ok, %{num_rows: hashtags_count}} =
- Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)
-
- {:ok, hashtags_objects_count, hashtags_count}
- end
-
- defp query do
+ @impl BaseMigrator
+ def query do
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
# Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
from(
end)
end
- @doc "Approximate count for current iteration (including processed records count)"
- def count(force \\ false, timeout \\ :infinity) do
- stored_count = get_stat(:count)
-
- if stored_count && !force do
- stored_count
- else
- processed_count = get_stat(:processed_count, 0)
- max_processed_id = get_stat(:max_processed_id, 0)
- query = where(query(), [object], object.id > ^max_processed_id)
-
- count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
- put_stat(:count, count)
- persist_state()
-
- count
- end
- end
-
- defp on_complete(data_migration) do
- cond do
- data_migration.feature_lock ->
- :noop
-
- not is_nil(feature_state()) ->
- :noop
-
- true ->
- Config.put(@feature_config_path, true)
- :ok
- end
- end
-
- def failed_objects_query do
- from(o in Object)
- |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
- on: dmf.record_id == o.id
- )
- |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
- |> order_by([o], asc: o.id)
- end
-
- def failures_count do
- with {:ok, %{rows: [[count]]}} <-
- Repo.query(
- "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
- [data_migration_id()]
- ) do
- count
- end
- end
-
+ @impl BaseMigrator
def retry_failed do
data_migration_id = data_migration_id()
force_continue()
end
- def force_continue do
- send(whereis(), :migrate_hashtags)
+ defp failed_objects_query do
+ from(o in Object)
+ |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+ on: dmf.record_id == o.id
+ )
+ |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
+ |> order_by([o], asc: o.id)
end
- def force_restart do
- :ok = State.reset()
- force_continue()
- end
+ @doc """
+ Service func to delete `hashtags_objects` for legacy objects not associated with Create activity.
+ Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
+ """
+ def delete_non_create_activities_hashtags do
+ hashtags_objects_cleanup_query = """
+ DELETE FROM hashtags_objects WHERE object_id IN
+ (SELECT DISTINCT objects.id FROM objects
+ JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+ ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+ (objects.data->>'id')
+ AND activities.data->>'type' = 'Create'
+ WHERE activities.id IS NULL);
+ """
+
+ hashtags_cleanup_query = """
+ DELETE FROM hashtags WHERE id IN
+ (SELECT hashtags.id FROM hashtags
+ LEFT OUTER JOIN hashtags_objects
+ ON hashtags_objects.hashtag_id = hashtags.id
+ WHERE hashtags_objects.hashtag_id IS NULL);
+ """
- def set_complete do
- update_status(:complete)
- persist_state()
- on_complete(data_migration())
- end
+ {:ok, %{num_rows: hashtags_objects_count}} =
+ Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity)
- defp update_status(status, message \\ nil) do
- put_stat(:state, status)
- put_stat(:message, message)
+ {:ok, %{num_rows: hashtags_count}} =
+ Repo.query(hashtags_cleanup_query, [], timeout: :infinity)
+
+ {:ok, hashtags_objects_count, hashtags_count}
end
end