max_processed_id = data_migration.data["max_processed_id"] || 0
- # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
- from(
- object in Object,
- left_join: hashtag in assoc(object, :hashtags),
- where: object.id > ^max_processed_id,
- where: is_nil(hashtag.id),
- where:
- fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
- select: %{
- id: object.id,
- tag: fragment("(?)->'tag'", object.data)
- }
- )
+ query()
+ |> where([object], object.id > ^max_processed_id)
|> Repo.chunk_stream(100, :batches, timeout: :infinity)
|> Stream.each(fn objects ->
object_ids = Enum.map(objects, & &1.id)
{:noreply, state}
end
+ defp query do
+ # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
+ from(
+ object in Object,
+ left_join: hashtag in assoc(object, :hashtags),
+ where: is_nil(hashtag.id),
+ where:
+ fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
+ select: %{
+ id: object.id,
+ tag: fragment("(?)->'tag'", object.data)
+ }
+ )
+ end
+
defp transfer_object_hashtags(object) do
hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
end)
end
+ def count(force \\ false) do
+ stored_count = state()[:count]
+
+ if stored_count && !force do
+ stored_count
+ else
+ count = Repo.aggregate(query(), :count, :id)
+ put_stat(:count, count)
+ count
+ end
+ end
+
defp persist_stats(data_migration) do
runner_state = Map.drop(state(), [:status])
_ = DataMigration.update(data_migration, %{data: runner_state})