DataMigration.update(data_migration, %{state: :running, data: persistent_data})
update_status(:running)
+ put_stat(:started_at, NaiveDateTime.utc_now())
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids))
+ put_stat(
+ :records_per_second,
+ state()[:processed_count] /
+ Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
+ )
+
persist_stats(data_migration)
# A quick and dirty approach to controlling the load this background migration imposes
end)
end
+ @doc "Approximate count for current iteration (including processed records count)"
def count(force \\ false, timeout \\ :infinity) do
stored_count = state()[:count]
if stored_count && !force do
stored_count
else
- count = Repo.aggregate(query(), :count, :id, timeout: timeout)
+ processed_count = state()[:processed_count] || 0
+ max_processed_id = data_migration().data["max_processed_id"] || 0
+ query = where(query(), [object], object.id > ^max_processed_id)
+
+ count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
put_stat(:count, count)
count
end