From e5e60a0496ecdd551b63244f0754a7047812308b Mon Sep 17 00:00:00 2001 From: Baekgyu Date: Sat, 13 Jun 2026 11:26:25 +0900 Subject: [PATCH] Add option to let activities heartbeat during worker shutdown --- .../internal/worker/SingleWorkerOptions.java | 20 ++- .../internal/worker/SyncActivityWorker.java | 37 +++-- .../main/java/io/temporal/worker/Worker.java | 1 + .../io/temporal/worker/WorkerOptions.java | 48 ++++++- .../io/temporal/worker/WorkerOptionsTest.java | 3 + .../HeartbeatDuringWorkerShutdownTest.java | 136 ++++++++++++++++++ 6 files changed, 228 insertions(+), 17 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java index 5593707720..f53802f489 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java @@ -41,6 +41,7 @@ public static final class Builder { private boolean usingVirtualThreads; private WorkerDeploymentOptions deploymentOptions; private String workerInstanceKey; + private boolean allowActivityHeartbeatDuringShutdown; private Builder() {} @@ -66,6 +67,7 @@ private Builder(SingleWorkerOptions options) { this.usingVirtualThreads = options.isUsingVirtualThreads(); this.deploymentOptions = options.getDeploymentOptions(); this.workerInstanceKey = options.getWorkerInstanceKey(); + this.allowActivityHeartbeatDuringShutdown = options.getAllowActivityHeartbeatDuringShutdown(); } public Builder setIdentity(String identity) { @@ -162,6 +164,12 @@ public Builder setWorkerInstanceKey(String workerInstanceKey) { return this; } + public Builder setAllowActivityHeartbeatDuringShutdown( + boolean allowActivityHeartbeatDuringShutdown) { + this.allowActivityHeartbeatDuringShutdown = allowActivityHeartbeatDuringShutdown; + return this; + } + public SingleWorkerOptions build() { PollerOptions pollerOptions = this.pollerOptions; if (pollerOptions == null) { @@ -201,7 +209,8 @@ public SingleWorkerOptions build() { drainStickyTaskQueueTimeout, usingVirtualThreads, this.deploymentOptions, - this.workerInstanceKey); + this.workerInstanceKey, + this.allowActivityHeartbeatDuringShutdown); } } @@ -223,6 +232,7 @@ public SingleWorkerOptions build() { private final boolean usingVirtualThreads; private final WorkerDeploymentOptions deploymentOptions; private final String workerInstanceKey; + private final boolean allowActivityHeartbeatDuringShutdown; private SingleWorkerOptions( String identity, @@ -242,7 +252,8 @@ private SingleWorkerOptions( Duration drainStickyTaskQueueTimeout, boolean usingVirtualThreads, WorkerDeploymentOptions deploymentOptions, - String workerInstanceKey) { + String workerInstanceKey, + boolean allowActivityHeartbeatDuringShutdown) { this.identity = identity; this.binaryChecksum = binaryChecksum; this.buildId = buildId; @@ -261,6 +272,7 @@ private SingleWorkerOptions( this.usingVirtualThreads = usingVirtualThreads; this.deploymentOptions = deploymentOptions; this.workerInstanceKey = workerInstanceKey; + this.allowActivityHeartbeatDuringShutdown = allowActivityHeartbeatDuringShutdown; } public String getIdentity() { @@ -291,6 +303,10 @@ public Duration getDrainStickyTaskQueueTimeout() { return drainStickyTaskQueueTimeout; } + public boolean getAllowActivityHeartbeatDuringShutdown() { + return allowActivityHeartbeatDuringShutdown; + } + public DataConverter getDataConverter() { return dataConverter; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index ecb24a736a..4eafdb38cf 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -26,6 +26,7 @@ public class SyncActivityWorker implements SuspendableWorker { private final ScheduledExecutorService heartbeatExecutor; private final ActivityTaskHandlerImpl taskHandler; private final ActivityWorker worker; + private final boolean allowActivityHeartbeatDuringShutdown; public SyncActivityWorker( WorkflowClient client, @@ -38,6 +39,7 @@ public SyncActivityWorker( this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; + this.allowActivityHeartbeatDuringShutdown = options.getAllowActivityHeartbeatDuringShutdown(); this.heartbeatExecutor = Executors.newScheduledThreadPool( @@ -89,16 +91,31 @@ public boolean start() { @Override public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { - return shutdownManager - // we want to shut down heartbeatExecutor before activity worker, so in-flight activities - // could get an ActivityWorkerShutdownException from their heartbeat - .shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5)) - .thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks)) - .exceptionally( - e -> { - log.error("[BUG] Unexpected exception during shutdown", e); - return null; - }); + CompletableFuture shutdownFuture; + if (allowActivityHeartbeatDuringShutdown && !interruptTasks) { + // we want to shut down heartbeatExecutor only after all outstanding activity tasks have + // finished executing, so in-flight activities can keep heartbeating during the shutdown + shutdownFuture = + worker + .shutdown(shutdownManager, interruptTasks) + .thenCompose(r -> shutdownHeartbeatExecutor(shutdownManager)); + } else { + // we want to shut down heartbeatExecutor before activity worker, so in-flight activities + // could get an ActivityWorkerShutdownException from their heartbeat + shutdownFuture = + shutdownHeartbeatExecutor(shutdownManager) + .thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks)); + } + return shutdownFuture.exceptionally( + e -> { + log.error("[BUG] Unexpected exception during shutdown", e); + return null; + }); + } + + private CompletableFuture shutdownHeartbeatExecutor(ShutdownManager shutdownManager) { + return shutdownManager.shutdownExecutor( + heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5)); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index c846667d68..6355e5a75a 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -893,6 +893,7 @@ private static SingleWorkerOptions toActivityOptions( return toSingleWorkerOptions( factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker()) + .setAllowActivityHeartbeatDuringShutdown(options.getAllowActivityHeartbeatDuringShutdown()) .setPollerOptions( PollerOptions.newBuilder() .setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond()) diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java index f8bfd2f442..84db0b8d62 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java @@ -77,6 +77,7 @@ public static final class Builder { private PollerBehavior workflowTaskPollersBehavior; private PollerBehavior activityTaskPollersBehavior; private PollerBehavior nexusTaskPollersBehavior; + private boolean allowActivityHeartbeatDuringShutdown; private Builder() {} @@ -112,6 +113,7 @@ private Builder(WorkerOptions o) { this.workflowTaskPollersBehavior = o.workflowTaskPollersBehavior; this.activityTaskPollersBehavior = o.activityTaskPollersBehavior; this.nexusTaskPollersBehavior = o.nexusTaskPollersBehavior; + this.allowActivityHeartbeatDuringShutdown = o.allowActivityHeartbeatDuringShutdown; } /** @@ -524,6 +526,28 @@ public Builder setNexusTaskPollersBehavior(PollerBehavior pollerBehavior) { return this; } + /** + * If true, activities can keep heartbeating during graceful worker shutdown (see {@link + * io.temporal.worker.WorkerFactory#shutdown WorkerFactory.shutdown}). Defaults to false, which + * means that after graceful shutdown is requested, calling {@link + * io.temporal.activity.ActivityExecutionContext#heartbeat ActivityExecutionContext.heartbeat} + * does not send a heartbeat and instead throws {@link + * io.temporal.client.ActivityWorkerShutdownException ActivityWorkerShutdownException}. This + * option is ignored by non-graceful shutdown (see {@link + * io.temporal.worker.WorkerFactory#shutdownNow WorkerFactory.shutdownNow}). + * + *

Note that with this option enabled, activities are no longer notified of the worker + * shutdown by the {@link io.temporal.client.ActivityWorkerShutdownException + * ActivityWorkerShutdownException} exception, so they are expected to complete within the + * termination grace period on their own. + */ + @Experimental + public Builder setAllowActivityHeartbeatDuringShutdown( + boolean allowActivityHeartbeatDuringShutdown) { + this.allowActivityHeartbeatDuringShutdown = allowActivityHeartbeatDuringShutdown; + return this; + } + public WorkerOptions build() { return new WorkerOptions( maxWorkerActivitiesPerSecond, @@ -553,7 +577,8 @@ public WorkerOptions build() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + allowActivityHeartbeatDuringShutdown); } public WorkerOptions validateAndBuildWithDefaults() { @@ -685,7 +710,8 @@ public WorkerOptions validateAndBuildWithDefaults() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + allowActivityHeartbeatDuringShutdown); } } @@ -717,6 +743,7 @@ public WorkerOptions validateAndBuildWithDefaults() { private final PollerBehavior workflowTaskPollersBehavior; private final PollerBehavior activityTaskPollersBehavior; private final PollerBehavior nexusTaskPollersBehavior; + private final boolean allowActivityHeartbeatDuringShutdown; private WorkerOptions( double maxWorkerActivitiesPerSecond, @@ -746,7 +773,8 @@ private WorkerOptions( WorkerDeploymentOptions deploymentOptions, PollerBehavior workflowTaskPollersBehavior, PollerBehavior activityTaskPollersBehavior, - PollerBehavior nexusTaskPollersBehavior) { + PollerBehavior nexusTaskPollersBehavior, + boolean allowActivityHeartbeatDuringShutdown) { this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond; this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize; @@ -775,6 +803,7 @@ private WorkerOptions( this.workflowTaskPollersBehavior = workflowTaskPollersBehavior; this.activityTaskPollersBehavior = activityTaskPollersBehavior; this.nexusTaskPollersBehavior = nexusTaskPollersBehavior; + this.allowActivityHeartbeatDuringShutdown = allowActivityHeartbeatDuringShutdown; } public double getMaxWorkerActivitiesPerSecond() { @@ -912,6 +941,11 @@ public PollerBehavior getNexusTaskPollersBehavior() { return nexusTaskPollersBehavior; } + @Experimental + public boolean getAllowActivityHeartbeatDuringShutdown() { + return allowActivityHeartbeatDuringShutdown; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -944,7 +978,8 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond && Objects.equals(deploymentOptions, that.deploymentOptions) && Objects.equals(workflowTaskPollersBehavior, that.workflowTaskPollersBehavior) && Objects.equals(activityTaskPollersBehavior, that.activityTaskPollersBehavior) - && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior); + && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior) + && allowActivityHeartbeatDuringShutdown == that.allowActivityHeartbeatDuringShutdown; } @Override @@ -977,7 +1012,8 @@ public int hashCode() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + allowActivityHeartbeatDuringShutdown); } @Override @@ -1040,6 +1076,8 @@ public String toString() { + activityTaskPollersBehavior + ", nexusTaskPollersBehavior=" + nexusTaskPollersBehavior + + ", allowActivityHeartbeatDuringShutdown=" + + allowActivityHeartbeatDuringShutdown + '}'; } } diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java index 897600443d..9bde963162 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java @@ -56,6 +56,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() { .setBuildId("build-id") .setStickyTaskQueueDrainTimeout(Duration.ofSeconds(15)) .setIdentity("worker-identity") + .setAllowActivityHeartbeatDuringShutdown(true) .build(); WorkerOptions w2 = WorkerOptions.newBuilder(w1).build(); @@ -89,6 +90,8 @@ public void verifyNewBuilderFromExistingWorkerOptions() { assertEquals(w1.getBuildId(), w2.getBuildId()); assertEquals(w1.getStickyTaskQueueDrainTimeout(), w2.getStickyTaskQueueDrainTimeout()); assertEquals(w1.getIdentity(), w2.getIdentity()); + assertEquals( + w1.getAllowActivityHeartbeatDuringShutdown(), w2.getAllowActivityHeartbeatDuringShutdown()); } @Test diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java new file mode 100644 index 0000000000..f6b0e0bc26 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java @@ -0,0 +1,136 @@ +package io.temporal.worker.shutdown; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.client.ActivityClient; +import io.temporal.client.ActivityClientOptions; +import io.temporal.client.ActivityFailedException; +import io.temporal.client.ActivityHandle; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link WorkerOptions.Builder#setAllowActivityHeartbeatDuringShutdown(boolean)}. Gated + * behind {@link SDKTestWorkflowRule#useExternalService} because the embedded test server may not + * support the standalone activity APIs. + */ +public class HeartbeatDuringWorkerShutdownTest { + + private static final String EXPECTED_RESULT = "completed"; + + private final Semaphore activityStarted = new Semaphore(0); + private final Semaphore shutdownTriggered = new Semaphore(0); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setTestTimeoutSeconds(60) + .setWorkerOptions( + WorkerOptions.newBuilder().setAllowActivityHeartbeatDuringShutdown(true).build()) + .setActivityImplementations( + new HeartbeatingActivityImpl(activityStarted, shutdownTriggered)) + .build(); + + /** + * Tests that when {@link WorkerOptions.Builder#setAllowActivityHeartbeatDuringShutdown(boolean)} + * is enabled, heartbeats keep working after a graceful worker shutdown is initiated and the + * activity runs to completion instead of getting an {@link + * io.temporal.client.ActivityWorkerShutdownException}. + */ + @Test + public void testHeartbeatingActivityCompletesDuringShutdown() throws InterruptedException { + assumeTrue(SDKTestWorkflowRule.useExternalService); + ActivityHandle handle = startHeartbeatingActivity(); + + assertTrue( + "Activity did not start within 30s", activityStarted.tryAcquire(30, TimeUnit.SECONDS)); + testWorkflowRule.getTestEnvironment().shutdown(); + shutdownTriggered.release(); + + // a heartbeat failure would fail the activity and make getResult throw + assertEquals(EXPECTED_RESULT, handle.getResult()); + testWorkflowRule.getTestEnvironment().awaitTermination(30, TimeUnit.SECONDS); + } + + /** + * Tests that {@link WorkerOptions.Builder#setAllowActivityHeartbeatDuringShutdown(boolean)} is + * ignored by {@link io.temporal.worker.WorkerFactory#shutdownNow()}: the heartbeat fails the + * activity with an {@link io.temporal.client.ActivityWorkerShutdownException} instead of letting + * it complete. + */ + @Test + public void testHeartbeatingActivityFailsDuringShutdownNow() throws InterruptedException { + assumeTrue(SDKTestWorkflowRule.useExternalService); + ActivityHandle handle = startHeartbeatingActivity(); + + assertTrue( + "Activity did not start within 30s", activityStarted.tryAcquire(30, TimeUnit.SECONDS)); + testWorkflowRule.getTestEnvironment().shutdownNow(); + shutdownTriggered.release(); + + // if heartbeating was incorrectly allowed, the activity would complete successfully and this + // assertion would fail + assertThrows(ActivityFailedException.class, handle::getResult); + testWorkflowRule.getTestEnvironment().awaitTermination(30, TimeUnit.SECONDS); + } + + private ActivityHandle startHeartbeatingActivity() { + ActivityClient client = + ActivityClient.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + ActivityClientOptions.newBuilder().setNamespace(SDKTestWorkflowRule.NAMESPACE).build()); + StartActivityOptions options = + StartActivityOptions.newBuilder() + .setId("heartbeat-during-shutdown-" + UUID.randomUUID()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build(); + return client.start(HeartbeatingActivity.class, HeartbeatingActivity::execute, options); + } + + @ActivityInterface + public interface HeartbeatingActivity { + @ActivityMethod + String execute(); + } + + public static class HeartbeatingActivityImpl implements HeartbeatingActivity { + private final Semaphore activityStarted; + private final Semaphore shutdownTriggered; + + public HeartbeatingActivityImpl(Semaphore activityStarted, Semaphore shutdownTriggered) { + this.activityStarted = activityStarted; + this.shutdownTriggered = shutdownTriggered; + } + + @Override + public String execute() { + activityStarted.release(); + try { + if (!shutdownTriggered.tryAcquire(30, TimeUnit.SECONDS)) { + throw new IllegalStateException("Worker shutdown was not triggered within 30s"); + } + } catch (InterruptedException e) { + // we ignore the interruption issued by shutdownNow and proceed to the heartbeat below, + // which is the signal under test + } + Activity.getExecutionContext().heartbeat("progress"); + return EXPECTED_RESULT; + } + } +}