Add option to let activities heartbeat during worker shutdown #2903
baekgyu-kim wants to merge 1 commit into
maciejdudko left a comment
Hi @baekgyu-kim, thank you for your contribution! It's great to see someone taking on these long-standing issues. However, this is not the right implementation.
There should be a new worker option to enable heartbeating during shutdown. It should default to disabled, and when disabled, the behavior should be identical to the existing behavior for backward compatibility.
When the option is enabled, heartbeat behavior should be identical to normal heartbeating when the worker is not shutting down. There should be no additional code path that calls sendHeartbeatRequest a different way; the existing mechanism should be used. The way to achieve that is to modify SyncActivityWorker.shutdown so that heartbeatExecutor.shutdown is only called after all outstanding activity tasks have finished executing.
If you need assistance with the implementation, feel free to reach out on the community Slack: either message me directly or post in the #java-sdk channel.
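The ordering requested in this comment can be pictured with a minimal sketch. This is not the SDK's implementation: `ShutdownOrderingSketch` is a hypothetical stand-in built on plain `java.util.concurrent` executors, and every name in it is illustrative (only `heartbeatExecutor` echoes a name used in the comment). With the flag off, the heartbeat executor closes as soon as shutdown is requested; with it on, it closes only after the task executor has drained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a worker; this is not the SDK's SyncActivityWorker.
final class ShutdownOrderingSketch {
  private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
  private final ScheduledExecutorService heartbeatExecutor =
      Executors.newSingleThreadScheduledExecutor();
  private final boolean allowHeartbeatDuringShutdown;

  ShutdownOrderingSketch(boolean allowHeartbeatDuringShutdown) {
    this.allowHeartbeatDuringShutdown = allowHeartbeatDuringShutdown;
  }

  void submit(Runnable task) {
    taskExecutor.execute(task);
  }

  boolean heartbeatExecutorIsShutdown() {
    return heartbeatExecutor.isShutdown();
  }

  // Graceful shutdown: stop accepting tasks, let submitted ones finish.
  CompletableFuture<Void> shutdown() {
    taskExecutor.shutdown();
    if (!allowHeartbeatDuringShutdown) {
      // Option disabled: close the heartbeat executor right away.
      heartbeatExecutor.shutdown();
    }
    return CompletableFuture.runAsync(
        () -> {
          try {
            // Bounded wait so the sketch cannot hang forever.
            taskExecutor.awaitTermination(30, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          // Option enabled: the first close happens here, after the tasks
          // have drained. A second shutdown() call is a harmless no-op.
          heartbeatExecutor.shutdown();
        });
  }

  // Runs one blocked task through shutdown and reports whether the heartbeat
  // executor was still open while that task had not yet finished.
  static boolean heartbeatOpenWhileTaskRuns(boolean allow) {
    ShutdownOrderingSketch worker = new ShutdownOrderingSketch(allow);
    CountDownLatch release = new CountDownLatch(1);
    worker.submit(
        () -> {
          try {
            release.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
    CompletableFuture<Void> done = worker.shutdown();
    boolean openDuringTask = !worker.heartbeatExecutorIsShutdown();
    release.countDown(); // let the task finish
    done.join(); // the heartbeat executor is closed by now in both modes
    return openDuringTask;
  }

  public static void main(String[] args) {
    System.out.println("option enabled:  " + heartbeatOpenWhileTaskRuns(true));
    System.out.println("option disabled: " + heartbeatOpenWhileTaskRuns(false));
  }
}
```

The point of the design is that nothing about heartbeating itself changes; only the moment at which the executor is closed moves.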
dc7f0cc to 0d8852a
Hi @maciejdudko, It now adds an experimental [...] Whenever you have a chance, I'd appreciate another look. Thanks again!
| private PollerBehavior workflowTaskPollersBehavior; | ||
| private PollerBehavior activityTaskPollersBehavior; | ||
| private PollerBehavior nexusTaskPollersBehavior; | ||
| private boolean activityHeartbeatDuringShutdown; |
The field should be named allowActivityHeartbeatDuringShutdown, the options getter should be named getAllowActivityHeartbeatDuringShutdown, and the builder setter should be named setAllowActivityHeartbeatDuringShutdown. Apply this change consistently throughout the PR.
Renamed consistently to allowActivityHeartbeatDuringShutdown (field), getAllowActivityHeartbeatDuringShutdown() (getter), and setAllowActivityHeartbeatDuringShutdown(...) (setter) across WorkerOptions, SingleWorkerOptions, and Worker.
| return null; | ||
| }); | ||
| CompletableFuture<Void> shutdownFuture; | ||
| if (activityHeartbeatDuringShutdown) { |
When interruptTasks is true (shutdownNow was called instead of shutdown), it should behave as if heartbeat during shutdown was disabled.
- if (activityHeartbeatDuringShutdown) {
+ if (allowActivityHeartbeatDuringShutdown && !interruptTasks) {
Updated the condition to allowActivityHeartbeatDuringShutdown && !interruptTasks, so shutdownNow now behaves exactly as if the option were disabled.
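To make the four flag combinations explicit, here is a small sketch of that condition. `HeartbeatGateSketch` and `keepHeartbeatingDuringShutdown` are hypothetical names introduced for illustration; only the two boolean names come from the diff.

```java
// Hypothetical helper, not SDK code: it only restates the condition from the
// review so the four flag combinations can be checked side by side.
final class HeartbeatGateSketch {
  static boolean keepHeartbeatingDuringShutdown(
      boolean allowActivityHeartbeatDuringShutdown, boolean interruptTasks) {
    // interruptTasks is true for shutdownNow, which always wins.
    return allowActivityHeartbeatDuringShutdown && !interruptTasks;
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    for (boolean allow : values) {
      for (boolean interrupt : values) {
        System.out.printf(
            "allow=%b interruptTasks=%b -> heartbeats continue: %b%n",
            allow, interrupt, keepHeartbeatingDuringShutdown(allow, interrupt));
      }
    }
  }
}
```

Only one of the four combinations, option enabled with a graceful shutdown, keeps heartbeats working.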
| * io.temporal.client.ActivityWorkerShutdownException}, unless {@link | ||
| * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case | ||
| * heartbeats keep working until the activity tasks finish executing.<br> |
shutdownNow behavior stays the same; see the comment in SyncActivityWorker.
-  * io.temporal.client.ActivityWorkerShutdownException}, unless {@link
-  * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
-  * heartbeats keep working until the activity tasks finish executing.<br>
+  * io.temporal.client.ActivityWorkerShutdownException}.<br>
Reverted. The WorkerFactory.shutdown Javadoc no longer references the option, so shutdownNow keeps its original documented behavior.
| /** | ||
| * If enabled, activities can keep heartbeating while the worker is shutting down. The activity | ||
| * heartbeat executor is closed only after all outstanding activity tasks have finished | ||
| * executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves | ||
| * exactly as it does while the worker is running: heartbeats are throttled and sent to the | ||
| * server, which keeps the server from timing the activity out during the {@link | ||
| * WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period. | ||
| * | ||
| * <p>Note that with this option enabled activities are no longer notified of the worker | ||
| * shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code | ||
| * heartbeat}, so they are expected to complete within the termination grace period on their | ||
| * own. | ||
| * | ||
| * <p>Defaults to false, meaning that after shutdown is requested, {@link | ||
| * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and | ||
| * throws {@link io.temporal.client.ActivityWorkerShutdownException}. | ||
| */ | ||
| @Experimental | ||
| public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) { |
We don't want to document implementation details.
- /**
-  * If enabled, activities can keep heartbeating while the worker is shutting down. The activity
-  * heartbeat executor is closed only after all outstanding activity tasks have finished
-  * executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
-  * exactly as it does while the worker is running: heartbeats are throttled and sent to the
-  * server, which keeps the server from timing the activity out during the {@link
-  * WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
-  *
-  * <p>Note that with this option enabled activities are no longer notified of the worker
-  * shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
-  * heartbeat}, so they are expected to complete within the termination grace period on their
-  * own.
-  *
-  * <p>Defaults to false, meaning that after shutdown is requested, {@link
-  * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
-  * throws {@link io.temporal.client.ActivityWorkerShutdownException}.
-  */
- @Experimental
- public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
+ /**
+  * If true, activities can keep heartbeating during graceful worker shutdown (see {@link
+  * io.temporal.worker.WorkerFactory#shutdown WorkerFactory.shutdown}). Defaults to false,
+  * which means that after graceful shutdown is requested, calling {@link
+  * io.temporal.activity.ActivityExecutionContext#heartbeat ActivityExecutionContext.heartbeat}
+  * does not send a heartbeat and instead throws {@link
+  * io.temporal.client.ActivityWorkerShutdownException ActivityWorkerShutdownException}. This
+  * option is ignored by non-graceful shutdown (see {@link
+  * io.temporal.worker.WorkerFactory#shutdownNow WorkerFactory.shutdownNow}).
+  *
+  * <p>Note that with this option enabled, activities are no longer notified of the worker
+  * shutdown by the {@link io.temporal.client.ActivityWorkerShutdownException
+  * ActivityWorkerShutdownException} exception, so they are expected to complete within the
+  * termination grace period on their own.
+  */
+ @Experimental
+ public Builder setAllowActivityHeartbeatDuringShutdown(boolean allowActivityHeartbeatDuringShutdown) {
Applied your suggested wording and dropped the implementation details.
| WorkflowExecution execution = WorkflowClient.start(workflow::execute); | ||
| started.get(); | ||
| testWorkflowRule.getTestEnvironment().shutdown(); |
There's a race condition here: the shutdown() call can go through before the activity worker receives the task, which prevents the activity from running and makes the test fail.
This feature will be easier to test using a standalone activity. It should work like this:
- Test starts activity.
- Test blocks on semaphore 1 until activity starts.
- Activity signals semaphore 1.
- Activity blocks on semaphore 2 until shutdown is triggered.
- Test calls shutdown().
- Test signals semaphore 2.
- Activity heartbeats then returns. (An exception will fail the activity.)
- Test calls result() on activity handle to ensure it succeeded. (Failure will throw exception and fail the test.)
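The handshake in the list above can be sketched without the SDK. This is a simulation under stated assumptions, not the actual test: a plain thread plays the activity, `Future.get()` stands in for `result()` on the activity handle, an `AtomicBoolean` stands in for `shutdown()`, and `heartbeat()` is a hypothetical stand-in that throws after shutdown unless the flag allows it. All class and method names are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Simulation only: a plain thread plays the activity, and heartbeat() is a
// hypothetical stand-in for ActivityExecutionContext.heartbeat.
final class TwoSemaphoreHandshakeSketch {
  private final Semaphore activityStarted = new Semaphore(0); // "semaphore 1"
  private final Semaphore shutdownTriggered = new Semaphore(0); // "semaphore 2"
  private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
  private final boolean allowHeartbeatDuringShutdown;

  TwoSemaphoreHandshakeSketch(boolean allowHeartbeatDuringShutdown) {
    this.allowHeartbeatDuringShutdown = allowHeartbeatDuringShutdown;
  }

  // Throws once shutdown has been requested, unless heartbeats stay allowed.
  private void heartbeat() {
    if (shuttingDown.get() && !allowHeartbeatDuringShutdown) {
      throw new IllegalStateException("worker is shutting down");
    }
  }

  private String activity() throws InterruptedException {
    activityStarted.release(); // activity signals semaphore 1
    shutdownTriggered.acquire(); // activity blocks on semaphore 2
    heartbeat(); // an exception here fails the activity
    return "done";
  }

  // Returns true if the activity completed, false if it failed.
  static boolean activityCompletes(boolean allowHeartbeatDuringShutdown) {
    TwoSemaphoreHandshakeSketch test =
        new TwoSemaphoreHandshakeSketch(allowHeartbeatDuringShutdown);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Callable<String> task = test::activity;
      Future<String> handle = executor.submit(task); // test starts activity
      test.activityStarted.acquire(); // test blocks on semaphore 1
      test.shuttingDown.set(true); // stands in for shutdown()
      test.shutdownTriggered.release(); // test signals semaphore 2
      handle.get(); // stands in for result() on the activity handle
      return true;
    } catch (ExecutionException e) {
      return false; // the activity threw, so it failed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("option enabled:  " + activityCompletes(true));
    System.out.println("option disabled: " + activityCompletes(false));
  }
}
```

Because the activity cannot pass semaphore 2 until the test has already requested shutdown, the heartbeat is guaranteed to happen during shutdown, which is what removes the race.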
Thanks for the detailed outline.
Reworked the test to use a standalone activity with the two-semaphore handshake you described: it waits for the activity to start, triggers shutdown, then releases the activity to heartbeat and return.
This removes the race where shutdown() could land before the task was picked up.
| * ActivityWorkerShutdownException}. | ||
| */ | ||
| @Test | ||
| public void testHeartbeatingActivityCompletesDuringShutdown() |
Also add a test case for when shutdownNow is called instead of shutdown.
Added testHeartbeatingActivityFailsDuringShutdownNow, which calls shutdownNow() and asserts the heartbeat fails the activity instead of letting it complete, confirming the option is ignored for non-graceful shutdown.
Hi @maciejdudko, I've addressed all of your comments:
Thanks again for the careful review!
What was changed
Added an experimental worker option, WorkerOptions.Builder#setAllowActivityHeartbeatDuringShutdown(boolean) (default false, preserving the existing behavior).
- [...] shutdownNow is used, the behavior is unchanged: the heartbeat executor is shut down first, so ActivityExecutionContext.heartbeat() throws ActivityWorkerShutdownException.
- [...] allowActivityHeartbeatDuringShutdown && !interruptTasks, so non-graceful shutdown (WorkerFactory.shutdownNow) always behaves as if the option were disabled.
Touched: WorkerOptions, SingleWorkerOptions, Worker, SyncActivityWorker.
Why?
- [...] heartbeat() threw ActivityWorkerShutdownException for the rest of the awaitTermination grace period. An activity that wanted to run to completion during that window could not refresh its server-side heartbeat deadline → the server timed it out and retried it → duplicate executions, even though the worker deliberately gave the activity time to finish.
- [...] false, so existing behavior is preserved, and shutdownNow is intentionally excluded. Originates from Add the ability to keep heartbeating while the worker is shutting down #2075.
Note: with the option enabled, activities are no longer notified of shutdown via ActivityWorkerShutdownException, so they are expected to complete within the termination grace period on their own. This is documented on the setter.
Checklist
Closes Add the ability to keep heartbeating while the worker is shutting down #2075
How was this tested:
New HeartbeatDuringWorkerShutdownTest (standalone activity + two-semaphore handshake; gated behind SDKTestWorkflowRule.useExternalService):
- testHeartbeatingActivityCompletesDuringShutdown (option enabled, graceful shutdown()): the activity heartbeats and runs to completion.
- testHeartbeatingActivityFailsDuringShutdownNow (shutdownNow()): the option is ignored, and the heartbeat fails the activity.
WorkerOptionsTest: copy-builder round-trips the new option.
Any docs updates needed?
WorkerOptions.Builder#setAllowActivityHeartbeatDuringShutdown Javadoc.