diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index ced4c5d2c..627b34d7b 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os import asyncio import weakref from typing import cast, override @@ -22,6 +23,28 @@ logger = make_logger(__name__) +_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_SPAN_START" + + +def _skip_span_start_enabled() -> bool: + """Whether to skip the span-start upsert and write each span only on end. + + Tracing writes each span twice — once on start (no ``end_time``) and once + on end. The start row is only ever overwritten by the end write moments + later, so persisting it doubles span-ingest write volume and, on the SGP + backend, costs a non-HOT UPDATE (tsvector/GIN recompute + index churn) plus + a dead tuple per span. Skipping the start makes the end write a single + INSERT. + + Default ON. Set ``AGENTEX_TRACING_SKIP_SPAN_START`` to + ``0``/``false``/``no``/``off`` to restore the start write — e.g. if you + need in-flight spans visible before they complete, or spans that never end + (process crash) to still be persisted. + """ + raw = os.environ.get(_SKIP_SPAN_START_ENV, "1").strip().lower() + return raw not in ("0", "false", "no", "off") + + def _get_span_type(span: Span) -> str: """Read span_type from span.data['__span_type__'], defaulting to STANDALONE.""" if isinstance(span.data, dict): @@ -75,9 +98,18 @@ def __init__(self, config: SGPTracingProcessorConfig): disabled=disabled, ) self.env_vars = EnvironmentVariables.refresh() + logger.info( + "SGP tracing span-start upsert %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) @override def on_span_start(self, span: Span) -> None: + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return sgp_span = _build_sgp_span(span, self.env_vars) sgp_span.flush(blocking=False) @@ -107,6 +139,11 @@ def __init__(self, config: SGPTracingProcessorConfig): asyncio.AbstractEventLoop, AsyncSGPClient ] = weakref.WeakKeyDictionary() self.env_vars = EnvironmentVariables.refresh() + logger.info( + "SGP tracing span-start upsert %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) def _build_client(self) -> AsyncSGPClient: import httpx @@ -150,6 +187,10 @@ async def on_span_end(self, span: Span) -> None: @override async def on_spans_start(self, spans: list[Span]) -> None: + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return if not spans: return diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 41efcea5a..dc8bab127 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -5,6 +5,8 @@ from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock, patch +import pytest + from agentex.types.span import Span from agentex.lib.types.tracing import SGPTracingProcessorConfig @@ -65,8 +67,9 @@ def test_processor_holds_no_per_span_state(self): processor, _ = self._make_processor() assert not hasattr(processor, "_spans") - def test_span_lifecycle_produces_two_flushes(self): - """Each span produces one flush on start and one on end.""" + def test_span_lifecycle_produces_two_flushes(self, monkeypatch): + """With start writes enabled, each span produces one flush on start and one on end.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _ = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs: @@ -105,6 +108,38 @@ def capture_create_span(**kwargs): assert captured_spans[0].start_time is not None assert captured_spans[0].end_time is not None + def test_span_start_skipped_by_default(self, monkeypatch): + """Default (end-only): on_span_start is a no-op; only on_span_end writes.""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs: + span = _make_span() + processor.on_span_start(span) + assert mock_cs.call_count == 0 # start skipped — nothing built or flushed + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert mock_cs.call_count == 1 # only the end write + + def test_span_start_emitted_when_skip_disabled(self, monkeypatch): + """With skip disabled, on_span_start builds and flushes a span.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") + processor, _ = self._make_processor() + + captured: list[MagicMock] = [] + + def capture(**kwargs): + sgp_span = _make_mock_sgp_span() + captured.append(sgp_span) + return sgp_span + + with patch(f"{MODULE}.create_span", side_effect=capture): + processor.on_span_start(_make_span()) + + assert len(captured) == 1 + assert captured[0].flush.called + # --------------------------------------------------------------------------- # Async processor tests @@ -141,8 +176,9 @@ def test_processor_holds_no_per_span_state(self): processor, _, _ = self._make_processor() assert not hasattr(processor, "_spans") - async def test_span_lifecycle_produces_two_upserts(self): - """Each span produces one upsert_batch call on start and one on end.""" + async def test_span_lifecycle_produces_two_upserts(self, monkeypatch): + """With start writes enabled, each span produces one upsert on start and one on end.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): @@ -153,6 +189,31 @@ async def test_span_lifecycle_produces_two_upserts(self): assert mock_client.spans.upsert_batch.call_count == 2 + async def test_spans_start_skipped_by_default(self, monkeypatch): + """Default (end-only): on_spans_start makes no upsert; on_spans_end does.""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + processor, _, mock_client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + spans = [_make_span() for _ in range(3)] + await processor.on_spans_start(spans) + assert mock_client.spans.upsert_batch.call_count == 0 # start skipped + for s in spans: + s.end_time = datetime.now(UTC) + await processor.on_spans_end(spans) + + assert mock_client.spans.upsert_batch.call_count == 1 # only the end write + + async def test_spans_start_emitted_when_skip_disabled(self, monkeypatch): + """With skip disabled, on_spans_start makes one upsert_batch call.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") + processor, _, mock_client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + await processor.on_spans_start([_make_span()]) + + assert mock_client.spans.upsert_batch.call_count == 1 + async def test_span_end_without_prior_start_still_upserts(self): """Cross-pod Temporal case: END activity lands on a pod that never saw START. @@ -171,8 +232,9 @@ async def test_span_end_without_prior_start_still_upserts(self): items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == 1 - async def test_sgp_span_input_and_output_propagated_on_end(self): + async def test_sgp_span_input_and_output_propagated_on_end(self, monkeypatch): """on_span_end should send the span's current input and output via upsert_batch.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() captured: list[MagicMock] = [] @@ -207,8 +269,9 @@ def capture_create_span(**kwargs): assert end_call_kwargs["input"]["messages"][-1]["role"] == "assistant" assert end_call_kwargs["output"] == {"response": "hi"} - async def test_on_spans_start_sends_single_upsert_for_batch(self): + async def test_on_spans_start_sends_single_upsert_for_batch(self, monkeypatch): """Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() n = 10 @@ -224,6 +287,7 @@ async def test_on_spans_start_sends_single_upsert_for_batch(self): async def test_on_spans_start_records_export_success_metrics(self, monkeypatch): monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") import agentex.lib.core.observability.tracing_metrics_recording as recording recording._metrics_enabled = None @@ -400,3 +464,33 @@ async def test_on_spans_end_sends_single_upsert_for_batch(self): ) items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n + + +# --------------------------------------------------------------------------- +# AGENTEX_TRACING_SKIP_SPAN_START env parsing +# --------------------------------------------------------------------------- + + +class TestSkipSpanStartEnv: + @staticmethod + def _fn(): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + _skip_span_start_enabled, + ) + + return _skip_span_start_enabled + + def test_default_is_skip_enabled(self, monkeypatch): + """Unset → skip span-start (end-only ingest is the default).""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + assert self._fn()() is True + + @pytest.mark.parametrize("val", ["0", "false", "no", "off", "FALSE", "Off", " no "]) + def test_falsy_values_restore_span_start(self, monkeypatch, val): + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val) + assert self._fn()() is False + + @pytest.mark.parametrize("val", ["1", "true", "yes", "on", "anything"]) + def test_other_values_keep_skip_enabled(self, monkeypatch, val): + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val) + assert self._fn()() is True