Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/apify/scrapy/_async_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ def run_coro(
try:
# Wait for the coroutine's result until the specified timeout.
return future.result(timeout=timeout.total_seconds())
except futures.TimeoutError as exc:
logger.exception('Coroutine execution timed out.', exc_info=exc)
raise
except Exception as exc:
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
except futures.TimeoutError:
# `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does
# not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so
# this method does not log it itself.
future.cancel()
raise

def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
Expand All @@ -83,9 +83,15 @@ def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
Args:
timeout: The maximum number of seconds to wait for the event loop thread to exit.
"""
# A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise
# `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close
# is a no-op.
if self._eventloop.is_closed():
return

if self._eventloop.is_running():
# Cancel all pending tasks in the event loop.
self.run_coro(self._shutdown_tasks())
# Cancel all pending tasks in the event loop, honouring the caller's timeout.
self.run_coro(self._shutdown_tasks(), timeout=timeout)

# Schedule the event loop to stop.
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
Expand Down
103 changes: 63 additions & 40 deletions src/apify/scrapy/extensions/_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io
import re
import struct
import traceback
from datetime import timedelta
from logging import getLogger
from time import time
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -38,6 +40,8 @@ def __init__(self, settings: BaseSettings) -> None:
# Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60))
"""Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds."""
self._spider: Spider | None = None
self._kvs: KeyValueStore | None = None
self._fingerprinter: RequestFingerprinterProtocol | None = None
Expand All @@ -62,55 +66,66 @@ async def open_kvs() -> KeyValueStore:
return await KeyValueStore.open(name=kvs_name)

logger.debug("Starting background thread for cache storage's event loop")
self._async_thread = AsyncThread()
self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout)
logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
self._kvs = self._async_thread.run_coro(open_kvs())
try:
self._kvs = self._async_thread.run_coro(open_kvs())
except Exception:
traceback.print_exc()
raise

def close_spider(self, _: Spider, current_time: int | None = None) -> None:
"""Close the cache storage for a spider."""
if self._async_thread is None:
raise ValueError('Async thread not initialized')

logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
# The cleanup sweep runs inside `try` so a failure there cannot skip closing the async thread
# (which would leak its event-loop thread); `close` always runs in the `finally`.
try:
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')

self._async_thread.run_coro(expire_kvs())

logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')

try:
self._async_thread.run_coro(expire_kvs())
except Exception:
traceback.print_exc()
raise
finally:
logger.debug('Cache storage closed')
logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
finally:
logger.debug('Cache storage closed')

def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
"""Retrieve a response from the cache storage."""
Expand All @@ -122,7 +137,11 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
raise ValueError('Request fingerprinter not initialized')

key = self._fingerprinter.fingerprint(request).hex()
value = self._async_thread.run_coro(self._kvs.get_value(key))
try:
value = self._async_thread.run_coro(self._kvs.get_value(key))
except Exception:
traceback.print_exc()
raise

if value is None:
logger.debug('Cache miss', extra={'request': request})
Expand Down Expand Up @@ -169,7 +188,11 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
'body': response.body,
}
value = to_gzip(data)
self._async_thread.run_coro(self._kvs.set_value(key, value))
try:
self._async_thread.run_coro(self._kvs.set_value(key, value))
except Exception:
traceback.print_exc()
raise


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
Expand Down
16 changes: 14 additions & 2 deletions src/apify/scrapy/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import traceback
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING

Expand All @@ -15,6 +16,7 @@
from apify.storages import RequestQueue

if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.http.request import Request
from twisted.internet.defer import Deferred

Expand All @@ -27,7 +29,7 @@ class ApifyScheduler(BaseScheduler):
This scheduler requires the asyncio Twisted reactor to be installed.
"""

def __init__(self) -> None:
def __init__(self, async_thread_timeout: timedelta = timedelta(seconds=60)) -> None:
if not is_asyncio_reactor_installed():
raise ValueError(
f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. '
Expand All @@ -38,7 +40,17 @@ def __init__(self) -> None:
self.spider: Spider | None = None

# A thread with the asyncio event loop to run coroutines on.
self._async_thread = AsyncThread()
self._async_thread = AsyncThread(default_timeout=async_thread_timeout)

@classmethod
def from_crawler(cls, crawler: Crawler) -> ApifyScheduler:
    """Build the scheduler with an async-thread timeout taken from the Scrapy settings.

    The integer `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting (in seconds) caps how long each coroutine run
    on the background event loop may take before timing out; 60 seconds is used when it is not set.

    Args:
        crawler: The Scrapy crawler whose `settings` are consulted.

    Returns:
        A scheduler created through `cls` with the configured `async_thread_timeout`.
    """
    seconds = crawler.settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)
    async_thread_timeout = timedelta(seconds=seconds)
    return cls(async_thread_timeout=async_thread_timeout)

def open(self, spider: Spider) -> Deferred[None] | None:
"""Open the scheduler.
Expand Down
50 changes: 50 additions & 0 deletions tests/unit/scrapy/extensions/test_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import json
import pickle
from datetime import timedelta
from time import time
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, cast
Expand Down Expand Up @@ -274,6 +275,55 @@ def test_close_spider_respects_max_items() -> None:
assert len(kvs.deleted) == 2


def test_close_spider_closes_thread_even_when_cleanup_fails() -> None:
    """A failing expiration sweep must still close the async thread instead of leaking it."""
    close_calls: list[bool] = []

    class _FailingAsyncThread:
        """Stand-in async thread whose `run_coro` always fails and whose `close` is recorded."""

        def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
            # The coroutine is never run; closing it only avoids an un-awaited coroutine warning.
            coro.close()
            raise RuntimeError('cleanup boom')

        def close(self, *_: Any, **__: Any) -> None:
            close_calls.append(True)

    settings = Settings({'HTTPCACHE_EXPIRATION_SECS': 100})
    storage = ApifyCacheStorage(settings)
    storage._async_thread = _FailingAsyncThread()  # ty: ignore[invalid-assignment]
    storage._kvs = _FakeKvs(None)  # ty: ignore[invalid-assignment]

    with pytest.raises(RuntimeError, match='cleanup boom'):
        storage.close_spider(None, current_time=1000)  # ty: ignore[invalid-argument-type]

    # Exactly one `close` call was recorded even though the sweep raised.
    assert close_calls == [True]


def test_cache_storage_reads_async_thread_timeout_setting() -> None:
    """The storage's async-thread timeout mirrors the `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting."""
    settings = Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})
    expected_timeout = timedelta(seconds=77)

    storage = ApifyCacheStorage(settings)

    assert storage._async_thread_timeout == expected_timeout


