Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions runtimes/v1/azure_functions_runtime_v1/handle_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ async def invocation_request(request):
fi: FunctionInfo = _functions.get_function(
function_id)
assert fi is not None

# Initialize context and configure OpenTelemetry early
# to ensure all logs have proper trace context (Operation Id)
fi_context = get_context(invoc_request, fi.name,
fi.directory)
if otel_manager.get_azure_monitor_available():
configure_opentelemetry(fi_context)

logger.info("Function name: %s, Function Type: %s",
fi.name,
("async" if fi.is_async else "sync"))
Expand All @@ -174,9 +182,6 @@ async def invocation_request(request):
pb,
trigger_metadata=trigger_metadata)

fi_context = get_context(invoc_request, fi.name,
fi.directory)

# Use local thread storage to store the invocation ID
# for a customer's threads
fi_context.thread_local_storage.invocation_id = invocation_id
Expand All @@ -188,9 +193,6 @@ async def invocation_request(request):
args[name] = Out()

if fi.is_async:
if otel_manager.get_azure_monitor_available():
configure_opentelemetry(fi_context)

# Not supporting Extensions
call_result = await execute_async(fi.func, args)
else:
Expand Down
16 changes: 9 additions & 7 deletions runtimes/v2/azure_functions_runtime/handle_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ async def invocation_request(request):
fi: FunctionInfo = _functions.get_function(
function_id)
assert fi is not None

# Initialize context and configure OpenTelemetry early
# to ensure all logs have proper trace context (Operation Id)
fi_context = get_context(invoc_request, fi.name,
fi.directory)
if (otel_manager.get_azure_monitor_available()
or otel_manager.get_otel_libs_available()):
configure_opentelemetry(fi_context)

logger.info("Function name: %s, Function Type: %s",
fi.name,
("async" if fi.is_async else "sync"))
Expand Down Expand Up @@ -217,9 +226,6 @@ async def invocation_request(request):
await sync_http_request(http_request, func_http_request)
args[trigger_arg_name] = http_request

fi_context = get_context(invoc_request, fi.name,
fi.directory)

# Use local thread storage to store the invocation ID
# for a customer's threads
fi_context.thread_local_storage.invocation_id = invocation_id
Expand All @@ -234,10 +240,6 @@ async def invocation_request(request):
args[name] = Out()

if fi.is_async:
if (otel_manager.get_azure_monitor_available()
or otel_manager.get_otel_libs_available()):
configure_opentelemetry(fi_context)

# Extensions are not supported
call_result = await execute_async(fi.func, args)
else:
Expand Down
13 changes: 7 additions & 6 deletions workers/azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,13 @@ async def _handle__invocation_request(self, request):
function_id)
assert fi is not None

# Initialize context and configure OpenTelemetry early
# to ensure all logs have proper trace context (Operation Id)
fi_context = self._get_context(invoc_request, fi.name,
fi.directory)
if self._azure_monitor_available or self._otel_libs_available:
self.configure_opentelemetry(fi_context)

