From ee62c8f3fac070fc8931fc9515e01ea3778a60d2 Mon Sep 17 00:00:00 2001 From: 444am Date: Thu, 11 Jun 2026 22:37:26 +1000 Subject: [PATCH 1/3] Add OpenTracing interceptor for standalone activities --- .../OpenTracingActivityClientInterceptor.java | 30 ++ .../opentracing/SpanCreationContext.java | 23 +- .../opentracing/SpanOperationType.java | 8 + .../opentracing/StandardTagNames.java | 1 + .../ActionTypeAndNameSpanBuilderProvider.java | 9 + ...TracingActivityClientCallsInterceptor.java | 151 +++++++++ ...racingActivityInboundCallsInterceptor.java | 24 +- .../opentracing/internal/SpanFactory.java | 46 +++ .../StandaloneActivityClientTracingTest.java | 296 ++++++++++++++++++ .../StandaloneActivityWorkerTracingTest.java | 92 ++++++ 10 files changed, 670 insertions(+), 10 deletions(-) create mode 100644 contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java create mode 100644 contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java create mode 100644 contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java create mode 100644 contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java new file mode 100644 index 0000000000..0addec6d5e --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java @@ -0,0 +1,30 @@ +package io.temporal.opentracing; + +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientInterceptorBase; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; + +public class OpenTracingActivityClientInterceptor extends ActivityClientInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientInterceptor() { + this(OpenTracingOptions.getDefaultInstance()); + } + + public OpenTracingActivityClientInterceptor(OpenTracingOptions options) { + this.options = options; + this.spanFactory = new SpanFactory(options); + this.contextAccessor = new ContextAccessor(options); + } + + @Override + public ActivityClientCallsInterceptor activityClientCallsInterceptor( + ActivityClientCallsInterceptor next) { + return new OpenTracingActivityClientCallsInterceptor( + next, options, spanFactory, contextAccessor); + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java index a2756afdde..c419a04976 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java @@ -14,6 +14,7 @@ public class SpanCreationContext { private final String runId; private final String parentWorkflowId; private final String parentRunId; + private final String activityId; private SpanCreationContext( SpanOperationType spanOperationType, @@ -21,13 +22,15 @@ private SpanCreationContext( String workflowId, String runId, String parentWorkflowId, - String parentRunId) { + String parentRunId, + String activityId) { this.spanOperationType = spanOperationType; this.actionName = actionName; this.workflowId = workflowId; this.runId = runId; this.parentWorkflowId = parentWorkflowId; this.parentRunId = parentRunId; + this.activityId = activityId; } public SpanOperationType getSpanOperationType() { @@ -59,6 +62,10 @@ public String getParentRunId() { return parentRunId; } + public @Nullable String getActivityId() { + return activityId; + } + public static Builder newBuilder() { return new Builder(); } @@ -70,6 +77,7 @@ public static final class Builder { private String runId; private String parentWorkflowId; private String parentRunId; + private String activityId; private Builder() {} @@ -103,9 +111,20 @@ public Builder setParentRunId(String parentRunId) { return this; } + public Builder setActivityId(String activityId) { + this.activityId = activityId; + return this; + } + public SpanCreationContext build() { return new SpanCreationContext( - spanOperationType, actionName, workflowId, runId, parentWorkflowId, parentRunId); + spanOperationType, + actionName, + workflowId, + runId, + parentWorkflowId, + parentRunId, + activityId); } } } diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 2f8a27429d..36e8e291bf 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -16,6 +16,14 @@ public enum SpanOperationType { HANDLE_SIGNAL("HandleSignal"), HANDLE_UPDATE("HandleUpdate"), START_NEXUS_OPERATION("StartNexusOperation"), + START_STANDALONE_ACTIVITY("StartStandaloneActivity"), + RUN_STANDALONE_ACTIVITY("RunStandaloneActivity"), + GET_STANDALONE_ACTIVITY_RESULT("GetStandaloneActivityResult"), + DESCRIBE_STANDALONE_ACTIVITY("DescribeStandaloneActivity"), + CANCEL_STANDALONE_ACTIVITY("CancelStandaloneActivity"), + TERMINATE_STANDALONE_ACTIVITY("TerminateStandaloneActivity"), + LIST_STANDALONE_ACTIVITIES("ListStandaloneActivities"), + COUNT_STANDALONE_ACTIVITIES("CountStandaloneActivities"), RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"), RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"); diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java index de9cf3f4ec..d1fb48e479 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java @@ -3,6 +3,7 @@ public class StandardTagNames { public static final String WORKFLOW_ID = "workflowId"; public static final String RUN_ID = "runId"; + public static final String ACTIVITY_ID = "activityId"; public static final String PARENT_WORKFLOW_ID = "parentWorkflowId"; public static final String PARENT_RUN_ID = "parentRunId"; diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 1734f3a36d..d0978e457b 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -84,6 +84,15 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case START_STANDALONE_ACTIVITY: + case RUN_STANDALONE_ACTIVITY: + case GET_STANDALONE_ACTIVITY_RESULT: + case DESCRIBE_STANDALONE_ACTIVITY: + case CANCEL_STANDALONE_ACTIVITY: + case TERMINATE_STANDALONE_ACTIVITY: + return ImmutableMap.of(StandardTagNames.ACTIVITY_ID, context.getActivityId()); + case LIST_STANDALONE_ACTIVITIES: + case COUNT_STANDALONE_ACTIVITIES: case RUN_START_NEXUS_OPERATION: case RUN_CANCEL_NEXUS_OPERATION: case HANDLE_QUERY: diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java new file mode 100644 index 0000000000..b66e37faa5 --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java @@ -0,0 +1,151 @@ +package io.temporal.opentracing.internal; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.opentracing.SpanOperationType; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +public class OpenTracingActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase { + private final SpanFactory spanFactory; + private final Tracer tracer; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientCallsInterceptor( + ActivityClientCallsInterceptor next, + OpenTracingOptions options, + SpanFactory spanFactory, + ContextAccessor contextAccessor) { + super(next); + this.spanFactory = spanFactory; + this.tracer = options.getTracer(); + this.contextAccessor = contextAccessor; + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createStandaloneActivityStartSpan( + tracer, input.getActivityType(), input.getOptions().getId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) { + return super.startActivity(input); + } finally { + activityStartSpan.finish(); + } + } + + @Override + public GetActivityResultOutput getActivityResult(GetActivityResultInput input) + throws TimeoutException { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.getActivityResult(input); + } finally { + span.finish(); + } + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.getActivityResultAsync(input) + .whenComplete( + (result, throwable) -> { + span.finish(); + }); + } catch (Throwable t) { + span.finish(); + throw t; + } + } + + @Override + public DescribeActivityOutput describeActivity(DescribeActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.DESCRIBE_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.describeActivity(input); + } finally { + span.finish(); + } + } + + @Override + public CancelActivityOutput cancelActivity(CancelActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.CANCEL_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.cancelActivity(input); + } finally { + span.finish(); + } + } + + @Override + public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.TERMINATE_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.terminateActivity(input); + } finally { + span.finish(); + } + } + + @Override + public ListActivitiesOutput listActivities(ListActivitiesInput input) { + Span span = + spanFactory + .createStandaloneActivityQuerySpan( + tracer, SpanOperationType.LIST_STANDALONE_ACTIVITIES, input.getQuery()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.listActivities(input); + } finally { + span.finish(); + } + } + + @Override + public CountActivitiesOutput countActivities(CountActivitiesInput input) { + Span span = + spanFactory + .createStandaloneActivityQuerySpan( + tracer, SpanOperationType.COUNT_STANDALONE_ACTIVITIES, input.getQuery()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.countActivities(input); + } finally { + span.finish(); + } + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java index 2091df7ec4..81b97ea689 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java @@ -46,14 +46,22 @@ public ActivityOutput execute(ActivityInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); ActivityInfo activityInfo = activityExecutionContext.getInfo(); Span activityRunSpan = - spanFactory - .createActivityRunSpan( - tracer, - activityInfo.getActivityType(), - activityInfo.getWorkflowId(), - activityInfo.getWorkflowRunId(), - rootSpanContext) - .start(); + activityInfo.isInWorkflow() + ? spanFactory + .createActivityRunSpan( + tracer, + activityInfo.getActivityType(), + activityInfo.getWorkflowId(), + activityInfo.getWorkflowRunId(), + rootSpanContext) + .start() + : spanFactory + .createStandaloneActivityRunSpan( + tracer, + activityInfo.getActivityType(), + activityInfo.getActivityId(), + rootSpanContext) + .start(); try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) { return super.execute(input); } catch (Throwable t) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 945d777a6c..d7f498014c 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -186,6 +186,52 @@ public Tracer.SpanBuilder createCancelNexusOperationSpan( return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createStandaloneActivityStartSpan( + Tracer tracer, String activityType, String activityId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.START_STANDALONE_ACTIVITY) + .setActionName(activityType) + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityRunSpan( + Tracer tracer, + String activityType, + String activityId, + SpanContext activityStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.RUN_STANDALONE_ACTIVITY) + .setActionName(activityType) + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityOperationSpan( + Tracer tracer, SpanOperationType operationType, String activityId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(operationType) + .setActionName("StandaloneActivity") + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityQuerySpan( + Tracer tracer, SpanOperationType operationType, String query) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(operationType) + .setActionName(query) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java new file mode 100644 index 0000000000..0e22ee475f --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java @@ -0,0 +1,296 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.*; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.api.workflowservice.v1.CountActivityExecutionsResponse; +import io.temporal.client.ActivityExecutionCount; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link OpenTracingActivityClientCallsInterceptor}. Verifies that each intercepted + * method creates a span with the expected operation name and tags. Uses a stub next-interceptor so + * no server is required. + */ +public class StandaloneActivityClientTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private OpenTracingActivityClientCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityClientCallsInterceptor( + new StubActivityClientCallsInterceptor(), + otOptions, + new SpanFactory(otOptions), + new ContextAccessor(otOptions)); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStartActivityCreatesSpanWithHeaderPropagation() { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-123") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + Header header = Header.empty(); + ActivityClientCallsInterceptor.StartActivityInput input = + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, header); + + interceptor.startActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("StartStandaloneActivity:MyActivity", span.operationName()); + assertEquals("act-123", span.tags().get("activityId")); + assertFalse("Trace context should be propagated into header", header.getValues().isEmpty()); + } + + @Test + public void testGetActivityResultCreatesSpan() throws TimeoutException { + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-456", null, String.class); + + interceptor.getActivityResult(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-456", span.tags().get("activityId")); + } + + @Test + public void testGetActivityResultAsyncCreatesSpan() throws Exception { + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-789", null, String.class); + + CompletableFuture> future = + interceptor.getActivityResultAsync(input); + future.get(); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-789", span.tags().get("activityId")); + } + + @Test + public void testGetActivityResultAsyncFinishesSpanWhenNextThrowsSynchronously() { + OpenTracingActivityClientCallsInterceptor throwingInterceptor = + new OpenTracingActivityClientCallsInterceptor( + new SynchronouslyThrowingActivityClientCallsInterceptor(), + otOptions, + new SpanFactory(otOptions), + new ContextAccessor(otOptions)); + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-throws", null, String.class); + + try { + throwingInterceptor.getActivityResultAsync(input); + fail("Expected getActivityResultAsync to throw"); + } catch (IllegalStateException expected) { + assertEquals("sync failure", expected.getMessage()); + } + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-throws", span.tags().get("activityId")); + } + + @Test + public void testDescribeActivityCreatesSpan() { + ActivityClientCallsInterceptor.DescribeActivityInput input = + new ActivityClientCallsInterceptor.DescribeActivityInput("act-desc", null); + + interceptor.describeActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("DescribeStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-desc", span.tags().get("activityId")); + } + + @Test + public void testCancelActivityCreatesSpan() { + ActivityClientCallsInterceptor.CancelActivityInput input = + new ActivityClientCallsInterceptor.CancelActivityInput("act-cancel", null, "reason"); + + interceptor.cancelActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("CancelStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-cancel", span.tags().get("activityId")); + } + + @Test + public void testTerminateActivityCreatesSpan() { + ActivityClientCallsInterceptor.TerminateActivityInput input = + new ActivityClientCallsInterceptor.TerminateActivityInput("act-term", null, "reason"); + + interceptor.terminateActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("TerminateStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-term", span.tags().get("activityId")); + } + + @Test + public void testListActivitiesCreatesSpan() { + ActivityClientCallsInterceptor.ListActivitiesInput input = + new ActivityClientCallsInterceptor.ListActivitiesInput("TaskQueue = 'tq'"); + + interceptor.listActivities(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("ListStandaloneActivities:TaskQueue = 'tq'", span.operationName()); + assertNull(span.tags().get("activityId")); + } + + @Test + public void testCountActivitiesCreatesSpan() { + ActivityClientCallsInterceptor.CountActivitiesInput input = + new ActivityClientCallsInterceptor.CountActivitiesInput("TaskQueue = 'tq'"); + + interceptor.countActivities(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("CountStandaloneActivities:TaskQueue = 'tq'", span.operationName()); + assertNull(span.tags().get("activityId")); + } + + @Test + public void testStartActivitySpanIsChildOfActiveSpan() { + MockSpan parentSpan = mockTracer.buildSpan("ClientFunction").start(); + try (io.opentracing.Scope ignored = mockTracer.scopeManager().activate(parentSpan)) { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-child") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + interceptor.startActivity( + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, Header.empty())); + } finally { + parentSpan.finish(); + } + + List spans = mockTracer.finishedSpans(); + assertEquals(2, spans.size()); + + MockSpan activitySpan = spans.get(0); + assertEquals("StartStandaloneActivity:MyActivity", activitySpan.operationName()); + assertEquals(parentSpan.context().spanId(), activitySpan.parentId()); + } + + private static class StubActivityClientCallsInterceptor + extends ActivityClientCallsInterceptorBase { + + StubActivityClientCallsInterceptor() { + super(null); + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + return new StartActivityOutput(input.getOptions().getId(), null); + } + + @Override + public GetActivityResultOutput getActivityResult(GetActivityResultInput input) + throws TimeoutException { + return new GetActivityResultOutput<>(null); + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + return CompletableFuture.completedFuture(new GetActivityResultOutput<>(null)); + } + + @Override + public DescribeActivityOutput describeActivity(DescribeActivityInput input) { + return new DescribeActivityOutput(null); + } + + @Override + public CancelActivityOutput cancelActivity(CancelActivityInput input) { + return new CancelActivityOutput(); + } + + @Override + public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { + return new TerminateActivityOutput(); + } + + @Override + public ListActivitiesOutput listActivities(ListActivitiesInput input) { + return new ListActivitiesOutput(Stream.empty()); + } + + @Override + public CountActivitiesOutput countActivities(CountActivitiesInput input) { + return new CountActivitiesOutput( + new ActivityExecutionCount(CountActivityExecutionsResponse.getDefaultInstance())); + } + } + + private static class SynchronouslyThrowingActivityClientCallsInterceptor + extends ActivityClientCallsInterceptorBase { + + SynchronouslyThrowingActivityClientCallsInterceptor() { + super(null); + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + throw new IllegalStateException("sync failure"); + } + } +} diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java new file mode 100644 index 0000000000..90a599a83b --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java @@ -0,0 +1,92 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentracing.Span; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.activity.ActivityInfo; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Unit tests for standalone activity tracing on the worker side. */ +public class StandaloneActivityWorkerTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private final SpanFactory spanFactory = new SpanFactory(otOptions); + private final ContextAccessor contextAccessor = new ContextAccessor(otOptions); + + private OpenTracingActivityInboundCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityInboundCallsInterceptor( + new StubActivityInboundCallsInterceptor(), otOptions, spanFactory, contextAccessor); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStandaloneActivityRunCreatesSpanWithActivityId() { + Header header = Header.empty(); + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createStandaloneActivityStartSpan( + mockTracer, "MyStandaloneActivity", "act-run") + .start(), + header, + mockTracer); + activityStartSpan.finish(); + + ActivityExecutionContext executionContext = mock(ActivityExecutionContext.class); + ActivityInfo activityInfo = mock(ActivityInfo.class); + when(activityInfo.isInWorkflow()).thenReturn(false); + when(activityInfo.getActivityType()).thenReturn("MyStandaloneActivity"); + when(activityInfo.getActivityId()).thenReturn("act-run"); + when(executionContext.getInfo()).thenReturn(activityInfo); + + interceptor.init(executionContext); + interceptor.execute(new ActivityInboundCallsInterceptor.ActivityInput(header, new Object[0])); + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + MockSpan startSpan = + spansHelper.getSpanByOperationName("StartStandaloneActivity:MyStandaloneActivity"); + MockSpan runSpan = + spansHelper.getSpanByOperationName("RunStandaloneActivity:MyStandaloneActivity"); + assertEquals("act-run", runSpan.tags().get("activityId")); + assertEquals(startSpan.context().spanId(), runSpan.parentId()); + } + + private static class StubActivityInboundCallsInterceptor + implements ActivityInboundCallsInterceptor { + @Override + public void init(ActivityExecutionContext context) {} + + @Override + public ActivityOutput execute(ActivityInput input) { + return new ActivityOutput(null); + } + } +} From d2020a52e7dce03925cfcf7186715fb21b8d387e Mon Sep 17 00:00:00 2001 From: 444am Date: Sat, 13 Jun 2026 12:00:26 +1000 Subject: [PATCH 2/3] Align standalone activity tracing spans --- contrib/temporal-opentracing/README.md | 13 +- .../opentracing/SpanOperationType.java | 8 - .../ActionTypeAndNameSpanBuilderProvider.java | 24 +-- ...TracingActivityClientCallsInterceptor.java | 112 +----------- ...racingActivityInboundCallsInterceptor.java | 25 +-- .../opentracing/internal/SpanFactory.java | 62 ++----- .../StandaloneActivityClientTracingTest.java | 172 +++--------------- .../StandaloneActivityWorkerTracingTest.java | 12 +- 8 files changed, 81 insertions(+), 347 deletions(-) diff --git a/contrib/temporal-opentracing/README.md b/contrib/temporal-opentracing/README.md index 8cedf58f91..c9483783ec 100644 --- a/contrib/temporal-opentracing/README.md +++ b/contrib/temporal-opentracing/README.md @@ -6,7 +6,7 @@ This module provides a set of Interceptors that adds support for OpenTracing Spa You want to register two interceptors - one on the Temporal client side, another on the worker side: -1. Client configuration: +1. Workflow Client configuration: ```java WorkflowClientOptions.newBuilder() //... @@ -21,6 +21,16 @@ You want to register two interceptors - one on the Temporal client side, another .build(); ``` +3. Standalone Activity Client configuration: + ```java + ActivityClientOptions activityClientOptions = + ActivityClientOptions.newBuilder() + //... + .setInterceptors(Collections.singletonList(new OpenTracingActivityClientInterceptor())) + .build(); + ActivityClient activityClient = ActivityClient.newInstance(service, activityClientOptions); + ``` + ## [OpenTelemetry](https://opentelemetry.io/) OpenTracing has been merged into OpenTelemetry and nowadays OpenTelemetry should be a preferred solution. @@ -38,4 +48,3 @@ to hook their OpenTelemetry setup and make it available for OpenTracing API: GlobalTracer.registerIfAbsent(tracer); ``` - diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 36e8e291bf..2f8a27429d 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -16,14 +16,6 @@ public enum SpanOperationType { HANDLE_SIGNAL("HandleSignal"), HANDLE_UPDATE("HandleUpdate"), START_NEXUS_OPERATION("StartNexusOperation"), - START_STANDALONE_ACTIVITY("StartStandaloneActivity"), - RUN_STANDALONE_ACTIVITY("RunStandaloneActivity"), - GET_STANDALONE_ACTIVITY_RESULT("GetStandaloneActivityResult"), - DESCRIBE_STANDALONE_ACTIVITY("DescribeStandaloneActivity"), - CANCEL_STANDALONE_ACTIVITY("CancelStandaloneActivity"), - TERMINATE_STANDALONE_ACTIVITY("TerminateStandaloneActivity"), - LIST_STANDALONE_ACTIVITIES("ListStandaloneActivities"), - COUNT_STANDALONE_ACTIVITIES("CountStandaloneActivities"), RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"), RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"); diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index d0978e457b..3b56f575da 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -66,8 +66,6 @@ protected Map getSpanTags(SpanCreationContext context) { StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.PARENT_RUN_ID, context.getParentRunId()); case RUN_WORKFLOW: - case START_ACTIVITY: - case RUN_ACTIVITY: case SIGNAL_EXTERNAL_WORKFLOW: case SIGNAL_WORKFLOW: case UPDATE_WORKFLOW: @@ -80,19 +78,23 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case START_ACTIVITY: + case RUN_ACTIVITY: + ImmutableMap.Builder tags = ImmutableMap.builder(); + if (context.getActivityId() != null) { + tags.put(StandardTagNames.ACTIVITY_ID, context.getActivityId()); + } + if (context.getWorkflowId() != null) { + tags.put(StandardTagNames.WORKFLOW_ID, context.getWorkflowId()); + } + if (context.getRunId() != null) { + tags.put(StandardTagNames.RUN_ID, context.getRunId()); + } + return tags.build(); case START_NEXUS_OPERATION: return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); - case START_STANDALONE_ACTIVITY: - case RUN_STANDALONE_ACTIVITY: - case GET_STANDALONE_ACTIVITY_RESULT: - case DESCRIBE_STANDALONE_ACTIVITY: - case CANCEL_STANDALONE_ACTIVITY: - case TERMINATE_STANDALONE_ACTIVITY: - return ImmutableMap.of(StandardTagNames.ACTIVITY_ID, context.getActivityId()); - case LIST_STANDALONE_ACTIVITIES: - case COUNT_STANDALONE_ACTIVITIES: case RUN_START_NEXUS_OPERATION: case RUN_CANCEL_NEXUS_OPERATION: case HANDLE_QUERY: diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java index b66e37faa5..774bd70a24 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java @@ -6,9 +6,6 @@ import io.temporal.common.interceptors.ActivityClientCallsInterceptor; import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; import io.temporal.opentracing.OpenTracingOptions; -import io.temporal.opentracing.SpanOperationType; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; public class OpenTracingActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase { private final SpanFactory spanFactory; @@ -32,8 +29,8 @@ public StartActivityOutput startActivity(StartActivityInput input) { contextAccessor.writeSpanContextToHeader( () -> spanFactory - .createStandaloneActivityStartSpan( - tracer, input.getActivityType(), input.getOptions().getId()) + .createActivityStartSpan( + tracer, input.getActivityType(), null, null, input.getOptions().getId()) .start(), input.getHeader(), tracer); @@ -43,109 +40,4 @@ public StartActivityOutput startActivity(StartActivityInput input) { activityStartSpan.finish(); } } - - @Override - public GetActivityResultOutput getActivityResult(GetActivityResultInput input) - throws TimeoutException { - Span span = - spanFactory - .createStandaloneActivityOperationSpan( - tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.getActivityResult(input); - } finally { - span.finish(); - } - } - - @Override - public CompletableFuture> getActivityResultAsync( - GetActivityResultInput input) { - Span span = - spanFactory - .createStandaloneActivityOperationSpan( - tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.getActivityResultAsync(input) - .whenComplete( - (result, throwable) -> { - span.finish(); - }); - } catch (Throwable t) { - span.finish(); - throw t; - } - } - - @Override - public DescribeActivityOutput describeActivity(DescribeActivityInput input) { - Span span = - spanFactory - .createStandaloneActivityOperationSpan( - tracer, SpanOperationType.DESCRIBE_STANDALONE_ACTIVITY, input.getId()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.describeActivity(input); - } finally { - span.finish(); - } - } - - @Override - public CancelActivityOutput cancelActivity(CancelActivityInput input) { - Span span = - spanFactory - .createStandaloneActivityOperationSpan( - tracer, SpanOperationType.CANCEL_STANDALONE_ACTIVITY, input.getId()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.cancelActivity(input); - } finally { - span.finish(); - } - } - - @Override - public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { - Span span = - spanFactory - .createStandaloneActivityOperationSpan( - tracer, SpanOperationType.TERMINATE_STANDALONE_ACTIVITY, input.getId()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.terminateActivity(input); - } finally { - span.finish(); - } - } - - @Override - public ListActivitiesOutput listActivities(ListActivitiesInput input) { - Span span = - spanFactory - .createStandaloneActivityQuerySpan( - tracer, SpanOperationType.LIST_STANDALONE_ACTIVITIES, input.getQuery()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.listActivities(input); - } finally { - span.finish(); - } - } - - @Override - public CountActivitiesOutput countActivities(CountActivitiesInput input) { - Span span = - spanFactory - .createStandaloneActivityQuerySpan( - tracer, SpanOperationType.COUNT_STANDALONE_ACTIVITIES, input.getQuery()) - .start(); - try (Scope ignored = tracer.scopeManager().activate(span)) { - return super.countActivities(input); - } finally { - span.finish(); - } - } } diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java index 81b97ea689..d0b2ce2ede 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java @@ -46,22 +46,15 @@ public ActivityOutput execute(ActivityInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); ActivityInfo activityInfo = activityExecutionContext.getInfo(); Span activityRunSpan = - activityInfo.isInWorkflow() - ? spanFactory - .createActivityRunSpan( - tracer, - activityInfo.getActivityType(), - activityInfo.getWorkflowId(), - activityInfo.getWorkflowRunId(), - rootSpanContext) - .start() - : spanFactory - .createStandaloneActivityRunSpan( - tracer, - activityInfo.getActivityType(), - activityInfo.getActivityId(), - rootSpanContext) - .start(); + spanFactory + .createActivityRunSpan( + tracer, + activityInfo.getActivityType(), + activityInfo.getWorkflowId(), + activityInfo.getWorkflowRunId(), + activityInfo.getActivityId(), + rootSpanContext) + .start(); try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) { return super.execute(input); } catch (Throwable t) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index d7f498014c..f16a68ca5a 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -140,12 +140,22 @@ public Tracer.SpanBuilder createWorkflowRunSpan( public Tracer.SpanBuilder createActivityStartSpan( Tracer tracer, String activityType, String workflowId, String runId) { + return createActivityStartSpan(tracer, activityType, workflowId, runId, null); + } + + public Tracer.SpanBuilder createActivityStartSpan( + Tracer tracer, + String activityType, + @Nullable String workflowId, + @Nullable String runId, + @Nullable String activityId) { SpanCreationContext context = SpanCreationContext.newBuilder() .setSpanOperationType(SpanOperationType.START_ACTIVITY) .setActionName(activityType) .setWorkflowId(workflowId) .setRunId(runId) + .setActivityId(activityId) .build(); return createSpan(context, tracer, null, References.CHILD_OF); } @@ -153,8 +163,9 @@ public Tracer.SpanBuilder createActivityStartSpan( public Tracer.SpanBuilder createActivityRunSpan( Tracer tracer, String activityType, - String workflowId, - String runId, + @Nullable String workflowId, + @Nullable String runId, + @Nullable String activityId, SpanContext activityStartSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() @@ -162,6 +173,7 @@ public Tracer.SpanBuilder createActivityRunSpan( .setActionName(activityType) .setWorkflowId(workflowId) .setRunId(runId) + .setActivityId(activityId) .build(); return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } @@ -186,52 +198,6 @@ public Tracer.SpanBuilder createCancelNexusOperationSpan( return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); } - public Tracer.SpanBuilder createStandaloneActivityStartSpan( - Tracer tracer, String activityType, String activityId) { - SpanCreationContext context = - SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.START_STANDALONE_ACTIVITY) - .setActionName(activityType) - .setActivityId(activityId) - .build(); - return createSpan(context, tracer, null, References.FOLLOWS_FROM); - } - - public Tracer.SpanBuilder createStandaloneActivityRunSpan( - Tracer tracer, - String activityType, - String activityId, - SpanContext activityStartSpanContext) { - SpanCreationContext context = - SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.RUN_STANDALONE_ACTIVITY) - .setActionName(activityType) - .setActivityId(activityId) - .build(); - return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); - } - - public Tracer.SpanBuilder createStandaloneActivityOperationSpan( - Tracer tracer, SpanOperationType operationType, String activityId) { - SpanCreationContext context = - SpanCreationContext.newBuilder() - .setSpanOperationType(operationType) - .setActionName("StandaloneActivity") - .setActivityId(activityId) - .build(); - return createSpan(context, tracer, null, References.FOLLOWS_FROM); - } - - public Tracer.SpanBuilder createStandaloneActivityQuerySpan( - Tracer tracer, SpanOperationType operationType, String query) { - SpanCreationContext context = - SpanCreationContext.newBuilder() - .setSpanOperationType(operationType) - .setActionName(query) - .build(); - return createSpan(context, tracer, null, References.FOLLOWS_FROM); - } - public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java index 0e22ee475f..c852175196 100644 --- a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java @@ -24,11 +24,7 @@ import org.junit.Before; import org.junit.Test; -/** - * Unit tests for {@link OpenTracingActivityClientCallsInterceptor}. Verifies that each intercepted - * method creates a span with the expected operation name and tags. Uses a stub next-interceptor so - * no server is required. - */ +/** Unit tests for standalone activity tracing on the client side. */ public class StandaloneActivityClientTracingTest { private final MockTracer mockTracer = @@ -73,137 +69,11 @@ public void testStartActivityCreatesSpanWithHeaderPropagation() { List spans = mockTracer.finishedSpans(); assertEquals(1, spans.size()); MockSpan span = spans.get(0); - assertEquals("StartStandaloneActivity:MyActivity", span.operationName()); + assertEquals("StartActivity:MyActivity", span.operationName()); assertEquals("act-123", span.tags().get("activityId")); assertFalse("Trace context should be propagated into header", header.getValues().isEmpty()); } - @Test - public void testGetActivityResultCreatesSpan() throws TimeoutException { - ActivityClientCallsInterceptor.GetActivityResultInput input = - new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-456", null, String.class); - - interceptor.getActivityResult(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); - assertEquals("act-456", span.tags().get("activityId")); - } - - @Test - public void testGetActivityResultAsyncCreatesSpan() throws Exception { - ActivityClientCallsInterceptor.GetActivityResultInput input = - new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-789", null, String.class); - - CompletableFuture> future = - interceptor.getActivityResultAsync(input); - future.get(); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); - assertEquals("act-789", span.tags().get("activityId")); - } - - @Test - public void testGetActivityResultAsyncFinishesSpanWhenNextThrowsSynchronously() { - OpenTracingActivityClientCallsInterceptor throwingInterceptor = - new OpenTracingActivityClientCallsInterceptor( - new SynchronouslyThrowingActivityClientCallsInterceptor(), - otOptions, - new SpanFactory(otOptions), - new ContextAccessor(otOptions)); - ActivityClientCallsInterceptor.GetActivityResultInput input = - new ActivityClientCallsInterceptor.GetActivityResultInput<>( - "act-throws", null, String.class); - - try { - throwingInterceptor.getActivityResultAsync(input); - fail("Expected getActivityResultAsync to throw"); - } catch (IllegalStateException expected) { - assertEquals("sync failure", expected.getMessage()); - } - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); - assertEquals("act-throws", span.tags().get("activityId")); - } - - @Test - public void testDescribeActivityCreatesSpan() { - ActivityClientCallsInterceptor.DescribeActivityInput input = - new ActivityClientCallsInterceptor.DescribeActivityInput("act-desc", null); - - interceptor.describeActivity(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("DescribeStandaloneActivity:StandaloneActivity", span.operationName()); - assertEquals("act-desc", span.tags().get("activityId")); - } - - @Test - public void testCancelActivityCreatesSpan() { - ActivityClientCallsInterceptor.CancelActivityInput input = - new ActivityClientCallsInterceptor.CancelActivityInput("act-cancel", null, "reason"); - - interceptor.cancelActivity(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("CancelStandaloneActivity:StandaloneActivity", span.operationName()); - assertEquals("act-cancel", span.tags().get("activityId")); - } - - @Test - public void testTerminateActivityCreatesSpan() { - ActivityClientCallsInterceptor.TerminateActivityInput input = - new ActivityClientCallsInterceptor.TerminateActivityInput("act-term", null, "reason"); - - interceptor.terminateActivity(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("TerminateStandaloneActivity:StandaloneActivity", span.operationName()); - assertEquals("act-term", span.tags().get("activityId")); - } - - @Test - public void testListActivitiesCreatesSpan() { - ActivityClientCallsInterceptor.ListActivitiesInput input = - new ActivityClientCallsInterceptor.ListActivitiesInput("TaskQueue = 'tq'"); - - interceptor.listActivities(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("ListStandaloneActivities:TaskQueue = 'tq'", span.operationName()); - assertNull(span.tags().get("activityId")); - } - - @Test - public void testCountActivitiesCreatesSpan() { - ActivityClientCallsInterceptor.CountActivitiesInput input = - new ActivityClientCallsInterceptor.CountActivitiesInput("TaskQueue = 'tq'"); - - interceptor.countActivities(input); - - List spans = mockTracer.finishedSpans(); - assertEquals(1, spans.size()); - MockSpan span = spans.get(0); - assertEquals("CountStandaloneActivities:TaskQueue = 'tq'", span.operationName()); - assertNull(span.tags().get("activityId")); - } - @Test public void testStartActivitySpanIsChildOfActiveSpan() { MockSpan parentSpan = mockTracer.buildSpan("ClientFunction").start(); @@ -225,10 +95,32 @@ public void testStartActivitySpanIsChildOfActiveSpan() { assertEquals(2, spans.size()); MockSpan activitySpan = spans.get(0); - assertEquals("StartStandaloneActivity:MyActivity", activitySpan.operationName()); + assertEquals("StartActivity:MyActivity", activitySpan.operationName()); assertEquals(parentSpan.context().spanId(), activitySpan.parentId()); } + @Test + public void testManagementCallsDoNotCreateSpans() throws TimeoutException { + interceptor.getActivityResult( + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-result", null, String.class)); + interceptor.getActivityResultAsync( + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-result-async", null, String.class)); + interceptor.describeActivity( + new ActivityClientCallsInterceptor.DescribeActivityInput("act-desc", null)); + interceptor.cancelActivity( + new ActivityClientCallsInterceptor.CancelActivityInput("act-cancel", null, "reason")); + interceptor.terminateActivity( + new ActivityClientCallsInterceptor.TerminateActivityInput("act-term", null, "reason")); + interceptor.listActivities( + new ActivityClientCallsInterceptor.ListActivitiesInput("TaskQueue = 'tq'")); + interceptor.countActivities( + new ActivityClientCallsInterceptor.CountActivitiesInput("TaskQueue = 'tq'")); + + assertTrue(mockTracer.finishedSpans().isEmpty()); + } + private static class StubActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase { @@ -279,18 +171,4 @@ public CountActivitiesOutput countActivities(CountActivitiesInput input) { new ActivityExecutionCount(CountActivityExecutionsResponse.getDefaultInstance())); } } - - private static class SynchronouslyThrowingActivityClientCallsInterceptor - extends ActivityClientCallsInterceptorBase { - - SynchronouslyThrowingActivityClientCallsInterceptor() { - super(null); - } - - @Override - public CompletableFuture> getActivityResultAsync( - GetActivityResultInput input) { - throw new IllegalStateException("sync failure"); - } - } } diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java index 90a599a83b..85c330c3b1 100644 --- a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java @@ -1,6 +1,7 @@ package io.temporal.opentracing; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,8 +54,8 @@ public void testStandaloneActivityRunCreatesSpanWithActivityId() { contextAccessor.writeSpanContextToHeader( () -> spanFactory - .createStandaloneActivityStartSpan( - mockTracer, "MyStandaloneActivity", "act-run") + .createActivityStartSpan( + mockTracer, "MyStandaloneActivity", null, null, "act-run") .start(), header, mockTracer); @@ -72,10 +73,11 @@ public void testStandaloneActivityRunCreatesSpanWithActivityId() { OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); MockSpan startSpan = - spansHelper.getSpanByOperationName("StartStandaloneActivity:MyStandaloneActivity"); - MockSpan runSpan = - spansHelper.getSpanByOperationName("RunStandaloneActivity:MyStandaloneActivity"); + spansHelper.getSpanByOperationName("StartActivity:MyStandaloneActivity"); + MockSpan runSpan = spansHelper.getSpanByOperationName("RunActivity:MyStandaloneActivity"); assertEquals("act-run", runSpan.tags().get("activityId")); + assertNull(runSpan.tags().get("workflowId")); + assertNull(runSpan.tags().get("runId")); assertEquals(startSpan.context().spanId(), runSpan.parentId()); } From 445baac9958d25c3908fda50b89c1017c79de85b Mon Sep 17 00:00:00 2001 From: 444am Date: Sat, 13 Jun 2026 17:25:04 +1000 Subject: [PATCH 3/3] Remove activity start span overload --- .../OpenTracingWorkflowOutboundCallsInterceptor.java | 2 +- .../java/io/temporal/opentracing/internal/SpanFactory.java | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 958aeb2d1c..41c85ca0c7 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -251,7 +251,7 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) { private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) { WorkflowInfo workflowInfo = Workflow.getInfo(); return spanFactory.createActivityStartSpan( - tracer, activityName, workflowInfo.getWorkflowId(), workflowInfo.getRunId()); + tracer, activityName, workflowInfo.getWorkflowId(), workflowInfo.getRunId(), null); } private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflowInput input) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index f16a68ca5a..fdfe09d1a2 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -138,11 +138,6 @@ public Tracer.SpanBuilder createWorkflowRunSpan( return createSpan(context, tracer, workflowStartSpanContext, References.FOLLOWS_FROM); } - public Tracer.SpanBuilder createActivityStartSpan( - Tracer tracer, String activityType, String workflowId, String runId) { - return createActivityStartSpan(tracer, activityType, workflowId, runId, null); - } - public Tracer.SpanBuilder createActivityStartSpan( Tracer tracer, String activityType,