Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 102 additions & 27 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

from cachetools import LRUCache

from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
from crawlee.storage_clients.models import (
AddRequestsResponse,
ProcessedRequest,
RequestQueueMetadata,
UnprocessedRequest,
)

from ._models import ApifyRequestQueueMetadata, CachedRequest, RequestQueueHead
from ._utils import to_crawlee_request, unique_key_to_request_id
Expand Down Expand Up @@ -71,6 +76,14 @@ def __init__(
self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
"""LRU cache storing request objects, keyed by request ID."""

self._requests_being_added: dict[str, asyncio.Future[bool]] = {}
"""In-flight `add_batch_of_requests` markers, keyed by request ID.

Each future resolves once the platform call that is adding the request settles: `True` if the request was
committed, `False` otherwise. Concurrent producers of the same request await it instead of re-sending,
which preserves deduplication while still avoiding false success when the original add fails.
"""

self._queue_has_locked_requests: bool | None = None
"""Whether the queue contains requests currently locked by other clients."""

Expand All @@ -87,9 +100,13 @@ async def add_batch_of_requests(
forefront: bool = False,
) -> AddRequestsResponse:
"""Specific implementation of this method for the RQ shared access mode."""
loop = asyncio.get_running_loop()
# Do not try to add previously added requests to avoid pointless expensive calls to API
new_requests: list[Request] = []
already_present_requests: list[ProcessedRequest] = []
# Requests a concurrent `add_batch_of_requests` call is already sending. We await its outcome instead of
# re-sending them, as (request, that call's in-flight future) pairs.
awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]] = []

for request in requests:
request_id = unique_key_to_request_id(request.unique_key)
Expand All @@ -106,46 +123,68 @@ async def add_batch_of_requests(
)
)

elif request_id in self._requests_being_added:
# A concurrent call is already adding this request; await its outcome rather than re-sending it.
awaited_in_flight.append((request, self._requests_being_added[request_id]))

else:
# Add new request to the cache.
processed_request = ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
self._cache_request(
request_id,
processed_request,
)
# Register an in-flight marker so concurrent producers dedupe against it; caching is deferred
# until the platform confirms the request was accepted (see below).
new_requests.append(request)
self._requests_being_added[request_id] = loop.create_future()

if new_requests:
# Prepare requests for API by converting to dictionaries.
requests_dict = [request.model_dump(by_alias=True) for request in new_requests]

# Send requests to API.
batch_response = await self._api_client.batch_add_requests(
requests=requests_dict,
forefront=forefront,
)

batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
committed_request_ids: set[str] = set()
try:
# Send requests to API.
batch_response = await self._api_client.batch_add_requests(
requests=requests_dict,
forefront=forefront,
)

# Remove unprocessed requests from the cache
for unprocessed_request in api_response.unprocessed_requests:
unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(unprocessed_request_id, None)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Commit only the requests the platform actually accepted to the local dedup cache. Caching after
# the call succeeds (not before) keeps a failed call from poisoning the cache and silently
# deduplicating a later retry of the same request.
unprocessed_unique_keys = {request.unique_key for request in api_response.unprocessed_requests}
for request in new_requests:
if request.unique_key in unprocessed_unique_keys:
continue
request_id = unique_key_to_request_id(request.unique_key)
self._cache_request(
request_id,
ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
),
)
committed_request_ids.add(request_id)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
finally:
# Release the in-flight markers we registered. Committed requests tell concurrent producers the
# request reached the platform; everything else (unprocessed, API error, cancellation) tells them
# it did not, so they retry instead of reporting false success.
for request in new_requests:
request_id = unique_key_to_request_id(request.unique_key)
self._settle_pending_addition(request_id, committed=request_id in committed_request_ids)

else:
api_response = AddRequestsResponse.model_validate(
{'unprocessedRequests': [], 'processedRequests': already_present_requests}
)

# Fold in requests a concurrent call was already adding.
await self._resolve_awaited_in_flight(awaited_in_flight, api_response)

