diff --git a/contrib/temporal-opentracing/README.md b/contrib/temporal-opentracing/README.md index 8cedf58f91..c9483783ec 100644 --- a/contrib/temporal-opentracing/README.md +++ b/contrib/temporal-opentracing/README.md @@ -6,7 +6,7 @@ This module provides a set of Interceptors that adds support for OpenTracing Spa You want to register two interceptors - one on the Temporal client side, another on the worker side: -1. Client configuration: +1. Workflow Client configuration: ```java WorkflowClientOptions.newBuilder() //... @@ -21,6 +21,16 @@ You want to register two interceptors - one on the Temporal client side, another .build(); ``` +3. Standalone Activity Client configuration: + ```java + ActivityClientOptions activityClientOptions = + ActivityClientOptions.newBuilder() + //... + .setInterceptors(Collections.singletonList(new OpenTracingActivityClientInterceptor())) + .build(); + ActivityClient activityClient = ActivityClient.newInstance(service, activityClientOptions); + ``` + ## [OpenTelemetry](https://opentelemetry.io/) OpenTracing has been merged into OpenTelemetry and nowadays OpenTelemetry should be a preferred solution. @@ -38,4 +48,3 @@ to hook their OpenTelemetry setup and make it available for OpenTracing API: GlobalTracer.registerIfAbsent(tracer); ``` - diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java new file mode 100644 index 0000000000..0addec6d5e --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java @@ -0,0 +1,30 @@ +package io.temporal.opentracing; + +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientInterceptorBase; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; + +public class OpenTracingActivityClientInterceptor extends ActivityClientInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientInterceptor() { + this(OpenTracingOptions.getDefaultInstance()); + } + + public OpenTracingActivityClientInterceptor(OpenTracingOptions options) { + this.options = options; + this.spanFactory = new SpanFactory(options); + this.contextAccessor = new ContextAccessor(options); + } + + @Override + public ActivityClientCallsInterceptor activityClientCallsInterceptor( + ActivityClientCallsInterceptor next) { + return new OpenTracingActivityClientCallsInterceptor( + next, options, spanFactory, contextAccessor); + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java index a2756afdde..c419a04976 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java @@ -14,6 +14,7 @@ public class SpanCreationContext { private final String runId; private final String parentWorkflowId; private final String parentRunId; + private final String activityId; private SpanCreationContext( SpanOperationType spanOperationType, @@ -21,13 +22,15 @@ private SpanCreationContext( String workflowId, String runId, String parentWorkflowId, - String parentRunId) { + String parentRunId, + String activityId) { this.spanOperationType = spanOperationType; this.actionName = actionName; this.workflowId = workflowId; this.runId = runId; this.parentWorkflowId = parentWorkflowId; this.parentRunId = parentRunId; + this.activityId = activityId; } public SpanOperationType getSpanOperationType() { @@ -59,6 +62,10 @@ public String getParentRunId() { return parentRunId; } + public @Nullable String getActivityId() { + return activityId; + } + public static Builder newBuilder() { return new Builder(); } @@ -70,6 +77,7 @@ public static final class Builder { private String runId; private String parentWorkflowId; private String parentRunId; + private String activityId; private Builder() {} @@ -103,9 +111,20 @@ public Builder setParentRunId(String parentRunId) { return this; } + public Builder setActivityId(String activityId) { + this.activityId = activityId; + return this; + } + public SpanCreationContext build() { return new SpanCreationContext( - spanOperationType, actionName, workflowId, runId, parentWorkflowId, parentRunId); + spanOperationType, + actionName, + workflowId, + runId, + parentWorkflowId, + parentRunId, + activityId); } } } diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java index de9cf3f4ec..d1fb48e479 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java @@ -3,6 +3,7 @@ public class StandardTagNames { public static final String WORKFLOW_ID = "workflowId"; public static final String RUN_ID = "runId"; + public static final String ACTIVITY_ID = "activityId"; public static final String PARENT_WORKFLOW_ID = "parentWorkflowId"; public static final String PARENT_RUN_ID = "parentRunId"; diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 1734f3a36d..3b56f575da 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -66,8 +66,6 @@ protected Map getSpanTags(SpanCreationContext context) { StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.PARENT_RUN_ID, context.getParentRunId()); case RUN_WORKFLOW: - case START_ACTIVITY: - case RUN_ACTIVITY: case SIGNAL_EXTERNAL_WORKFLOW: case SIGNAL_WORKFLOW: case UPDATE_WORKFLOW: @@ -80,6 +78,19 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case START_ACTIVITY: + case RUN_ACTIVITY: + ImmutableMap.Builder tags = ImmutableMap.builder(); + if (context.getActivityId() != null) { + tags.put(StandardTagNames.ACTIVITY_ID, context.getActivityId()); + } + if (context.getWorkflowId() != null) { + tags.put(StandardTagNames.WORKFLOW_ID, context.getWorkflowId()); + } + if (context.getRunId() != null) { + tags.put(StandardTagNames.RUN_ID, context.getRunId()); + } + return tags.build(); case START_NEXUS_OPERATION: return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java new file mode 100644 index 0000000000..774bd70a24 --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java @@ -0,0 +1,43 @@ +package io.temporal.opentracing.internal; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; + +public class OpenTracingActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase { + private final SpanFactory spanFactory; + private final Tracer tracer; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientCallsInterceptor( + ActivityClientCallsInterceptor next, + OpenTracingOptions options, + SpanFactory spanFactory, + ContextAccessor contextAccessor) { + super(next); + this.spanFactory = spanFactory; + this.tracer = options.getTracer(); + this.contextAccessor = contextAccessor; + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createActivityStartSpan( + tracer, input.getActivityType(), null, null, input.getOptions().getId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) { + return super.startActivity(input); + } finally { + activityStartSpan.finish(); + } + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java index 2091df7ec4..d0b2ce2ede 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java @@ -52,6 +52,7 @@ public ActivityOutput execute(ActivityInput input) { activityInfo.getActivityType(), activityInfo.getWorkflowId(), activityInfo.getWorkflowRunId(), + activityInfo.getActivityId(), rootSpanContext) .start(); try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 958aeb2d1c..41c85ca0c7 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -251,7 +251,7 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) { private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) { WorkflowInfo workflowInfo = Workflow.getInfo(); return spanFactory.createActivityStartSpan( - tracer, activityName, workflowInfo.getWorkflowId(), workflowInfo.getRunId()); + tracer, activityName, workflowInfo.getWorkflowId(), workflowInfo.getRunId(), null); } private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflowInput input) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 945d777a6c..fdfe09d1a2 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -139,13 +139,18 @@ public Tracer.SpanBuilder createWorkflowRunSpan( } public Tracer.SpanBuilder createActivityStartSpan( - Tracer tracer, String activityType, String workflowId, String runId) { + Tracer tracer, + String activityType, + @Nullable String workflowId, + @Nullable String runId, + @Nullable String activityId) { SpanCreationContext context = SpanCreationContext.newBuilder() .setSpanOperationType(SpanOperationType.START_ACTIVITY) .setActionName(activityType) .setWorkflowId(workflowId) .setRunId(runId) + .setActivityId(activityId) .build(); return createSpan(context, tracer, null, References.CHILD_OF); } @@ -153,8 +158,9 @@ public Tracer.SpanBuilder createActivityStartSpan( public Tracer.SpanBuilder createActivityRunSpan( Tracer tracer, String activityType, - String workflowId, - String runId, + @Nullable String workflowId, + @Nullable String runId, + @Nullable String activityId, SpanContext activityStartSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() @@ -162,6 +168,7 @@ public Tracer.SpanBuilder createActivityRunSpan( .setActionName(activityType) .setWorkflowId(workflowId) .setRunId(runId) + .setActivityId(activityId) .build(); return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java new file mode 100644 index 0000000000..c852175196 --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java @@ -0,0 +1,174 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.*; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.api.workflowservice.v1.CountActivityExecutionsResponse; +import io.temporal.client.ActivityExecutionCount; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Unit tests for standalone activity tracing on the client side. */ +public class StandaloneActivityClientTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private OpenTracingActivityClientCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityClientCallsInterceptor( + new StubActivityClientCallsInterceptor(), + otOptions, + new SpanFactory(otOptions), + new ContextAccessor(otOptions)); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStartActivityCreatesSpanWithHeaderPropagation() { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-123") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + Header header = Header.empty(); + ActivityClientCallsInterceptor.StartActivityInput input = + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, header); + + interceptor.startActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("StartActivity:MyActivity", span.operationName()); + assertEquals("act-123", span.tags().get("activityId")); + assertFalse("Trace context should be propagated into header", header.getValues().isEmpty()); + } + + @Test + public void testStartActivitySpanIsChildOfActiveSpan() { + MockSpan parentSpan = mockTracer.buildSpan("ClientFunction").start(); + try (io.opentracing.Scope ignored = mockTracer.scopeManager().activate(parentSpan)) { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-child") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + interceptor.startActivity( + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, Header.empty())); + } finally { + parentSpan.finish(); + } + + List spans = mockTracer.finishedSpans(); + assertEquals(2, spans.size()); + + MockSpan activitySpan = spans.get(0); + assertEquals("StartActivity:MyActivity", activitySpan.operationName()); + assertEquals(parentSpan.context().spanId(), activitySpan.parentId()); + } + + @Test + public void testManagementCallsDoNotCreateSpans() throws TimeoutException { + interceptor.getActivityResult( + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-result", null, String.class)); + interceptor.getActivityResultAsync( + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-result-async", null, String.class)); + interceptor.describeActivity( + new ActivityClientCallsInterceptor.DescribeActivityInput("act-desc", null)); + interceptor.cancelActivity( + new ActivityClientCallsInterceptor.CancelActivityInput("act-cancel", null, "reason")); + interceptor.terminateActivity( + new ActivityClientCallsInterceptor.TerminateActivityInput("act-term", null, "reason")); + interceptor.listActivities( + new ActivityClientCallsInterceptor.ListActivitiesInput("TaskQueue = 'tq'")); + interceptor.countActivities( + new ActivityClientCallsInterceptor.CountActivitiesInput("TaskQueue = 'tq'")); + + assertTrue(mockTracer.finishedSpans().isEmpty()); + } + + private static class StubActivityClientCallsInterceptor + extends ActivityClientCallsInterceptorBase { + + StubActivityClientCallsInterceptor() { + super(null); + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + return new StartActivityOutput(input.getOptions().getId(), null); + } + + @Override + public GetActivityResultOutput getActivityResult(GetActivityResultInput input) + throws TimeoutException { + return new GetActivityResultOutput<>(null); + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + return CompletableFuture.completedFuture(new GetActivityResultOutput<>(null)); + } + + @Override + public DescribeActivityOutput describeActivity(DescribeActivityInput input) { + return new DescribeActivityOutput(null); + } + + @Override + public CancelActivityOutput cancelActivity(CancelActivityInput input) { + return new CancelActivityOutput(); + } + + @Override + public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { + return new TerminateActivityOutput(); + } + + @Override + public ListActivitiesOutput listActivities(ListActivitiesInput input) { + return new ListActivitiesOutput(Stream.empty()); + } + + @Override + public CountActivitiesOutput countActivities(CountActivitiesInput input) { + return new CountActivitiesOutput( + new ActivityExecutionCount(CountActivityExecutionsResponse.getDefaultInstance())); + } + } +} diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java new file mode 100644 index 0000000000..85c330c3b1 --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java @@ -0,0 +1,94 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentracing.Span; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.activity.ActivityInfo; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Unit tests for standalone activity tracing on the worker side. */ +public class StandaloneActivityWorkerTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private final SpanFactory spanFactory = new SpanFactory(otOptions); + private final ContextAccessor contextAccessor = new ContextAccessor(otOptions); + + private OpenTracingActivityInboundCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityInboundCallsInterceptor( + new StubActivityInboundCallsInterceptor(), otOptions, spanFactory, contextAccessor); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStandaloneActivityRunCreatesSpanWithActivityId() { + Header header = Header.empty(); + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createActivityStartSpan( + mockTracer, "MyStandaloneActivity", null, null, "act-run") + .start(), + header, + mockTracer); + activityStartSpan.finish(); + + ActivityExecutionContext executionContext = mock(ActivityExecutionContext.class); + ActivityInfo activityInfo = mock(ActivityInfo.class); + when(activityInfo.isInWorkflow()).thenReturn(false); + when(activityInfo.getActivityType()).thenReturn("MyStandaloneActivity"); + when(activityInfo.getActivityId()).thenReturn("act-run"); + when(executionContext.getInfo()).thenReturn(activityInfo); + + interceptor.init(executionContext); + interceptor.execute(new ActivityInboundCallsInterceptor.ActivityInput(header, new Object[0])); + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + MockSpan startSpan = + spansHelper.getSpanByOperationName("StartActivity:MyStandaloneActivity"); + MockSpan runSpan = spansHelper.getSpanByOperationName("RunActivity:MyStandaloneActivity"); + assertEquals("act-run", runSpan.tags().get("activityId")); + assertNull(runSpan.tags().get("workflowId")); + assertNull(runSpan.tags().get("runId")); + assertEquals(startSpan.context().spanId(), runSpan.parentId()); + } + + private static class StubActivityInboundCallsInterceptor + implements ActivityInboundCallsInterceptor { + @Override + public void init(ActivityExecutionContext context) {} + + @Override + public ActivityOutput execute(ActivityInput input) { + return new ActivityOutput(null); + } + } +}