- @doc "Approximate count for current iteration (including processed records count)"
- def count(force \\ false, timeout \\ :infinity) do
- stored_count = state()[:count]
-
- if stored_count && !force do
- stored_count
- else
- processed_count = state()[:processed_count] || 0
- max_processed_id = data_migration().data["max_processed_id"] || 0
- query = where(query(), [object], object.id > ^max_processed_id)
-
- count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
- put_stat(:count, count)
- count
- end
- end
-
- defp persist_stats(data_migration) do
- runner_state = Map.drop(state(), [:status])
- _ = DataMigration.update(data_migration, %{data: runner_state})
- end
-
- defp handle_success(data_migration) do
- update_status(:complete)
-
- cond do
- data_migration.feature_lock ->
- :noop
-
- not is_nil(Config.improved_hashtag_timeline()) ->
- :noop
-
- true ->
- Config.put(Config.improved_hashtag_timeline_path(), true)
- :ok
- end
- end
-
- def failed_objects_query do
- from(o in Object)
- |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
- on: dmf.record_id == o.id
- )
- |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
- |> order_by([o], asc: o.id)
- end
-
- def failures_count(data_migration_id \\ nil) do
- data_migration_id = data_migration_id || data_migration().id
-
- with {:ok, %{rows: [[count]]}} <-
- Repo.query(
- "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
- [data_migration_id]
- ) do
- count
- end
- end
-