Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ async def reclaim_request(
processed_request = await self._update_request(request, forefront=forefront)
processed_request.unique_key = request.unique_key

# If the request was previously handled, decrement our handled count since
# we're putting it back for processing.
if request.was_already_handled and not processed_request.was_already_handled:
# The platform reports the request's state before this update via `was_already_handled`. If it was
# handled, this update moved it from handled back to pending, so mirror that in the local metadata.
if processed_request.was_already_handled:
self.metadata.handled_request_count -= 1
self.metadata.pending_request_count += 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ async def reclaim_request(
processed_request = await self._update_request(request, forefront=forefront)
processed_request.id = request_id
processed_request.unique_key = request.unique_key
# If the request was previously handled, decrement our handled count since
# we're putting it back for processing.
if request.was_already_handled and not processed_request.was_already_handled:
# The platform reports the request's state before this update via `was_already_handled`. If it was
# handled, this update moved it from handled back to pending, so mirror that in the local metadata.
if processed_request.was_already_handled:
self.metadata.handled_request_count -= 1
self.metadata.pending_request_count += 1

Expand Down
40 changes: 40 additions & 0 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,46 @@ async def test_request_reclaim_with_forefront(
Actor.log.info(f'Test completed - processed {remaining_count} additional requests')


async def test_reclaim_handled_request_moves_back_to_pending(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
) -> None:
"""Reclaiming an already-handled request must move it from handled back to pending in the queue metadata."""
rq = request_queue_apify

# Add a request and mark it as handled.
await rq.add_request('https://example.com/reclaim-handled')
request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2)
assert request is not None
await rq.mark_request_as_handled(request)

# The local estimate reflects the handled count immediately, even before the API metadata propagates.
metadata = await poll_until_condition(
rq.get_metadata,
lambda m: m.handled_request_count == 1,
timeout=rq_poll_timeout,
backoff_factor=2,
)
assert metadata.handled_request_count == 1, f'handled={metadata.handled_request_count}'

# Reclaim the already-handled request. The platform reports the prior (handled) state on the update.
reclaim_result = await rq.reclaim_request(request)
assert reclaim_result is not None
assert reclaim_result.was_already_handled is True

# The counts must move handled -> pending. The decrement only becomes visible once the API-side metadata
# catches up, since the merged metadata takes max(api, local) for the handled count, so poll until it settles.
metadata = await poll_until_condition(
rq.get_metadata,
lambda m: m.handled_request_count == 0 and m.pending_request_count == 1,
timeout=60,
poll_interval=5,
)
assert metadata.handled_request_count == 0, f'handled={metadata.handled_request_count}'
assert metadata.pending_request_count == 1, f'pending={metadata.pending_request_count}'
assert metadata.total_request_count == 1, f'total={metadata.total_request_count}'


async def test_complex_request_objects(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
Expand Down
Loading