Summary
Once we drop support for Python 3.10, we should adopt asyncio.TaskGroup (available in Python 3.11+) to replace manual task management patterns (asyncio.create_task + manual cancellation + asyncio.gather). This improves error propagation, automatic cleanup, and overall code clarity.
Current Implementation
1. RequestQueueClientAsync.batch_add_requests — High priority
src/apify_client/clients/resource_clients/request_queue.py (lines 712–790)
The batch processing spawns multiple worker tasks to process batched API calls in parallel using manual task management:
tasks = set[asyncio.Task]()
for i in range(max_parallel):
    task = asyncio.create_task(coro, name=f'batch_add_requests_worker_{i}')
    tasks.add(task)
await queue.join()
for task in tasks:
    task.cancel()
results: list[BatchAddRequestsResult] = await asyncio.gather(*tasks)
2. StreamedLogAsync — Medium priority
src/apify_client/clients/resource_clients/log.py (lines 332–377)
Manages a single background streaming task with asyncio.create_task() and manual cancel/await in stop():
def start(self) -> Task:
    self._streaming_task = asyncio.create_task(self._stream_log())
    return self._streaming_task

async def stop(self) -> None:
    self._streaming_task.cancel()
    try:
        await self._streaming_task
    except asyncio.CancelledError:
        self._streaming_task = None
3. StatusMessageWatcherAsync — Medium priority
src/apify_client/clients/resource_clients/log.py (lines 428–480)
Nearly identical pattern to StreamedLogAsync — a single background polling task with manual start/cancel/await:
def start(self) -> Task:
    self._logging_task = asyncio.create_task(self._log_changed_status_message())
    return self._logging_task

async def stop(self) -> None:
    self._logging_task.cancel()
    try:
        await self._logging_task
    except asyncio.CancelledError:
        self._logging_task = None
4. Documentation example — Low priority
docs/03_examples/code/02_tasks_async.py (lines 44–45)
Uses asyncio.gather() to run multiple task clients concurrently:
run_apify_tasks = [run_apify_task(client) for client in apify_task_clients]
task_run_results = await asyncio.gather(*run_apify_tasks)
Proposed Improvement
Batch processing (request_queue.py)
Use asyncio.TaskGroup for cleaner code and automatic exception handling:
async with asyncio.TaskGroup() as tg:
    workers = [
        tg.create_task(self._batch_add_requests_worker(queue, params), name=f'batch_add_requests_worker_{i}')
        for i in range(max_parallel)
    ]
    await queue.join()
    for worker in workers:
        worker.cancel()
# TaskGroup automatically waits and handles exceptions properly
Log streaming and status watcher (log.py)
TaskGroup could simplify the __aenter__/__aexit__ lifecycle management by tying task lifetime to the group's scope. However, start() currently returns the raw Task object — we need to check if any callers depend on this before refactoring.
Documentation example
Update to showcase the modern TaskGroup pattern:
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(run_apify_task(client)) for client in apify_task_clients]
task_run_results = [t.result() for t in tasks]
Benefits
- Better error handling: TaskGroup automatically propagates exceptions from any task. If a worker fails unexpectedly before queue.join() completes, all remaining workers are cancelled and the exception propagates cleanly via ExceptionGroup.
- Cleaner code: The context manager pattern ensures proper cleanup, so there is no risk of fire-and-forget task leaks.
- More Pythonic: Uses modern Python structured concurrency patterns.
- Automatic waiting: No need for explicit asyncio.gather() calls; the group awaits all of its tasks on exit and cancels the remaining ones if any task fails.
Notes
- This is blocked until Python 3.10 support is dropped (see target_python_version in pyproject.toml).
- For the single-task cases (items 2 and 3 above), evaluate whether the simplification is worth the refactor; TaskGroup shines most with fan-out concurrency (item 1).
- The ExceptionGroup raised by TaskGroup on failure differs from gather(return_exceptions=True) behavior, so tests should be reviewed accordingly.
Prerequisites
- Drop support for Python 3.10 (TaskGroup requires Python 3.11+).