"""Async batch runner with semaphore-based concurrency control."""
from __future__ import annotations

import asyncio
from typing import Any, Callable, Coroutine, Optional


class AsyncBatchRunner:
    """
    Run async tasks with semaphore-based concurrency control.

    Args:
        max_concurrent: maximum number of concurrent tasks (must be >= 1)

    Raises:
        ValueError: if max_concurrent is less than 1
    """

    def __init__(self, max_concurrent: int = 10):
        # A limit of 0 would make every task wait on the semaphore forever,
        # and a negative limit is rejected by asyncio.Semaphore only later,
        # inside run(); fail fast with a clear message instead.
        if max_concurrent < 1:
            raise ValueError(
                f"max_concurrent must be >= 1, got {max_concurrent!r}"
            )
        self.max_concurrent = max_concurrent

    async def run(
        self,
        tasks: list[Coroutine[Any, Any, Any]],
        on_progress: Optional[Callable[[int, int], None]] = None,
        cancel_event: Optional[Any] = None,
    ) -> list[Any]:
        """
        Run all tasks with concurrency limiting.

        Args:
            tasks: list of coroutines to execute
            on_progress: callback(completed, total), invoked after each task
                that ran to completion (skipped tasks are not counted)
            cancel_event: any object exposing ``is_set()`` (e.g. a
                threading.Event) — once set, tasks that have not started yet
                are skipped and yield None; tasks already running are not
                interrupted

        Returns:
            Results in the same order as ``tasks``; None for skipped tasks.

        Raises:
            Exception: the first exception raised by a task or by
                ``on_progress`` propagates to the caller via asyncio.gather.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)
        completed_count = 0
        total = len(tasks)

        def is_cancelled() -> bool:
            return cancel_event is not None and cancel_event.is_set()

        async def limited(task: Coroutine) -> Any:
            nonlocal completed_count
            try:
                # Checked before queuing so cancelled work never waits for a slot.
                if is_cancelled():
                    return None
                async with semaphore:
                    # Re-check: the event may have been set while we were waiting.
                    if is_cancelled():
                        return None
                    result = await task
                    completed_count += 1
                    if on_progress is not None:
                        on_progress(completed_count, total)
                    return result
            finally:
                # A coroutine that is skipped (or whose wrapper is cancelled
                # while waiting for the semaphore) would otherwise be leaked
                # and trigger a "coroutine ... was never awaited" warning.
                # close() is a no-op on a coroutine that already finished.
                if asyncio.iscoroutine(task):
                    task.close()

        return list(await asyncio.gather(*[limited(t) for t in tasks]))