def test_open_spider_passes_timeout_to_async_thread(monkeypatch: pytest.MonkeyPatch) -> None:
    """`open_spider` hands the configured timeout to the async thread it constructs."""
    seen: dict[str, Any] = {}

    class _RecordingAsyncThread:
        """Stand-in async thread that records the `default_timeout` it was built with."""

        def __init__(self, default_timeout: timedelta | None = None) -> None:
            seen['default_timeout'] = default_timeout

        def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
            # The coroutine is never run; closing it only avoids an un-awaited coroutine warning.
            coro.close()
            return _FakeKvs(None)

    # Swap the class the cache-storage module instantiates for the recording stand-in.
    monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _RecordingAsyncThread)

    settings = Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})
    storage = ApifyCacheStorage(settings)
    fake_crawler = SimpleNamespace(request_fingerprinter=_FakeFingerprinter())
    fake_spider = SimpleNamespace(name='myspider', crawler=fake_crawler)
    storage.open_spider(cast('Any', fake_spider))

    assert seen['default_timeout'] == timedelta(seconds=77)


@pytest.mark.parametrize(
('spider_name', 'expected'),
[
Expand Down
89 changes: 89 additions & 0 deletions tests/unit/scrapy/test_async_thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import asyncio
import logging
import threading
from concurrent import futures
from datetime import timedelta
from typing import Any, Literal

import pytest

from ..._utils import poll_until_condition
from apify.scrapy._async_thread import AsyncThread


def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None:
    """Wait for the background event loop to report running, so `run_coro` does not race thread startup.

    Args:
        thread: The async thread whose `_eventloop.is_running` is polled.
        timeout: Forwarded to `poll_until_condition` as its `timeout`.

    Raises:
        AssertionError: If the poll finishes with a falsy result.
    """
    poll = poll_until_condition(thread._eventloop.is_running, timeout=timeout, poll_interval=0.01)
    loop_is_running = asyncio.run(poll)
    if loop_is_running:
        return
    raise AssertionError('The event loop did not start in time.')


def test_run_coro_cancels_the_coroutine_on_timeout() -> None:
    """A timed-out coroutine is cancelled, not left running on the background loop."""
    thread = AsyncThread()
    # Close the thread in `finally`: if any assertion below fails, the background event-loop thread
    # (and the long-sleeping coroutine on it) must not be left behind for the rest of the test run.
    try:
        _wait_until_running(thread)

        started = threading.Event()
        cancelled = threading.Event()

        async def slow() -> None:
            started.set()
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                # Record the cancellation for the assertion below, then let it propagate.
                cancelled.set()
                raise

        with pytest.raises(futures.TimeoutError):
            thread.run_coro(slow(), timeout=timedelta(seconds=0.1))

        assert started.wait(timeout=2)
        assert cancelled.wait(timeout=2), 'the timed-out coroutine was left running instead of being cancelled'
    finally:
        thread.close()


def test_run_coro_does_not_log_on_exception(caplog: pytest.LogCaptureFixture) -> None:
    """`run_coro` propagates a failing coroutine without logging it itself (the caller/Scrapy reports it once)."""
    thread = AsyncThread()
    # Close the thread in `finally` so a failed expectation cannot leak the background event-loop thread.
    try:
        _wait_until_running(thread)

        async def boom() -> None:
            raise RuntimeError('boom')

        with (
            caplog.at_level(logging.DEBUG, logger='apify.scrapy._async_thread'),
            pytest.raises(RuntimeError, match='boom'),
        ):
            thread.run_coro(boom())
    finally:
        thread.close()

    # Checked after `close`, so an error-level record emitted during shutdown fails the test too.
    assert [record for record in caplog.records if record.levelno >= logging.ERROR] == []


def test_close_is_idempotent() -> None:
    """A second `close` call returns quietly instead of raising `RuntimeError` on the closed loop."""
    async_thread = AsyncThread()
    _wait_until_running(async_thread)
    async_thread.run_coro(asyncio.sleep(0))

    # The second pass hits the already-closed thread and must not raise.
    for _ in range(2):
        async_thread.close()


def test_close_passes_its_timeout_to_the_shutdown_step(monkeypatch: pytest.MonkeyPatch) -> None:
    """`close(timeout=...)` applies that timeout to the task-cancellation step, not only the thread join."""
    async_thread = AsyncThread()
    _wait_until_running(async_thread)
    async_thread.run_coro(asyncio.sleep(0))

    seen_timeouts: list[timedelta | str] = []
    real_run_coro = async_thread.run_coro

    def recording_run_coro(coro: Any, timeout: timedelta | Literal['default'] = 'default') -> Any:
        # Note the timeout `close` passes in, then delegate to the real implementation.
        seen_timeouts.append(timeout)
        return real_run_coro(coro, timeout=timeout)

    monkeypatch.setattr(async_thread, 'run_coro', recording_run_coro)

    requested = timedelta(seconds=42)
    async_thread.close(timeout=requested)

    assert seen_timeouts == [requested]
Loading
Loading