function_invocation_logs: List[str] = [
'Received FunctionInvocationRequest',
f'request ID: {self.request_id}',
Expand Down Expand Up @@ -659,9 +666,6 @@ async def _handle__invocation_request(self, request):
await sync_http_request(http_request, func_http_request)
args[trigger_arg_name] = http_request

fi_context = self._get_context(invoc_request, fi.name,
fi.directory)

# Use local thread storage to store the invocation ID
# for a customer's threads
fi_context.thread_local_storage.invocation_id = invocation_id
Expand All @@ -676,9 +680,6 @@ async def _handle__invocation_request(self, request):
args[name] = bindings.Out()

if fi.is_async:
if self._azure_monitor_available or self._otel_libs_available:
self.configure_opentelemetry(fi_context)

call_result = \
await self._run_async_func(fi_context, fi.func, args)
else:
Expand Down
192 changes: 192 additions & 0 deletions workers/tests/unittests/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,195 @@ def test_init_request_enable_azure_monitor_disabled_app_setting(
# Verify that WorkerOpenTelemetryEnabled capability is not set
capabilities = init_response.worker_init_response.capabilities
self.assertNotIn("WorkerOpenTelemetryEnabled", capabilities)


class TestOpenTelemetryContextPropagation(unittest.TestCase):
"""Tests to verify OpenTelemetry context is propagated before logging.

Issue #1626: OpenTelemetry context must be configured before the first
log is emitted in _handle__invocation_request to ensure all logs have
proper Operation Id in Application Insights.
"""

def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.dispatcher = testutils.create_dummy_dispatcher()
self.call_order = []

def tearDown(self):
self.loop.close()

def _track_configure_otel(self, *args, **kwargs):
"""Track when configure_opentelemetry is called."""
self.call_order.append('configure_opentelemetry')

def _track_logger_info(self, *args, **kwargs):
"""Track when logger.info is called."""
self.call_order.append('logger_info')

@patch.dict(os.environ, {'PYTHON_ENABLE_OPENTELEMETRY': 'true'})
@patch('azure_functions_worker.dispatcher.logger')
def test_otel_configured_before_first_log_in_invocation_request(
self,
mock_logger,
):
"""Verify configure_opentelemetry is called before the first log.

This test ensures that when OpenTelemetry is enabled, the context
is propagated before any logging occurs, so all logs have proper
trace context (Operation Id).
"""
# Set up tracking for call order
mock_logger.info.side_effect = self._track_logger_info

# Enable OpenTelemetry on the dispatcher
self.dispatcher._otel_libs_available = True
self.dispatcher._azure_monitor_available = False

# Mock configure_opentelemetry to track when it's called
self.dispatcher.configure_opentelemetry = MagicMock(
side_effect=self._track_configure_otel
)

# Mock _get_context to return a valid context
mock_context = MagicMock()
mock_context.trace_context = MagicMock()
mock_context.trace_context.trace_parent = "00-trace-parent"
mock_context.trace_context.trace_state = "state"
mock_context.thread_local_storage = MagicMock()
self.dispatcher._get_context = MagicMock(return_value=mock_context)

# Mock the functions registry
mock_fi = MagicMock()
mock_fi.name = "test_function"
mock_fi.is_async = True
mock_fi.directory = "/test/dir"
mock_fi.input_types = {}
mock_fi.output_types = {}
mock_fi.requires_context = False
mock_fi.has_return = False
mock_fi.settlement_client_arg = None
mock_fi.func = MagicMock(return_value=None)
self.dispatcher._functions = MagicMock()
self.dispatcher._functions.get_function = MagicMock(return_value=mock_fi)

# Create a mock invocation request
invoc_request = protos.StreamingMessage(
invocation_request=protos.InvocationRequest(
invocation_id="test-inv-123",
function_id="test-func-id",
trace_context=protos.RpcTraceContext(
trace_parent="00-trace-parent",
trace_state="state"
)
)
)

# Mock _run_async_func to return None
self.dispatcher._run_async_func = MagicMock(
return_value=asyncio.coroutine(lambda: None)()
)

# Run the invocation request (will fail but we only care about call order)
try:
self.loop.run_until_complete(
self.dispatcher._handle__invocation_request(invoc_request))
except Exception:
# We expect this may fail due to incomplete mocking,
# but we only care about verifying call order
pass

# Verify configure_opentelemetry was called
self.assertIn('configure_opentelemetry', self.call_order,
"configure_opentelemetry should have been called")

# Find the position of configure_opentelemetry and first logger.info
if 'configure_opentelemetry' in self.call_order and \
'logger_info' in self.call_order:
otel_index = self.call_order.index('configure_opentelemetry')
first_log_index = self.call_order.index('logger_info')

self.assertLess(
otel_index, first_log_index,
f"configure_opentelemetry (index {otel_index}) should be called "
f"before the first logger.info (index {first_log_index}). "
f"Call order: {self.call_order}"
)

@patch.dict(os.environ, {'PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY': 'true'})
@patch('azure_functions_worker.dispatcher.logger')
def test_azure_monitor_configured_before_first_log(
self,
mock_logger,
):
"""Verify configure_opentelemetry is called before first log with Azure Monitor."""
# Set up tracking for call order
mock_logger.info.side_effect = self._track_logger_info

# Enable Azure Monitor on the dispatcher
self.dispatcher._otel_libs_available = False
self.dispatcher._azure_monitor_available = True

# Mock configure_opentelemetry to track when it's called
self.dispatcher.configure_opentelemetry = MagicMock(
side_effect=self._track_configure_otel
)

# Mock _get_context to return a valid context
mock_context = MagicMock()
mock_context.trace_context = MagicMock()
mock_context.trace_context.trace_parent = "00-trace-parent"
mock_context.trace_context.trace_state = "state"
mock_context.thread_local_storage = MagicMock()
self.dispatcher._get_context = MagicMock(return_value=mock_context)

# Mock the functions registry
mock_fi = MagicMock()
mock_fi.name = "test_function"
mock_fi.is_async = True
mock_fi.directory = "/test/dir"
mock_fi.input_types = {}
mock_fi.output_types = {}
mock_fi.requires_context = False
mock_fi.has_return = False
mock_fi.settlement_client_arg = None
mock_fi.func = MagicMock(return_value=None)
self.dispatcher._functions = MagicMock()
self.dispatcher._functions.get_function = MagicMock(return_value=mock_fi)

# Create a mock invocation request
invoc_request = protos.StreamingMessage(
invocation_request=protos.InvocationRequest(
invocation_id="test-inv-456",
function_id="test-func-id",
trace_context=protos.RpcTraceContext(
trace_parent="00-trace-parent",
trace_state="state"
)
)
)

# Mock _run_async_func to return None
self.dispatcher._run_async_func = MagicMock(
return_value=asyncio.coroutine(lambda: None)()
)

# Run the invocation request
try:
self.loop.run_until_complete(
self.dispatcher._handle__invocation_request(invoc_request))
except Exception:
pass

# Verify configure_opentelemetry was called before first log
if 'configure_opentelemetry' in self.call_order and \
'logger_info' in self.call_order:
otel_index = self.call_order.index('configure_opentelemetry')
first_log_index = self.call_order.index('logger_info')

self.assertLess(
otel_index, first_log_index,
f"configure_opentelemetry should be called before first log. "
f"Call order: {self.call_order}"
)
Loading