logger.debug(
f'Tried to add new requests: {len(new_requests)}, '
f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
Expand All @@ -163,6 +202,42 @@ async def add_batch_of_requests(

return api_response

def _settle_pending_addition(self, request_id: str, *, committed: bool) -> None:
"""Resolve the in-flight add marker for a request, unblocking any concurrent producers awaiting it.

Args:
request_id: ID of the request whose in-flight add has settled.
committed: Whether the request was committed to the platform.
"""
future = self._requests_being_added.pop(request_id, None)
if future is not None and not future.done():
future.set_result(committed)

@staticmethod
async def _resolve_awaited_in_flight(
    awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]],
    api_response: AddRequestsResponse,
) -> None:
    """Await concurrent in-flight adds of these requests and fold the outcome into `api_response`.

    Requests the concurrent add committed are reported as already present; the rest are reported unprocessed
    so the caller retries them rather than receiving false success.

    Args:
        awaited_in_flight: Pairs of a request and the in-flight future of the concurrent call adding it.
        api_response: Response object that is extended in place with the resolved outcomes.
    """
    for request, future in awaited_in_flight:
        # The in-flight future is shared by every producer waiting on the same request. Awaiting it directly
        # would let the cancellation of one waiting task cancel the future itself and so raise a spurious
        # `CancelledError` in all the other waiters. Shielding keeps a waiter's cancellation local to it.
        committed = await asyncio.shield(future)

        if committed:
            api_response.processed_requests.append(
                ProcessedRequest(
                    id=unique_key_to_request_id(request.unique_key),
                    unique_key=request.unique_key,
                    was_already_present=True,
                    was_already_handled=request.was_already_handled,
                )
            )
        else:
            api_response.unprocessed_requests.append(
                UnprocessedRequest(unique_key=request.unique_key, url=request.url, method=request.method)
            )

async def get_request(self, unique_key: str) -> Request | None:
    """Look up a request by its unique key (RQ shared access mode implementation).

    Args:
        unique_key: Unique key of the request to fetch.

    Returns:
        The result of `_get_request_by_id` for the request ID derived from `unique_key`.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self._get_request_by_id(request_id)
Expand Down
118 changes: 97 additions & 21 deletions src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
from __future__ import annotations

import asyncio
from collections import deque
from datetime import UTC, datetime
from logging import getLogger
from typing import TYPE_CHECKING, Final

from cachetools import LRUCache

from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
from crawlee.storage_clients.models import (
AddRequestsResponse,
ProcessedRequest,
RequestQueueMetadata,
UnprocessedRequest,
)

from ._utils import to_crawlee_request, unique_key_to_request_id

Expand Down Expand Up @@ -90,6 +96,14 @@ def __init__(
Tracked locally to accurately determine when the queue is empty for this single consumer.
"""

self._requests_being_added: dict[str, asyncio.Future[bool]] = {}
"""In-flight `add_batch_of_requests` markers, keyed by request ID.

Each future resolves once the platform call that is adding the request settles: `True` if the request was
committed, `False` otherwise. Concurrent producers of the same request await it instead of re-sending,
which preserves deduplication while still avoiding false success when the original add fails.
"""

