From 33124bbe79565fdfa0399826a2db4ab1ee509046 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Fri, 26 Jun 2026 13:38:12 -0500 Subject: [PATCH] Move otel context propagation --- .../handle_event.py | 14 +- .../azure_functions_runtime/handle_event.py | 16 +- workers/azure_functions_worker/dispatcher.py | 13 +- workers/tests/unittests/test_opentelemetry.py | 192 ++++++++++++++++++ 4 files changed, 216 insertions(+), 19 deletions(-) diff --git a/runtimes/v1/azure_functions_runtime_v1/handle_event.py b/runtimes/v1/azure_functions_runtime_v1/handle_event.py index da3b39e2f..db0858a2e 100644 --- a/runtimes/v1/azure_functions_runtime_v1/handle_event.py +++ b/runtimes/v1/azure_functions_runtime_v1/handle_event.py @@ -156,6 +156,14 @@ async def invocation_request(request): fi: FunctionInfo = _functions.get_function( function_id) assert fi is not None + + # Initialize context and configure OpenTelemetry early + # to ensure all logs have proper trace context (Operation Id) + fi_context = get_context(invoc_request, fi.name, + fi.directory) + if otel_manager.get_azure_monitor_available(): + configure_opentelemetry(fi_context) + logger.info("Function name: %s, Function Type: %s", fi.name, ("async" if fi.is_async else "sync")) @@ -174,9 +182,6 @@ async def invocation_request(request): pb, trigger_metadata=trigger_metadata) - fi_context = get_context(invoc_request, fi.name, - fi.directory) - # Use local thread storage to store the invocation ID # for a customer's threads fi_context.thread_local_storage.invocation_id = invocation_id @@ -188,9 +193,6 @@ async def invocation_request(request): args[name] = Out() if fi.is_async: - if otel_manager.get_azure_monitor_available(): - configure_opentelemetry(fi_context) - # Not supporting Extensions call_result = await execute_async(fi.func, args) else: diff --git a/runtimes/v2/azure_functions_runtime/handle_event.py b/runtimes/v2/azure_functions_runtime/handle_event.py index 0ed133f93..677e1b130 100644 --- a/runtimes/v2/azure_functions_runtime/handle_event.py +++ b/runtimes/v2/azure_functions_runtime/handle_event.py @@ -182,6 +182,15 @@ async def invocation_request(request): fi: FunctionInfo = _functions.get_function( function_id) assert fi is not None + + # Initialize context and configure OpenTelemetry early + # to ensure all logs have proper trace context (Operation Id) + fi_context = get_context(invoc_request, fi.name, + fi.directory) + if (otel_manager.get_azure_monitor_available() + or otel_manager.get_otel_libs_available()): + configure_opentelemetry(fi_context) + logger.info("Function name: %s, Function Type: %s", fi.name, ("async" if fi.is_async else "sync")) @@ -217,9 +226,6 @@ async def invocation_request(request): await sync_http_request(http_request, func_http_request) args[trigger_arg_name] = http_request - fi_context = get_context(invoc_request, fi.name, - fi.directory) - # Use local thread storage to store the invocation ID # for a customer's threads fi_context.thread_local_storage.invocation_id = invocation_id @@ -234,10 +240,6 @@ async def invocation_request(request): args[name] = Out() if fi.is_async: - if (otel_manager.get_azure_monitor_available() - or otel_manager.get_otel_libs_available()): - configure_opentelemetry(fi_context) - # Extensions are not supported call_result = await execute_async(fi.func, args) else: diff --git a/workers/azure_functions_worker/dispatcher.py b/workers/azure_functions_worker/dispatcher.py index fa8be1117..37609e347 100644 --- a/workers/azure_functions_worker/dispatcher.py +++ b/workers/azure_functions_worker/dispatcher.py @@ -611,6 +611,13 @@ async def _handle__invocation_request(self, request): function_id) assert fi is not None + # Initialize context and configure OpenTelemetry early + # to ensure all logs have proper trace context (Operation Id) + fi_context = self._get_context(invoc_request, fi.name, + fi.directory) + if self._azure_monitor_available or self._otel_libs_available: + self.configure_opentelemetry(fi_context) + function_invocation_logs: List[str] = [ 'Received FunctionInvocationRequest', f'request ID: {self.request_id}', @@ -659,9 +666,6 @@ async def _handle__invocation_request(self, request): await sync_http_request(http_request, func_http_request) args[trigger_arg_name] = http_request - fi_context = self._get_context(invoc_request, fi.name, - fi.directory) - # Use local thread storage to store the invocation ID # for a customer's threads fi_context.thread_local_storage.invocation_id = invocation_id @@ -676,9 +680,6 @@ async def _handle__invocation_request(self, request): args[name] = bindings.Out() if fi.is_async: - if self._azure_monitor_available or self._otel_libs_available: - self.configure_opentelemetry(fi_context) - call_result = \ await self._run_async_func(fi_context, fi.func, args) else: diff --git a/workers/tests/unittests/test_opentelemetry.py b/workers/tests/unittests/test_opentelemetry.py index bdbedc874..925d04eff 100644 --- a/workers/tests/unittests/test_opentelemetry.py +++ b/workers/tests/unittests/test_opentelemetry.py @@ -217,3 +217,195 @@ def test_init_request_enable_azure_monitor_disabled_app_setting( # Verify that WorkerOpenTelemetryEnabled capability is not set capabilities = init_response.worker_init_response.capabilities self.assertNotIn("WorkerOpenTelemetryEnabled", capabilities) + + +class TestOpenTelemetryContextPropagation(unittest.TestCase): + """Tests to verify OpenTelemetry context is propagated before logging. + + Issue #1626: OpenTelemetry context must be configured before the first + log is emitted in _handle__invocation_request to ensure all logs have + proper Operation Id in Application Insights. + """ + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.dispatcher = testutils.create_dummy_dispatcher() + self.call_order = [] + + def tearDown(self): + self.loop.close() + + def _track_configure_otel(self, *args, **kwargs): + """Track when configure_opentelemetry is called.""" + self.call_order.append('configure_opentelemetry') + + def _track_logger_info(self, *args, **kwargs): + """Track when logger.info is called.""" + self.call_order.append('logger_info') + + @patch.dict(os.environ, {'PYTHON_ENABLE_OPENTELEMETRY': 'true'}) + @patch('azure_functions_worker.dispatcher.logger') + def test_otel_configured_before_first_log_in_invocation_request( + self, + mock_logger, + ): + """Verify configure_opentelemetry is called before the first log. + + This test ensures that when OpenTelemetry is enabled, the context + is propagated before any logging occurs, so all logs have proper + trace context (Operation Id). + """ + # Set up tracking for call order + mock_logger.info.side_effect = self._track_logger_info + + # Enable OpenTelemetry on the dispatcher + self.dispatcher._otel_libs_available = True + self.dispatcher._azure_monitor_available = False + + # Mock configure_opentelemetry to track when it's called + self.dispatcher.configure_opentelemetry = MagicMock( + side_effect=self._track_configure_otel + ) + + # Mock _get_context to return a valid context + mock_context = MagicMock() + mock_context.trace_context = MagicMock() + mock_context.trace_context.trace_parent = "00-trace-parent" + mock_context.trace_context.trace_state = "state" + mock_context.thread_local_storage = MagicMock() + self.dispatcher._get_context = MagicMock(return_value=mock_context) + + # Mock the functions registry + mock_fi = MagicMock() + mock_fi.name = "test_function" + mock_fi.is_async = True + mock_fi.directory = "/test/dir" + mock_fi.input_types = {} + mock_fi.output_types = {} + mock_fi.requires_context = False + mock_fi.has_return = False + mock_fi.settlement_client_arg = None + mock_fi.func = MagicMock(return_value=None) + self.dispatcher._functions = MagicMock() + self.dispatcher._functions.get_function = MagicMock(return_value=mock_fi) + + # Create a mock invocation request + invoc_request = protos.StreamingMessage( + invocation_request=protos.InvocationRequest( + invocation_id="test-inv-123", + function_id="test-func-id", + trace_context=protos.RpcTraceContext( + trace_parent="00-trace-parent", + trace_state="state" + ) + ) + ) + + # Mock _run_async_func to return None + self.dispatcher._run_async_func = MagicMock( + return_value=asyncio.coroutine(lambda: None)() + ) + + # Run the invocation request (will fail but we only care about call order) + try: + self.loop.run_until_complete( + self.dispatcher._handle__invocation_request(invoc_request)) + except Exception: + # We expect this may fail due to incomplete mocking, + # but we only care about verifying call order + pass + + # Verify configure_opentelemetry was called + self.assertIn('configure_opentelemetry', self.call_order, + "configure_opentelemetry should have been called") + + # Find the position of configure_opentelemetry and first logger.info + if 'configure_opentelemetry' in self.call_order and \ + 'logger_info' in self.call_order: + otel_index = self.call_order.index('configure_opentelemetry') + first_log_index = self.call_order.index('logger_info') + + self.assertLess( + otel_index, first_log_index, + f"configure_opentelemetry (index {otel_index}) should be called " + f"before the first logger.info (index {first_log_index}). " + f"Call order: {self.call_order}" + ) + + @patch.dict(os.environ, {'PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY': 'true'}) + @patch('azure_functions_worker.dispatcher.logger') + def test_azure_monitor_configured_before_first_log( + self, + mock_logger, + ): + """Verify configure_opentelemetry is called before first log with Azure Monitor.""" + # Set up tracking for call order + mock_logger.info.side_effect = self._track_logger_info + + # Enable Azure Monitor on the dispatcher + self.dispatcher._otel_libs_available = False + self.dispatcher._azure_monitor_available = True + + # Mock configure_opentelemetry to track when it's called + self.dispatcher.configure_opentelemetry = MagicMock( + side_effect=self._track_configure_otel + ) + + # Mock _get_context to return a valid context + mock_context = MagicMock() + mock_context.trace_context = MagicMock() + mock_context.trace_context.trace_parent = "00-trace-parent" + mock_context.trace_context.trace_state = "state" + mock_context.thread_local_storage = MagicMock() + self.dispatcher._get_context = MagicMock(return_value=mock_context) + + # Mock the functions registry + mock_fi = MagicMock() + mock_fi.name = "test_function" + mock_fi.is_async = True + mock_fi.directory = "/test/dir" + mock_fi.input_types = {} + mock_fi.output_types = {} + mock_fi.requires_context = False + mock_fi.has_return = False + mock_fi.settlement_client_arg = None + mock_fi.func = MagicMock(return_value=None) + self.dispatcher._functions = MagicMock() + self.dispatcher._functions.get_function = MagicMock(return_value=mock_fi) + + # Create a mock invocation request + invoc_request = protos.StreamingMessage( + invocation_request=protos.InvocationRequest( + invocation_id="test-inv-456", + function_id="test-func-id", + trace_context=protos.RpcTraceContext( + trace_parent="00-trace-parent", + trace_state="state" + ) + ) + ) + + # Mock _run_async_func to return None + self.dispatcher._run_async_func = MagicMock( + return_value=asyncio.coroutine(lambda: None)() + ) + + # Run the invocation request + try: + self.loop.run_until_complete( + self.dispatcher._handle__invocation_request(invoc_request)) + except Exception: + pass + + # Verify configure_opentelemetry was called before first log + if 'configure_opentelemetry' in self.call_order and \ + 'logger_info' in self.call_order: + otel_index = self.call_order.index('configure_opentelemetry') + first_log_index = self.call_order.index('logger_info') + + self.assertLess( + otel_index, first_log_index, + f"configure_opentelemetry should be called before first log. " + f"Call order: {self.call_order}" + )