Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
processed_request=processed_request,
hydrated_request=request,
)
except Exception as exc:
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
except Exception:
logger.exception(f'Error marking request {request.unique_key} as handled.')
Comment on lines -238 to +239
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will logger.exception still include the traceback if we're not using {exc!s}?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — `logger.exception` defaults to `exc_info=True`, so the full traceback is included automatically.

return None
else:
return processed_request
Expand Down
25 changes: 15 additions & 10 deletions src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,27 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
# Set the handled_at timestamp if not already set
request_id = unique_key_to_request_id(request.unique_key)

if cached_request := self._requests_cache.get(request_id):
cached_request.handled_at = request.handled_at

if request.handled_at is None:
request.handled_at = datetime.now(tz=timezone.utc)
self.metadata.handled_request_count += 1
self.metadata.pending_request_count -= 1

if cached_request := self._requests_cache.get(request_id):
cached_request.handled_at = request.handled_at

try:
# Remember that we handled this request, to optimize local deduplication.
self._requests_already_handled.add(request_id)
self._requests_in_progress.discard(request_id)
# Remove request from cache, it will most likely not be needed.
self._requests_cache.pop(request_id, None)
# Update the request in the API
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
# adding to the queue.)
processed_request = await self._update_request(request)
# Remember that we handled this request, to optimize local deduplication.
self._requests_already_handled.add(request_id)
# Remove request from cache. It will most likely not be needed.
self._requests_cache.pop(request_id)
self._requests_in_progress.discard(request_id)

except Exception as exc:
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
except Exception:
logger.exception(f'Error marking request {request.unique_key} as handled.')
return None
else:
return processed_request
Expand Down Expand Up @@ -303,6 +303,11 @@ async def _list_head(self) -> None:
# Ignore requests that are already in progress, we will not process them again.
continue

if request_id in self._requests_already_handled:
# Request is locally known to be handled, but platform is not aware of it.
# This can be either due to delay in API data propagation or failed API call to mark it as handled.
continue

if request.was_already_handled:
# Do not cache fully handled requests, we do not need them. Just cache their id.
self._requests_already_handled.add(request_id)
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/apify_api/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,3 +1192,40 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic
Actor.log.info(stats_after)

assert (stats_after['writeCount'] - stats_before['writeCount']) == 1


async def test_request_queue_api_fail_when_marking_as_handled(
    apify_token: str, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Verify that a single-access Apify RQ survives an API failure while marking a request as handled.

    With `request_queue_access='single'`, the queue treats its local state as the source of truth. Even when
    the `update_request` API call fails, the request is still tracked as handled locally, so the queue reports
    itself finished and empty. The missing handled status can be reconciled on the platform later, when the
    head of the queue is fetched again.
    """
    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
    async with Actor:
        storage_client = ApifyStorageClient(request_queue_access='single')
        queue = await RequestQueue.open(storage_client=storage_client)

        try:
            req = Request.from_url('http://example.com')

            # Enqueue the request and pull it back out for processing.
            await queue.add_request(req)
            fetched = await queue.fetch_next_request()
            assert fetched == req

            # Simulate a platform outage while marking the request as handled:
            # the underlying API client's `update_request` call raises.
            patched = mock.patch.object(
                queue._client._api_client, 'update_request', side_effect=Exception('Simulated API failure')
            )
            with patched:
                await queue.mark_request_as_handled(req)

            # The platform never received the update, so its copy is still unhandled.
            stored = await queue.get_request(req.unique_key)
            assert not stored.was_already_handled

            # Locally, however, the single-access queue trusts its own bookkeeping:
            # the request counts as handled, so the queue is done.
            assert not await queue.fetch_next_request()
            assert await queue.is_finished()
            assert await queue.is_empty()

        finally:
            await queue.drop()