self._initialized_caches = False
"""Flag indicating whether local caches have been populated from existing queue contents.

Expand All @@ -108,8 +122,12 @@ async def add_batch_of_requests(
await self._init_caches()
self._initialized_caches = True

loop = asyncio.get_running_loop()
new_requests: list[Request] = []
already_present_requests: list[ProcessedRequest] = []
# Requests a concurrent `add_batch_of_requests` call is already sending. We await its outcome instead of
# re-sending them, as (request, that call's in-flight future) pairs.
awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]] = []

for request in requests:
# Calculate id for request
Expand All @@ -135,40 +153,62 @@ async def add_batch_of_requests(
was_already_handled=request.was_already_handled,
)
)
# Check if a concurrent call is already adding this request, and await its outcome rather than
# re-sending it.
elif request_id in self._requests_being_added:
awaited_in_flight.append((request, self._requests_being_added[request_id]))
else:
# Push the request to the platform. Probably not there, or we are not aware of it
# Push the request to the platform. Probably not there, or we are not aware of it. Register an
# in-flight marker so concurrent producers dedupe against it; caching is deferred until the
# platform confirms the request was accepted (see below).
new_requests.append(request)

# Update local caches
self._requests_cache[request_id] = request
if forefront:
self._head_requests.append(request_id)
else:
self._head_requests.appendleft(request_id)
self._requests_being_added[request_id] = loop.create_future()

if new_requests:
# Prepare requests for API by converting to dictionaries.
requests_dict = [request.model_dump(by_alias=True) for request in new_requests]

# Send requests to API.
batch_response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)

# Remove unprocessed requests from the cache
for unprocessed_request in api_response.unprocessed_requests:
request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(request_id, None)
committed_request_ids: set[str] = set()
try:
# Send requests to API.
batch_response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Commit only the requests the platform actually accepted to the local caches. Caching after the
# call succeeds (not before) keeps a failed call from poisoning the cache and silently
# deduplicating a later retry of the same request.
unprocessed_unique_keys = {request.unique_key for request in api_response.unprocessed_requests}
for request in new_requests:
if request.unique_key in unprocessed_unique_keys:
continue
request_id = unique_key_to_request_id(request.unique_key)
self._requests_cache[request_id] = request
if forefront:
self._head_requests.append(request_id)
else:
self._head_requests.appendleft(request_id)
committed_request_ids.add(request_id)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
finally:
# Release the in-flight markers we registered. Committed requests tell concurrent producers the
# request reached the platform; everything else (unprocessed, API error, cancellation) tells them
# it did not, so they retry instead of reporting false success.
for request in new_requests:
request_id = unique_key_to_request_id(request.unique_key)
self._settle_pending_addition(request_id, committed=request_id in committed_request_ids)

else:
api_response = AddRequestsResponse(
unprocessed_requests=[],
processed_requests=already_present_requests,
)

# Fold in requests a concurrent call was already adding.
await self._resolve_awaited_in_flight(awaited_in_flight, api_response)

# Update assumed total count for newly added requests.
new_request_count = 0
for processed_request in api_response.processed_requests:
Expand All @@ -179,6 +219,42 @@ async def add_batch_of_requests(

return api_response

def _settle_pending_addition(self, request_id: str, *, committed: bool) -> None:
"""Resolve the in-flight add marker for a request, unblocking any concurrent producers awaiting it.

Args:
request_id: ID of the request whose in-flight add has settled.
committed: Whether the request was committed to the platform.
"""
future = self._requests_being_added.pop(request_id, None)
if future is not None and not future.done():
future.set_result(committed)

@staticmethod
async def _resolve_awaited_in_flight(
    awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]],
    api_response: AddRequestsResponse,
) -> None:
    """Await concurrent in-flight adds of these requests and fold the outcome into `api_response`.

    Requests the concurrent add committed are reported as already present; the rest are reported unprocessed
    so the caller retries them rather than receiving false success.

    Args:
        awaited_in_flight: Pairs of a request and the in-flight future of the concurrent call adding it.
        api_response: Response object that is extended in place with the resolved outcomes.
    """
    for request, future in awaited_in_flight:
        # The in-flight future is shared by every producer waiting on the same request. Awaiting it directly
        # would let the cancellation of one waiting task cancel the future itself and so raise a spurious
        # `CancelledError` in all the other waiters. Shielding keeps a waiter's cancellation local to it.
        committed = await asyncio.shield(future)

        if committed:
            api_response.processed_requests.append(
                ProcessedRequest(
                    id=unique_key_to_request_id(request.unique_key),
                    unique_key=request.unique_key,
                    was_already_present=True,
                    was_already_handled=request.was_already_handled,
                )
            )
        else:
            api_response.unprocessed_requests.append(
                UnprocessedRequest(unique_key=request.unique_key, url=request.url, method=request.method)
            )

async def get_request(self, unique_key: str) -> Request | None:
    """Look up a request by its unique key (RQ single access mode implementation).

    Args:
        unique_key: Unique key of the request to fetch.

    Returns:
        The result of `_get_request_by_id` for the request ID derived from `unique_key`.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self._get_request_by_id(id=request_id)
Expand Down
Loading
Loading