From 4bb9e6e375c60ef305a860646df45de9155b3c7b Mon Sep 17 00:00:00 2001
From: Sam Bull
Date: Thu, 5 Feb 2026 16:29:03 +0000
Subject: [PATCH] Change .decode_async() to .decode_iter() (#12028)

---
 CHANGES/11898.bugfix.rst     | 10 ++++++---
 aiohttp/multipart.py         | 41 ++++++++++++++++++++----------------
 aiohttp/web_request.py       | 20 +++++++++---------
 docs/multipart_reference.rst |  7 ++++--
 tests/test_multipart.py      |  5 +++--
 5 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/CHANGES/11898.bugfix.rst b/CHANGES/11898.bugfix.rst
index f430bcce997..2a2e41d037b 100644
--- a/CHANGES/11898.bugfix.rst
+++ b/CHANGES/11898.bugfix.rst
@@ -1,6 +1,10 @@
 Restored :py:meth:`~aiohttp.BodyPartReader.decode` as a synchronous method
 for backward compatibility. The method was inadvertently changed to async
 in 3.13.3 as part of the decompression bomb security fix. A new
-:py:meth:`~aiohttp.BodyPartReader.decode_async` method is now available
-for non-blocking decompression of large payloads. Internal aiohttp code
-uses the async variant to maintain security protections -- by :user:`bdraco`.
+:py:meth:`~aiohttp.BodyPartReader.decode_iter` method is now available
+for non-blocking decompression of large payloads using an async generator.
+Internal aiohttp code uses the async variant to maintain security protections.
+
+Changed multipart processing chunk sizes from 64 KiB to 256 KiB to better
+match aiohttp internals
+-- by :user:`bdraco` and :user:`Dreamsorcerer`.
diff --git a/aiohttp/multipart.py b/aiohttp/multipart.py
index 21697b8c175..97fdae77d87 100644
--- a/aiohttp/multipart.py
+++ b/aiohttp/multipart.py
@@ -6,7 +6,7 @@
 import uuid
 import warnings
 from collections import deque
-from collections.abc import Iterator, Mapping, Sequence
+from collections.abc import AsyncIterator, Iterator, Mapping, Sequence
 from types import TracebackType
 from typing import TYPE_CHECKING, Any, Union, cast
 from urllib.parse import parse_qsl, unquote, urlencode
@@ -314,7 +314,10 @@ async def read(self, *, decode: bool = False) -> bytes:
             data.extend(await self.read_chunk(self.chunk_size))
         # https://github.com/python/mypy/issues/17537
         if decode:  # type: ignore[unreachable]
-            return await self.decode_async(data)
+            decoded_data = bytearray()
+            async for d in self.decode_iter(data):
+                decoded_data.extend(d)
+            return decoded_data
         return data
 
     async def read_chunk(self, size: int = chunk_size) -> bytes:
@@ -509,7 +512,7 @@ def decode(self, data: bytes) -> bytes:
         Decodes data according the specified Content-Encoding
         or Content-Transfer-Encoding headers value.
 
-        Note: For large payloads, consider using decode_async() instead
+        Note: For large payloads, consider using decode_iter() instead
         to avoid blocking the event loop during decompression.
         """
         data = self._apply_content_transfer_decoding(data)
@@ -517,8 +520,8 @@ def decode(self, data: bytes) -> bytes:
             return self._decode_content(data)
         return data
 
-    async def decode_async(self, data: bytes) -> bytes:
-        """Decodes data asynchronously.
+    async def decode_iter(self, data: bytes) -> AsyncIterator[bytes]:
+        """Async generator that yields decoded data chunks.
 
         Decodes data according the specified Content-Encoding
         or Content-Transfer-Encoding headers value.
@@ -528,8 +531,10 @@ async def decode_async(self, data: bytes) -> bytes:
         """
         data = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
-            return await self._decode_content_async(data)
-        return data
+            async for d in self._decode_content_async(data):
+                yield d
+        else:
+            yield data
 
     def _decode_content(self, data: bytes) -> bytes:
         encoding = self.headers.get(CONTENT_ENCODING, "").lower()
@@ -543,17 +548,18 @@ def _decode_content(self, data: bytes) -> bytes:
 
         raise RuntimeError(f"unknown content encoding: {encoding}")
 
-    async def _decode_content_async(self, data: bytes) -> bytes:
+    async def _decode_content_async(self, data: bytes) -> AsyncIterator[bytes]:
         encoding = self.headers.get(CONTENT_ENCODING, "").lower()
         if encoding == "identity":
-            return data
-        if encoding in {"deflate", "gzip"}:
-            return await ZLibDecompressor(
+            yield data
+        elif encoding in {"deflate", "gzip"}:
+            d = ZLibDecompressor(
                 encoding=encoding,
                 suppress_deflate_header=True,
-            ).decompress(data, max_length=self._max_decompress_size)
-
-        raise RuntimeError(f"unknown content encoding: {encoding}")
+            )
+            yield await d.decompress(data, max_length=self._max_decompress_size)
+        else:
+            raise RuntimeError(f"unknown content encoding: {encoding}")
 
     def _decode_content_transfer(self, data: bytes) -> bytes:
         encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
@@ -624,10 +630,9 @@ async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> byt
 
     async def write(self, writer: AbstractStreamWriter) -> None:
         field = self._value
-        chunk = await field.read_chunk(size=2**16)
-        while chunk:
-            await writer.write(await field.decode_async(chunk))
-            chunk = await field.read_chunk(size=2**16)
+        while chunk := await field.read_chunk(size=2**18):
+            async for d in field.decode_iter(chunk):
+                await writer.write(d)
 
 
 class MultipartReader:
diff --git a/aiohttp/web_request.py b/aiohttp/web_request.py
index 09126b944cf..260b822482d 100644
--- a/aiohttp/web_request.py
+++ b/aiohttp/web_request.py
@@ -714,17 +714,17 @@ async def post(self) -> "MultiDictProxy[str | bytes | FileField]":
                         tmp = await self._loop.run_in_executor(
                             None, tempfile.TemporaryFile
                         )
-                        chunk = await field.read_chunk(size=2**16)
-                        while chunk:
-                            chunk = await field.decode_async(chunk)
-                            await self._loop.run_in_executor(None, tmp.write, chunk)
-                            size += len(chunk)
-                            if 0 < max_size < size:
-                                await self._loop.run_in_executor(None, tmp.close)
-                                raise HTTPRequestEntityTooLarge(
-                                    max_size=max_size, actual_size=size
+                        while chunk := await field.read_chunk(size=2**18):
+                            async for decoded_chunk in field.decode_iter(chunk):
+                                await self._loop.run_in_executor(
+                                    None, tmp.write, decoded_chunk
                                 )
-                            chunk = await field.read_chunk(size=2**16)
+                                size += len(decoded_chunk)
+                                if 0 < max_size < size:
+                                    await self._loop.run_in_executor(None, tmp.close)
+                                    raise HTTPRequestEntityTooLarge(
+                                        max_size=max_size, actual_size=size
+                                    )
 
                         await self._loop.run_in_executor(None, tmp.seek, 0)
 
                         if field_ct is None:
diff --git a/docs/multipart_reference.rst b/docs/multipart_reference.rst
index 2c13c8cfec9..e116ee879ed 100644
--- a/docs/multipart_reference.rst
+++ b/docs/multipart_reference.rst
@@ -119,15 +119,18 @@ Multipart reference
 
       .. note::
 
-         For large payloads, consider using :meth:`decode_async` instead
+         For large payloads, consider using :meth:`decode_iter` instead
          to avoid blocking the event loop during decompression.
 
-   .. method:: decode_async(data)
+   .. method:: decode_iter(data)
       :async:
 
      Decodes data asynchronously according the specified ``Content-Encoding``
      or ``Content-Transfer-Encoding`` headers value.
 
+      This is an async iterator and will yield decoded data in chunks. This
+      can be used to avoid loading large payloads into memory.
+
     This method offloads decompression to an executor for large payloads
     to avoid blocking the event loop.
 
diff --git a/tests/test_multipart.py b/tests/test_multipart.py
index 25672c9005a..5444817d5a4 100644
--- a/tests/test_multipart.py
+++ b/tests/test_multipart.py
@@ -397,14 +397,15 @@ async def test_decode_with_content_transfer_encoding_base64(self) -> None:
                 result += obj.decode(chunk)
             assert b"Time to Relax!" == result
 
-    async def test_decode_async_with_content_transfer_encoding_base64(self) -> None:
+    async def test_decode_iter_with_content_transfer_encoding_base64(self) -> None:
         h = CIMultiDictProxy(CIMultiDict({CONTENT_TRANSFER_ENCODING: "base64"}))
         with Stream(b"VG\r\r\nltZSB0byBSZ\r\nWxheCE=\r\n--:--") as stream:
             obj = aiohttp.BodyPartReader(BOUNDARY, h, stream)
             result = b""
             while not obj.at_eof():
                 chunk = await obj.read_chunk(size=6)
-                result += await obj.decode_async(chunk)
+                async for decoded_chunk in obj.decode_iter(chunk):
+                    result += decoded_chunk
             assert b"Time to Relax!" == result
 
     async def test_decode_with_content_encoding_deflate(self) -> None:
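
A minimal consumer-side sketch of the new decode_iter() API follows (illustrative only, not part of the patch: the helper name read_decoded is hypothetical, while read_chunk(), at_eof(), decode_iter() and the 2**18 chunk size mirror the changes above):

import aiohttp


async def read_decoded(part: aiohttp.BodyPartReader) -> bytes:
    """Read one multipart body part, decoding each chunk as it arrives."""
    result = bytearray()
    while not part.at_eof():
        # 2**18 (256 KiB) matches the chunk size aiohttp now uses internally.
        chunk = await part.read_chunk(size=2**18)
        # decode_iter() is an async generator: it applies the
        # Content-Transfer-Encoding / Content-Encoding headers and yields
        # decoded bytes without blocking the event loop.
        async for decoded in part.decode_iter(chunk):
            result.extend(decoded)
    return bytes(result)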