Adopt asyncio.TaskGroup for structured concurrency #598

@vdusek

Description

Summary

Once we drop support for Python 3.10, we should adopt asyncio.TaskGroup (available since Python 3.11) to replace our manual task management patterns (asyncio.create_task + manual cancellation + asyncio.gather). This improves error propagation, guarantees cleanup, and makes the code clearer.

Current Implementation

1. RequestQueueClientAsync.batch_add_requests — High priority

src/apify_client/clients/resource_clients/request_queue.py (lines 712–790)

The batch processing logic spawns multiple worker tasks that issue the batched API calls in parallel, using manual task management:

# Each `coro` is a worker coroutine that pulls batches from a shared queue.
tasks = set[asyncio.Task]()
for i in range(max_parallel):
    task = asyncio.create_task(coro, name=f'batch_add_requests_worker_{i}')
    tasks.add(task)

# Wait until every queued batch has been processed, then cancel the idle workers.
await queue.join()

for task in tasks:
    task.cancel()

# Collect the per-worker results.
results: list[BatchAddRequestsResult] = await asyncio.gather(*tasks)

2. StreamedLogAsync — Medium priority

src/apify_client/clients/resource_clients/log.py (lines 332–377)

Manages a single background streaming task with asyncio.create_task() and manual cancel/await in stop():

def start(self) -> Task:
    self._streaming_task = asyncio.create_task(self._stream_log())
    return self._streaming_task

async def stop(self) -> None:
    self._streaming_task.cancel()
    try:
        await self._streaming_task
    except asyncio.CancelledError:
        self._streaming_task = None

3. StatusMessageWatcherAsync — Medium priority

src/apify_client/clients/resource_clients/log.py (lines 428–480)

Nearly identical pattern to StreamedLogAsync — a single background polling task with manual start/cancel/await:

def start(self) -> Task:
    self._logging_task = asyncio.create_task(self._log_changed_status_message())
    return self._logging_task

async def stop(self) -> None:
    self._logging_task.cancel()
    try:
        await self._logging_task
    except asyncio.CancelledError:
        self._logging_task = None

4. Documentation example — Low priority

docs/03_examples/code/02_tasks_async.py (lines 44–45)

Uses asyncio.gather() to run multiple task clients concurrently:

run_apify_tasks = [run_apify_task(client) for client in apify_task_clients]
task_run_results = await asyncio.gather(*run_apify_tasks)

Proposed Improvement

Batch processing (request_queue.py)

Use asyncio.TaskGroup for cleaner code and automatic exception handling:

async with asyncio.TaskGroup() as tg:
    workers = [
        tg.create_task(self._batch_add_requests_worker(queue, params), name=f'batch_add_requests_worker_{i}')
        for i in range(max_parallel)
    ]
    await queue.join()
    for worker in workers:
        worker.cancel()
# TaskGroup automatically waits and handles exceptions properly

Log streaming and status watcher (log.py)

TaskGroup could simplify the __aenter__/__aexit__ lifecycle management by tying task lifetime to the group's scope. However, start() currently returns the raw Task object — we need to check if any callers depend on this before refactoring.
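
For the single-task classes, one possible shape is to enter the group in __aenter__ and exit it in __aexit__. A rough sketch, assuming the classes keep their async context manager interface; the _tg attribute and the explicit __aenter__/__aexit__ calls on the group are illustrative only, not the current implementation:

import asyncio


class StreamedLogAsync:
    # Skeleton only: the real class has more state and a streaming loop.

    async def __aenter__(self) -> 'StreamedLogAsync':
        # The TaskGroup owns the background task, so its lifetime is bound to
        # this context manager instead of a manually tracked Task.
        self._tg = asyncio.TaskGroup()
        await self._tg.__aenter__()
        self._streaming_task = self._tg.create_task(self._stream_log())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Cancel the streamer; the group waits for it and treats the
        # cancellation as a normal exit, not an error.
        self._streaming_task.cancel()
        await self._tg.__aexit__(exc_type, exc_val, exc_tb)

    async def _stream_log(self) -> None:
        ...  # unchanged streaming loop

Whether this is actually simpler than the current start()/stop() pair is exactly the trade-off to evaluate.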

Documentation example

Update to showcase the modern TaskGroup pattern:

async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(run_apify_task(client)) for client in apify_task_clients]
task_run_results = [t.result() for t in tasks]

Benefits

  • Better error handling: TaskGroup automatically propagates exceptions from any task. If a worker fails unexpectedly before queue.join() completes, all remaining workers are cancelled and the exception propagates cleanly via ExceptionGroup (see the sketch after this list).
  • Cleaner code: Context manager pattern ensures proper cleanup — no risk of fire-and-forget task leaks.
  • More Pythonic: Uses modern Python structured concurrency patterns.
  • Automatic waiting: No need for explicit asyncio.gather() calls or manual cancellation loops.
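
For illustration, a failing worker under a TaskGroup surfaces roughly like this (standalone sketch; flaky_worker is hypothetical, not client code):

import asyncio


async def flaky_worker(i: int) -> None:
    # Hypothetical worker: one of them fails, the rest would keep running.
    if i == 2:
        raise RuntimeError(f'worker {i} failed')
    await asyncio.sleep(1)


async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(5):
                tg.create_task(flaky_worker(i), name=f'worker_{i}')
    except* RuntimeError as eg:
        # The group cancels the remaining workers and wraps the failure(s)
        # in an ExceptionGroup, which except* unpacks.
        for exc in eg.exceptions:
            print(f'worker failed: {exc}')


asyncio.run(main())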

Notes

  • This is blocked until Python 3.10 support is dropped (see target_python_version in pyproject.toml).
  • For the single-task cases (StreamedLogAsync and StatusMessageWatcherAsync, items 2 and 3 above), evaluate whether the simplification is worth the refactor; TaskGroup shines most with fan-out concurrency such as the batch processing in item 1.
  • The ExceptionGroup raised by TaskGroup on failure differs from the gather(return_exceptions=True) behavior, so tests should be reviewed accordingly (see the illustration after this list).
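
A minimal illustration of that difference, using a hypothetical work coroutine rather than client code:

import asyncio


async def work(i: int) -> int:
    # Hypothetical coroutine: one call fails, the others succeed.
    if i == 1:
        raise ValueError('boom')
    return i


async def main() -> None:
    # gather(return_exceptions=True) never raises; failures come back as values
    # mixed into the result list, so tests typically assert on list contents.
    results = await asyncio.gather(*(work(i) for i in range(3)), return_exceptions=True)
    print(results)  # [0, ValueError('boom'), 2]
    # A TaskGroup-based equivalent would instead raise an ExceptionGroup out of
    # the `async with` block, so such assertions need to change shape.


asyncio.run(main())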

Prerequisites

  • Drop support for Python 3.10 (TaskGroup requires Python 3.11+).
