+ @doc "Approximate count for current iteration (including processed records count)"
+ def count(force \\ false, timeout \\ :infinity) do
+   cached = state()[:count]
+
+   if cached && !force do
+     # A count was already computed for this iteration — reuse it.
+     cached
+   else
+     # Recount: records already handled plus an aggregate over everything
+     # past the last processed id, then cache the total for later calls.
+     done = state()[:processed_count] || 0
+     last_id = data_migration().data["max_processed_id"] || 0
+     remaining = where(query(), [object], object.id > ^last_id)
+
+     total = Repo.aggregate(remaining, :count, :id, timeout: timeout) + done
+     put_stat(:count, total)
+     total
+   end
+ end
+
+ # Snapshots the runner's current state (minus the transient :status key)
+ # into the data migration record; the update result is deliberately ignored.
+ defp persist_stats(data_migration) do
+   stats = Map.delete(state(), :status)
+   _ = DataMigration.update(data_migration, %{data: stats})
+ end
+
+ # Marks the migration complete, then flips the improved-hashtag-timeline
+ # config on — unless the feature is locked or already explicitly configured.
+ defp handle_success(data_migration) do
+   update_status(:complete)
+
+   if data_migration.feature_lock || not is_nil(Config.improved_hashtag_timeline()) do
+     :noop
+   else
+     Config.put(Config.improved_hashtag_timeline_path(), true)
+     :ok
+   end
+ end
+
+ # Query for Objects whose ids were recorded as failures for the current
+ # data migration, ordered by id ascending.
+ def failed_objects_query do
+   from(o in Object,
+     join: dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+     on: dmf.record_id == o.id,
+     where: dmf.data_migration_id == ^data_migration().id,
+     order_by: [asc: o.id]
+   )
+ end
+
+ def failures_count(data_migration_id \\ nil) do
+ data_migration_id = data_migration_id || data_migration().id