diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 4a00e8bc..278b79f2 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -235,8 +235,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | processed_request=processed_request, hydrated_request=request, ) - except Exception as exc: - logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}') + except Exception: + logger.exception(f'Error marking request {request.unique_key} as handled.') return None else: return processed_request diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 7cc202bb..e1a4fa41 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -201,27 +201,27 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | # Set the handled_at timestamp if not already set request_id = unique_key_to_request_id(request.unique_key) + if cached_request := self._requests_cache.get(request_id): + cached_request.handled_at = request.handled_at + if request.handled_at is None: request.handled_at = datetime.now(tz=timezone.utc) self.metadata.handled_request_count += 1 self.metadata.pending_request_count -= 1 - if cached_request := self._requests_cache.get(request_id): - cached_request.handled_at = request.handled_at - try: + # Remember that we handled this request, to optimize local deduplication. + self._requests_already_handled.add(request_id) + self._requests_in_progress.discard(request_id) + # Remove request from cache; it will most likely not be needed. 
+ self._requests_cache.pop(request_id, None) # Update the request in the API # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before # adding to the queue.) processed_request = await self._update_request(request) - # Remember that we handled this request, to optimize local deduplication. - self._requests_already_handled.add(request_id) - # Remove request from cache. It will most likely not be needed. - self._requests_cache.pop(request_id) - self._requests_in_progress.discard(request_id) - except Exception as exc: - logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}') + except Exception: + logger.exception(f'Error marking request {request.unique_key} as handled.') return None else: return processed_request @@ -303,6 +303,11 @@ async def _list_head(self) -> None: # Ignore requests that are already in progress, we will not process them again. continue + if request_id in self._requests_already_handled: + # Request is locally known to be handled, but the platform is not aware of it. + # This can be either due to a delay in API data propagation or a failed API call to mark it as handled. + continue + if request.was_already_handled: # Do not cache fully handled requests, we do not need them. Just cache their id. 
self._requests_already_handled.add(request_id) diff --git a/tests/integration/apify_api/test_request_queue.py b/tests/integration/apify_api/test_request_queue.py index e90c1600..9c8c7f96 100644 --- a/tests/integration/apify_api/test_request_queue.py +++ b/tests/integration/apify_api/test_request_queue.py @@ -1192,3 +1192,40 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic Actor.log.info(stats_after) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 + + +async def test_request_queue_api_fail_when_marking_as_handled( + apify_token: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """Test that single-access based Apify RQ can deal with API failures when marking requests as handled. + + Single-access based Apify RQ is aware that local information is reliable, so even if marking as handled fails + during an API call, the RQ correctly tracks the handling information locally. It can even fix the missing handled + information on the platform later, when fetching the next request. + """ + + monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token) + async with Actor: + rq = await RequestQueue.open(storage_client=ApifyStorageClient(request_queue_access='single')) + + try: + request = Request.from_url('http://example.com') + # Fetch request + await rq.add_request(request) + assert request == await rq.fetch_next_request() + + # Mark as handled, but simulate API failure. + with mock.patch.object( + rq._client._api_client, 'update_request', side_effect=Exception('Simulated API failure') + ): + await rq.mark_request_as_handled(request) + assert not (await rq.get_request(request.unique_key)).was_already_handled + + # RQ with `request_queue_access="single"` knows that the local information is reliable, so it knows it + # handled this request already despite the platform not being aware of it. + assert not await rq.fetch_next_request() + assert await rq.is_finished() + assert await rq.is_empty() + + finally: + await rq.drop()