From c0257453d0e676646d3b7f117cd3c06810556667 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 17:12:22 +0200 Subject: [PATCH 1/2] fix(scrapy): async-thread shutdown, duplicate error logs, and timeout setting --- src/apify/scrapy/_async_thread.py | 20 ++-- src/apify/scrapy/extensions/_httpcache.py | 78 ++++++++------- src/apify/scrapy/scheduler.py | 45 ++++----- .../unit/scrapy/extensions/test_httpcache.py | 50 ++++++++++ tests/unit/scrapy/test_async_thread.py | 98 +++++++++++++++++++ tests/unit/scrapy/test_scheduler.py | 36 ++++++- 6 files changed, 256 insertions(+), 71 deletions(-) create mode 100644 tests/unit/scrapy/test_async_thread.py diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 0333531b..a6d93d66 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -67,11 +67,11 @@ def run_coro( try: # Wait for the coroutine's result until the specified timeout. return future.result(timeout=timeout.total_seconds()) - except futures.TimeoutError as exc: - logger.exception('Coroutine execution timed out.', exc_info=exc) - raise - except Exception as exc: - logger.exception('Coroutine execution raised an exception.', exc_info=exc) + except futures.TimeoutError: + # `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does + # not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so + # this method does not log it itself. + future.cancel() raise def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: @@ -83,9 +83,15 @@ def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: Args: timeout: The maximum number of seconds to wait for the event loop thread to exit. """ + # A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise + # `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close + # is a no-op. + if self._eventloop.is_closed(): + return + if self._eventloop.is_running(): - # Cancel all pending tasks in the event loop. - self.run_coro(self._shutdown_tasks()) + # Cancel all pending tasks in the event loop, honouring the caller's timeout. + self.run_coro(self._shutdown_tasks(), timeout=timeout) # Schedule the event loop to stop. self._eventloop.call_soon_threadsafe(self._eventloop.stop) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 0909c583..296216c5 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,6 +4,7 @@ import io import re import struct +from datetime import timedelta from logging import getLogger from time import time from typing import TYPE_CHECKING @@ -38,6 +39,8 @@ def __init__(self, settings: BaseSettings) -> None: # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') + # Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds. + self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) self._spider: Spider | None = None self._kvs: KeyValueStore | None = None self._fingerprinter: RequestFingerprinterProtocol | None = None @@ -62,7 +65,7 @@ async def open_kvs() -> KeyValueStore: return await KeyValueStore.open(name=kvs_name) logger.debug("Starting background thread for cache storage's event loop") - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) logger.debug(f"Opening cache storage's {kvs_name!r} key value store") self._kvs = self._async_thread.run_coro(open_kvs()) @@ -72,45 +75,48 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: raise ValueError('Async thread not initialized') logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') - if self._expiration_secs > 0: - if current_time is None: - current_time = int(time()) - - async def expire_kvs() -> None: - if self._kvs is None: - raise ValueError('Key value store not initialized') - # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, - # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats - # an expired entry as a cache miss. - processed = 0 - async for item in self._kvs.iterate_keys(): - if processed >= self._expiration_max_items: - break - processed += 1 - value = await self._kvs.get_value(item.key) - try: - gzip_time = read_gzip_time(value) - except Exception as e: - logger.warning(f'Malformed cache item {item.key}: {e}') - await self._kvs.delete_value(item.key) - else: - if self._expiration_secs < current_time - gzip_time: - logger.debug(f'Expired cache item {item.key}') + # The cleanup sweep runs inside `try` so a failure there cannot skip closing the async thread + # (which would leak its event-loop thread); `close` always runs in the `finally`. + try: + if self._expiration_secs > 0: + if current_time is None: + current_time = int(time()) + + async def expire_kvs() -> None: + if self._kvs is None: + raise ValueError('Key value store not initialized') + # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, + # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats + # an expired entry as a cache miss. + processed = 0 + async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 + value = await self._kvs.get_value(item.key) + try: + gzip_time = read_gzip_time(value) + except Exception as e: + logger.warning(f'Malformed cache item {item.key}: {e}') await self._kvs.delete_value(item.key) else: - logger.debug(f'Valid cache item {item.key}') - - self._async_thread.run_coro(expire_kvs()) + if self._expiration_secs < current_time - gzip_time: + logger.debug(f'Expired cache item {item.key}') + await self._kvs.delete_value(item.key) + else: + logger.debug(f'Valid cache item {item.key}') - logger.debug('Closing cache storage') - try: - self._async_thread.close() - except KeyboardInterrupt: - logger.warning('Shutdown interrupted by KeyboardInterrupt!') - except Exception: - logger.exception('Exception occurred while shutting down cache storage') + self._async_thread.run_coro(expire_kvs()) finally: - logger.debug('Cache storage closed') + logger.debug('Closing cache storage') + try: + self._async_thread.close() + except KeyboardInterrupt: + logger.warning('Shutdown interrupted by KeyboardInterrupt!') + except Exception: + logger.exception('Exception occurred while shutting down cache storage') + finally: + logger.debug('Cache storage closed') def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None: """Retrieve a response from the cache storage.""" diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index 2b95d30c..d339c856 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,6 +1,6 @@ from __future__ import annotations -import traceback +from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -15,6 +15,7 @@ from apify.storages import RequestQueue if TYPE_CHECKING: + from scrapy.crawler import Crawler from scrapy.http.request import Request from twisted.internet.defer import Deferred @@ -27,7 +28,7 @@ class ApifyScheduler(BaseScheduler): This scheduler requires the asyncio Twisted reactor to be installed. """ - def __init__(self) -> None: + def __init__(self, async_thread_timeout: timedelta = timedelta(seconds=60)) -> None: if not is_asyncio_reactor_installed(): raise ValueError( f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. ' @@ -38,7 +39,17 @@ def __init__(self) -> None: self.spider: Spider | None = None # A thread with the asyncio event loop to run coroutines on. - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=async_thread_timeout) + + @classmethod + def from_crawler(cls, crawler: Crawler) -> ApifyScheduler: + """Create the scheduler, reading the async-thread timeout from the Scrapy settings. + + The `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting (in seconds) caps how long each coroutine run on the + background event loop may take before timing out; it defaults to 60 seconds. + """ + timeout_secs = crawler.settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60) + return cls(async_thread_timeout=timedelta(seconds=timeout_secs)) def open(self, spider: Spider) -> Deferred[None] | None: """Open the scheduler. @@ -62,7 +73,6 @@ async def open_rq() -> RequestQueue: self._rq = self._async_thread.run_coro(open_rq()) except Exception: self._async_thread.close() - traceback.print_exc() raise return None @@ -97,12 +107,7 @@ def has_pending_requests(self) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - is_finished = self._async_thread.run_coro(self._rq.is_finished()) - except Exception: - traceback.print_exc() - raise - + is_finished = self._async_thread.run_coro(self._rq.is_finished()) return not is_finished def enqueue_request(self, request: Request) -> bool: @@ -130,12 +135,7 @@ def enqueue_request(self, request: Request) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - result = self._async_thread.run_coro(self._rq.add_request(apify_request)) - except Exception: - traceback.print_exc() - raise - + result = self._async_thread.run_coro(self._rq.add_request(apify_request)) logger.debug(f'rq.add_request result: {result}') return not bool(result.was_already_present) @@ -149,12 +149,7 @@ def next_request(self) -> Request | None: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) - except Exception: - traceback.print_exc() - raise - + apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) logger.debug(f'Fetched apify_request: {apify_request}') if apify_request is None: return None @@ -173,11 +168,7 @@ def next_request(self) -> Request | None: # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back # forever. Retrying genuine failures is the RetryMiddleware's job. - try: - self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) - except Exception: - traceback.print_exc() - raise + self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) if scrapy_request is None: return None diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 6f34853b..b7db7910 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -5,6 +5,7 @@ import io import json import pickle +from datetime import timedelta from time import time from types import SimpleNamespace from typing import TYPE_CHECKING, Any, cast @@ -274,6 +275,55 @@ def test_close_spider_respects_max_items() -> None: assert len(kvs.deleted) == 2 +def test_close_spider_closes_thread_even_when_cleanup_fails() -> None: + """If the expiration sweep raises, the async thread is still closed rather than leaked.""" + closed: list[bool] = [] + + class _FailingAsyncThread: + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + raise RuntimeError('cleanup boom') + + def close(self, *_: Any, **__: Any) -> None: + closed.append(True) + + storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': 100})) + storage._async_thread = _FailingAsyncThread() # ty: ignore[invalid-assignment] + storage._kvs = _FakeKvs(None) # ty: ignore[invalid-assignment] + + with pytest.raises(RuntimeError, match='cleanup boom'): + storage.close_spider(None, current_time=1000) # ty: ignore[invalid-argument-type] + + assert closed == [True] + + +def test_cache_storage_reads_async_thread_timeout_setting() -> None: + """`APIFY_ASYNC_THREAD_TIMEOUT_SECS` is read into the storage's async-thread timeout.""" + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + assert storage._async_thread_timeout == timedelta(seconds=77) + + +def test_open_spider_passes_timeout_to_async_thread(monkeypatch: pytest.MonkeyPatch) -> None: + """`open_spider` constructs the async thread with the configured timeout.""" + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + return _FakeKvs(None) + + monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _RecordingAsyncThread) + + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter())) + storage.open_spider(cast('Any', spider)) + + assert captured['default_timeout'] == timedelta(seconds=77) + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py new file mode 100644 index 00000000..d624f465 --- /dev/null +++ b/tests/unit/scrapy/test_async_thread.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from concurrent import futures +from datetime import timedelta +from typing import Any, Literal + +import pytest + +from apify.scrapy._async_thread import AsyncThread + + +def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None: + """Block until the background event loop is running, so `run_coro` does not race the thread startup.""" + deadline = time.monotonic() + timeout + while not thread._eventloop.is_running(): + if time.monotonic() > deadline: + raise AssertionError('The event loop did not start in time.') + time.sleep(0.01) + + +# Coroutine execution + + +def test_run_coro_cancels_the_coroutine_on_timeout() -> None: + """A timed-out coroutine is cancelled, not left running on the background loop.""" + thread = AsyncThread() + _wait_until_running(thread) + + started = threading.Event() + cancelled = threading.Event() + + async def slow() -> None: + started.set() + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + cancelled.set() + raise + + with pytest.raises(futures.TimeoutError): + thread.run_coro(slow(), timeout=timedelta(seconds=0.1)) + + assert started.wait(timeout=2) + assert cancelled.wait(timeout=2), 'the timed-out coroutine was left running instead of being cancelled' + + thread.close() + + +def test_run_coro_does_not_log_on_exception(caplog: pytest.LogCaptureFixture) -> None: + """`run_coro` propagates a failing coroutine without logging it itself (the caller/Scrapy reports it once).""" + thread = AsyncThread() + _wait_until_running(thread) + + async def boom() -> None: + raise RuntimeError('boom') + + with caplog.at_level(logging.DEBUG, logger='apify.scrapy._async_thread'), pytest.raises(RuntimeError, match='boom'): + thread.run_coro(boom()) + + thread.close() + + assert [record for record in caplog.records if record.levelno >= logging.ERROR] == [] + + +# Shutdown + + +def test_close_is_idempotent() -> None: + """Calling `close` twice is a no-op the second time, not a `RuntimeError` on the closed loop.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + thread.close() + thread.close() # must not raise + + +def test_close_passes_its_timeout_to_the_shutdown_step(monkeypatch: pytest.MonkeyPatch) -> None: + """`close(timeout=...)` honours that timeout for the task-cancellation step, not only the thread join.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + recorded: list[timedelta | str] = [] + original = thread.run_coro + + def spy(coro: Any, timeout: timedelta | Literal['default'] = 'default') -> Any: + recorded.append(timeout) + return original(coro, timeout=timeout) + + monkeypatch.setattr(thread, 'run_coro', spy) + thread.close(timeout=timedelta(seconds=42)) + + assert recorded == [timedelta(seconds=42)] diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index c4a87622..7a6926e1 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -1,12 +1,14 @@ from __future__ import annotations import logging +from datetime import timedelta from types import SimpleNamespace -from typing import cast +from typing import Any, cast from unittest import mock import pytest from scrapy import Request, Spider +from scrapy.settings import Settings from apify import Request as ApifyRequest from apify.scrapy.scheduler import ApifyScheduler @@ -151,3 +153,35 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - assert result is None rq.mark_request_as_handled.assert_not_called() + + +def test_next_request_does_not_print_traceback_to_stderr( + scheduler: ApifyScheduler, + capsys: pytest.CaptureFixture[str], +) -> None: + """A failure propagates as-is, without `traceback.print_exc()` printing a second copy past the log formatter.""" + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.side_effect = RuntimeError('boom') + + with pytest.raises(RuntimeError, match='boom'): + scheduler.next_request() + + assert capsys.readouterr().err == '' + + +def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: + """`from_crawler` wires the `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting into the async thread's timeout.""" + monkeypatch.setattr('apify.scrapy.scheduler.is_asyncio_reactor_installed', lambda: True) + + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + monkeypatch.setattr('apify.scrapy.scheduler.AsyncThread', _RecordingAsyncThread) + + crawler = SimpleNamespace(settings=Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 123})) + ApifyScheduler.from_crawler(cast('Any', crawler)) + + assert captured['default_timeout'] == timedelta(seconds=123) From 50aa5396e872304a8a73dbe40261ceae50f762d5 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 19:10:08 +0200 Subject: [PATCH 2/2] fix(scrapy): keep traceback.print_exc() on background-loop coroutine errors --- src/apify/scrapy/extensions/_httpcache.py | 27 +++++++++++++++++---- src/apify/scrapy/scheduler.py | 29 +++++++++++++++++++---- tests/unit/scrapy/test_async_thread.py | 15 +++--------- tests/unit/scrapy/test_scheduler.py | 8 ++++--- 4 files changed, 55 insertions(+), 24 deletions(-) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 296216c5..1c109867 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,6 +4,7 @@ import io import re import struct +import traceback from datetime import timedelta from logging import getLogger from time import time @@ -39,8 +40,8 @@ def __init__(self, settings: BaseSettings) -> None: # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') - # Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds. self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) + """Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds.""" self._spider: Spider | None = None self._kvs: KeyValueStore | None = None self._fingerprinter: RequestFingerprinterProtocol | None = None @@ -67,7 +68,11 @@ async def open_kvs() -> KeyValueStore: logger.debug("Starting background thread for cache storage's event loop") self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) logger.debug(f"Opening cache storage's {kvs_name!r} key value store") - self._kvs = self._async_thread.run_coro(open_kvs()) + try: + self._kvs = self._async_thread.run_coro(open_kvs()) + except Exception: + traceback.print_exc() + raise def close_spider(self, _: Spider, current_time: int | None = None) -> None: """Close the cache storage for a spider.""" @@ -106,7 +111,11 @@ async def expire_kvs() -> None: else: logger.debug(f'Valid cache item {item.key}') - self._async_thread.run_coro(expire_kvs()) + try: + self._async_thread.run_coro(expire_kvs()) + except Exception: + traceback.print_exc() + raise finally: logger.debug('Closing cache storage') try: @@ -128,7 +137,11 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non raise ValueError('Request fingerprinter not initialized') key = self._fingerprinter.fingerprint(request).hex() - value = self._async_thread.run_coro(self._kvs.get_value(key)) + try: + value = self._async_thread.run_coro(self._kvs.get_value(key)) + except Exception: + traceback.print_exc() + raise if value is None: logger.debug('Cache miss', extra={'request': request}) @@ -175,7 +188,11 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non 'body': response.body, } value = to_gzip(data) - self._async_thread.run_coro(self._kvs.set_value(key, value)) + try: + self._async_thread.run_coro(self._kvs.set_value(key, value)) + except Exception: + traceback.print_exc() + raise def to_gzip(data: dict, mtime: int | None = None) -> bytes: diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index d339c856..69b9b8cf 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import traceback from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -73,6 +74,7 @@ async def open_rq() -> RequestQueue: self._rq = self._async_thread.run_coro(open_rq()) except Exception: self._async_thread.close() + traceback.print_exc() raise return None @@ -107,7 +109,12 @@ def has_pending_requests(self) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - is_finished = self._async_thread.run_coro(self._rq.is_finished()) + try: + is_finished = self._async_thread.run_coro(self._rq.is_finished()) + except Exception: + traceback.print_exc() + raise + return not is_finished def enqueue_request(self, request: Request) -> bool: @@ -135,7 +142,12 @@ def enqueue_request(self, request: Request) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - result = self._async_thread.run_coro(self._rq.add_request(apify_request)) + try: + result = self._async_thread.run_coro(self._rq.add_request(apify_request)) + except Exception: + traceback.print_exc() + raise + logger.debug(f'rq.add_request result: {result}') return not bool(result.was_already_present) @@ -149,7 +161,12 @@ def next_request(self) -> Request | None: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) + try: + apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) + except Exception: + traceback.print_exc() + raise + logger.debug(f'Fetched apify_request: {apify_request}') if apify_request is None: return None @@ -168,7 +185,11 @@ def next_request(self) -> Request | None: # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back # forever. Retrying genuine failures is the RetryMiddleware's job. - self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) + try: + self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) + except Exception: + traceback.print_exc() + raise if scrapy_request is None: return None diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py index d624f465..409a8e42 100644 --- a/tests/unit/scrapy/test_async_thread.py +++ b/tests/unit/scrapy/test_async_thread.py @@ -3,26 +3,20 @@ import asyncio import logging import threading -import time from concurrent import futures from datetime import timedelta from typing import Any, Literal import pytest +from ..._utils import poll_until_condition from apify.scrapy._async_thread import AsyncThread def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None: """Block until the background event loop is running, so `run_coro` does not race the thread startup.""" - deadline = time.monotonic() + timeout - while not thread._eventloop.is_running(): - if time.monotonic() > deadline: - raise AssertionError('The event loop did not start in time.') - time.sleep(0.01) - - -# Coroutine execution + if not asyncio.run(poll_until_condition(thread._eventloop.is_running, timeout=timeout, poll_interval=0.01)): + raise AssertionError('The event loop did not start in time.') def test_run_coro_cancels_the_coroutine_on_timeout() -> None: @@ -66,9 +60,6 @@ async def boom() -> None: assert [record for record in caplog.records if record.levelno >= logging.ERROR] == [] -# Shutdown - - def test_close_is_idempotent() -> None: """Calling `close` twice is a no-op the second time, not a `RuntimeError` on the closed loop.""" thread = AsyncThread() diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index 7a6926e1..d85dd76a 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -155,18 +155,20 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - rq.mark_request_as_handled.assert_not_called() -def test_next_request_does_not_print_traceback_to_stderr( +def test_next_request_prints_traceback_to_stderr( scheduler: ApifyScheduler, capsys: pytest.CaptureFixture[str], ) -> None: - """A failure propagates as-is, without `traceback.print_exc()` printing a second copy past the log formatter.""" + """A failure in the coroutine run prints a traceback to stderr via `traceback.print_exc()` before propagating.""" async_thread = cast('mock.MagicMock', scheduler._async_thread) async_thread.run_coro.side_effect = RuntimeError('boom') with pytest.raises(RuntimeError, match='boom'): scheduler.next_request() - assert capsys.readouterr().err == '' + captured = capsys.readouterr() + assert 'Traceback (most recent call last)' in captured.err + assert 'RuntimeError: boom' in captured.err def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: