[CELEBORN-2336] Support decommission shutdown for worker scale-down scenarios #3698
chenghuichen wants to merge 11 commits
Pull request overview
Adds an opt-in “decommission-on-SIGTERM” mode for workers to better support scale-down scenarios, by introducing a new config flag that makes SIGTERM follow the decommission flow (report to master and wait for shuffle data consumption/expiration) instead of the graceful-restart flow.
Changes:
- Add `celeborn.worker.decommission.shutdown.enabled` config and surface it via `CelebornConf`.
- Make `workerGracefulShutdown` effectively disabled when decommission-on-shutdown is enabled, and set `WorkerStatusManager.exitEventType` accordingly.
- Attempt to extend shutdown hook timeout to `workerDecommissionForceExitTimeout` before decommissioning during shutdown.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala | Initialize exitEventType to Decommission when decommission-on-shutdown is enabled (overriding graceful). |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | On decommission shutdown path, call ShutdownHookManager.updateTimeout before decommissionWorker(). |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Add new config entry + getter, and ensure graceful shutdown is suppressed when decommission shutdown is enabled. |
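
The precedence described in this overview can be illustrated with a minimal sketch. The names `ShutdownModeSketch`, `ExitEvent`, `effectiveGracefulShutdown`, and `initialExitEvent` are hypothetical stand-ins, not the actual `CelebornConf` or `WorkerStatusManager` code, and the `Immediately` fallback when neither flag is set is an assumption.

```scala
object ShutdownModeSketch {
  // Hypothetical stand-ins for the worker exit event types.
  sealed trait ExitEvent
  case object Immediately extends ExitEvent
  case object Graceful extends ExitEvent
  case object Decommission extends ExitEvent

  // Graceful shutdown only stays in effect when decommission-on-shutdown is off.
  def effectiveGracefulShutdown(gracefulEnabled: Boolean, decommissionEnabled: Boolean): Boolean =
    gracefulEnabled && !decommissionEnabled

  // Decommission takes precedence over graceful; the final fallback is an assumption.
  def initialExitEvent(gracefulEnabled: Boolean, decommissionEnabled: Boolean): ExitEvent =
    if (decommissionEnabled) Decommission
    else if (gracefulEnabled) Graceful
    else Immediately
}
```

With both flags set to true, this sketch yields `Decommission` and reports graceful shutdown as disabled, which matches the override the PR describes.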
SteNicholas left a comment
@chenghuichen, thanks for your first contribution. Could you please add test cases for this support?
@SteNicholas Thanks for the review. Fixed the timeout issue, added tests, and updated worker.md.
@chenghuichen, please use
SteNicholas left a comment
@chenghuichen, thanks for the update. I left some comments for updates. PTAL.
Overall: Clean, well-scoped change. The decommission-on-SIGTERM approach is the right abstraction for scale-down, since it avoids requiring custom preStop scripts. The Copilot/SteNicholas feedback has been addressed well (explicit timeout registration, Config version should likely be The new config Reviewed with Claude Code
This is awesome and addresses a known issue with auto-scaling down. Thank you for your contribution, @chenghuichen. I will review this PR soon.
afterincomparableyum left a comment
Overall lgtm. Could you pls address my comments and the other comments.
@afterincomparableyum Thanks for the review! Updated: added assertions, fixed version to 0.7.0, clarified docs.
1d92a40 to cf8d472
SteNicholas left a comment
@chenghuichen, thanks for updates. I left some comments for the updates. PTAL.
Overall lgtm, thanks for contributing. Could you please address the last comments by Copilot and @SteNicholas?
@chenghuichen, any update?
SteNicholas left a comment
Reviewed the decommission-shutdown path. The state-machine wiring and the new WORKER_DECOMMISSION exit-kind plumbing through stop()/close() look correct, and the config version + docs row check out. One correctness concern (the shutdown-hook timeout budget doesn't account for stop()), one behavior change worth confirming, and a few cleanup/altitude nits inline.
@SteNicholas Thanks for the review! All fixed.
@chenghuichen, re-review of the current revision. This version addresses my earlier feedback well — the effectiveWorkerGracefulShutdown accessor de-duplicates the override, drainBeforeExit consolidates the exit-kind checks, and the waitTime + interval <= timeout bound makes the budget self-consistent in the common case. close(WORKER_DECOMMISSION) is db-null-safe and the master stops allocating slots (heartbeat reports InDecommission).
Remaining items are mostly about the shutdown timing budget in the worst case and a couple of intent/behavior inconsistencies — inline below. Two low-severity notes not inlined: (a) a spurious transition status ... is not allowed warning is logged on every runtime REST decommission because exit() already moved the state to InDecommission before decommissionWorker() re-transitions it; (b) exitEventType is sticky — recommissionWorker() resets state to Normal but not exitEventType, so a Recommission can't restore non-decommission exit semantics (likely intended for a decommission-shutdown worker, worth confirming).

    while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) {
    // Bound the total wait strictly by the timeout so that the remaining shutdown hook
    // budget is left for stop(WORKER_DECOMMISSION) to clean up resources.

Worst case can truncate stop() mid-cleanup. This comment claims the wait loop leaves budget for stop(), but the loop below can consume the entire forceExitTimeout (floor(6h/30s)·30s = 6h with defaults), leaving only ~checkInterval (30s) of the forceExitTimeout + checkInterval hook budget for stop(WORKER_DECOMMISSION). That path runs StorageManager.cleanupExpiredShuffleKey(shuffleKeySet(), false) — unbounded synchronous DFS delete() of all unreleased shuffle (exactly the slow case, since shuffle didn't drain) — plus flusher + 3 netty server shutdowns. If it exceeds 30s, ShutdownHookManager.executeShutdown does future.cancel(true), interrupting teardown → orphaned DFS objects and transitionState(Exit) skipped. The sendWorkerDecommissionToMaster() askSync before the loop is also unbudgeted (retries on a slow master). Consider bounding the wait to leave a teardown headroom proportional to the real stop() cost, not a fixed checkInterval.
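
The worst-case arithmetic in this comment can be checked with a small sketch. It assumes the loop guard `waitTime + interval <= timeout`, that every iteration sleeps one full interval, and a hook budget of `timeout + interval`; the object and method names are hypothetical, and the unbudgeted `askSync` mentioned above is not modelled.

```scala
object StopBudgetSketch {
  // Longest time the wait loop can consume when shuffle never drains.
  def maxWaitMs(timeoutMs: Long, intervalMs: Long): Long = {
    require(intervalMs > 0, "intervalMs must be positive")
    (timeoutMs / intervalMs) * intervalMs
  }

  // Time left for stop() inside a hook budget of timeout + interval (assumed).
  def stopBudgetMs(timeoutMs: Long, intervalMs: Long): Long =
    (timeoutMs + intervalMs) - maxWaitMs(timeoutMs, intervalMs)
}
```

For a 6h timeout and a 30s interval this gives a 30s teardown budget, which is the figure the comment arrives at.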

    while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) {
    // Bound the total wait strictly by the timeout so that the remaining shutdown hook
    // budget is left for stop(WORKER_DECOMMISSION) to clean up resources.
    while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval <= timeout) {

forceExitTimeout < checkInterval → no wait at all → data loss. With waitTime=0 the guard is 0 + interval <= timeout, i.e. checkInterval <= forceExitTimeout. If an operator sets forceExitTimeout below checkInterval, the loop body never executes: decommissionWorker() returns immediately and stop() deletes shuffle data that consumers still need → FetchFailure / SHUFFLE_DATA_LOST. The previous waitTime < timeout guard always waited at least once. There's no checkValue guarding forceExitTimeout >= checkInterval; consider adding one, or special-casing interval > timeout to still wait (bounded by timeout).
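
One way to keep the wait bounded while still waiting when the timeout is shorter than the interval is to clamp each sleep to the remaining budget. The following is a minimal sketch of that idea, not the PR's actual code: `DrainLoopSketch`, `awaitDrain`, and the injectable `sleep` parameter are hypothetical, and `isDrained` stands in for a check such as `storageManager.shuffleKeySet().isEmpty`.

```scala
object DrainLoopSketch {
  /**
   * Waits until `isDrained()` returns true or `timeoutMs` has been slept.
   * Each sleep is clamped to the remaining budget, so the loop still waits
   * when timeoutMs < intervalMs and never sleeps past the timeout.
   * Returns the total time slept, in milliseconds.
   */
  def awaitDrain(
      isDrained: () => Boolean,
      timeoutMs: Long,
      intervalMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms)): Long = {
    require(intervalMs > 0, "intervalMs must be positive")
    var waitedMs = 0L
    while (!isDrained() && waitedMs < timeoutMs) {
      // Never sleep longer than what is left of the timeout.
      val stepMs = math.min(intervalMs, timeoutMs - waitedMs)
      sleep(stepMs)
      waitedMs += stepMs
    }
    waitedMs
  }
}
```

Passing a no-op `sleep` makes the loop easy to exercise in a unit test without real delays.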

    stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
    case WorkerEventType.Decommission =>
    decommissionWorker()
    stop(CelebornExitKind.WORKER_DECOMMISSION)

Behavior change for runtime REST decommission. This branch is reached not only by the new SIGTERM feature but also by the existing exit("DECOMMISSION") REST API (via the exit-thread → System.exit → hook). Pre-PR that path ran stop(EXIT_IMMEDIATELY); now it runs stop(WORKER_DECOMMISSION), which via drainBeforeExit makes the heartbeat/fast-fail tasks cancel(false) and the thread pools shutdown() (drain) instead of cancel(true)/shutdownNow(). So every cluster using the REST decommission API gets changed teardown semantics even without enabling celeborn.worker.decommission.shutdown.enabled. Worth calling out explicitly as intended.
Not a regression — the existing REST exit("DECOMMISSION") path already swallows this, and the master falls back to heartbeat-timeout eviction.

    // Both graceful shutdown and decommission have drained data, so in-flight
    // tasks are allowed to finish instead of being force-cancelled.
    val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||

Drain intent isn't carried through to the partition sorter. This comment/drainBeforeExit declare that decommission "has drained data, so in-flight tasks are allowed to finish" — and that holds for the thread pools here. But stop() later calls partitionsSorter.close(WORKER_DECOMMISSION), and PartitionFilesSorter.close only takes the await-termination branch for exitKind == WORKER_GRACEFUL_SHUTDOWN; WORKER_DECOMMISSION falls into the else and calls fileSorterExecutors.shutdownNow() (PartitionFilesSorter.java:415-417), force-interrupting an in-flight on-demand sort → truncated sorted file/index → fetch failure for a reader mid-sort. Not a regression vs the old immediate path, but inconsistent with the new drain semantics — either group WORKER_DECOMMISSION with graceful in the sorter, or soften this comment.

    shutdownGracefully()
    stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
    case WorkerEventType.Decommission =>
    decommissionWorker()

Decommission report failure has no retry/fallback on the SIGTERM path. decommissionWorker() calls sendWorkerDecommissionToMaster(), which swallows any exception and proceeds. If the master is briefly unreachable when SIGTERM fires, no decommission record reaches it and it must wait for heartbeat-timeout eviction. The other two exit paths are more robust — exitImmediately() uses WorkerLost and shutdownGracefully() uses ReportWorkerUnavailable. Consider a single retry or a ReportWorkerUnavailable fallback to match them.

    // The wait loop in decommissionWorker() is bounded by forceExitTimeout, so the extra
    // checkInterval reserves headroom for stop(WORKER_DECOMMISSION) to finish cleanup
    // before the hook is cancelled.
    ShutdownHookManager.get().updateTimeout(

Altitude: updateTimeout is process-wide. ShutdownHookManager.updateTimeout does hooks.forEach(setTimeout), so this raises the timeout of every registered hook (not just the worker hook) to forceExitTimeout + checkInterval (6h+ by default). An unrelated stuck hook could then hang JVM exit for hours where the default would force-terminate it. Prefer registering the worker hook with the 4-arg addShutdownHook(hook, priority, timeout, unit) so only that hook gets the extended budget — which also removes the register-then-update sequencing.

    override def exit(exitType: String): String = {
    exitType.toUpperCase(Locale.ROOT) match {
    case "DECOMMISSION" =>
    ShutdownHookManager.get().updateTimeout(

Duplicated hook-timeout expression. conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval appears here (REST exit handler) and again in the construction block (~line 1107). Extract a single decommissionHookTimeoutMs helper so the SIGTERM and REST decommission paths can't drift to different timeouts.
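
A minimal sketch of the suggested extraction follows. `DecommissionConf` is a hypothetical stand-in for the two `CelebornConf` getters named above, and only the helper name `decommissionHookTimeoutMs` comes from the review.

```scala
object HookTimeoutSketch {
  // Hypothetical stand-in for conf.workerDecommissionForceExitTimeout and
  // conf.workerDecommissionCheckInterval, both in milliseconds.
  final case class DecommissionConf(forceExitTimeoutMs: Long, checkIntervalMs: Long)

  // Single place that defines the hook budget, so the SIGTERM and REST paths
  // cannot drift to different timeouts.
  def decommissionHookTimeoutMs(conf: DecommissionConf): Long =
    conf.forceExitTimeoutMs + conf.checkIntervalMs
}
```

Both call sites would then pass the result of this one helper instead of repeating the sum.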

    private val gracefulShutdown = conf.workerGracefulShutdown
    if (gracefulShutdown) {
    if (decommissionShutdown) {
    exitEventType = WorkerEventType.Decommission

exitEventType is read across threads without synchronization. It's written here and in exit() under this.synchronized, but the shutdown-hook thread reads workerStatusManager.exitEventType (Worker.scala:1087) without the lock, and the field (declared line 38) isn't @volatile. A runtime REST exit(...) that updates it shortly before an independent SIGTERM-triggered hook read has no happens-before guarantee → the hook could take a stale branch (e.g. exitImmediately instead of decommissionWorker). Mark exitEventType @volatile.
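
The visibility fix suggested here can be sketched as follows. The event types and the `StatusManager` class are simplified, hypothetical stand-ins for `WorkerEventType` and `WorkerStatusManager`; only the `@volatile` annotation on `exitEventType` reflects the suggestion itself.

```scala
object ExitEventSketch {
  // Hypothetical stand-ins for the worker exit event types.
  sealed trait WorkerEventType
  case object Immediately extends WorkerEventType
  case object Graceful extends WorkerEventType
  case object Decommission extends WorkerEventType

  class StatusManager {
    // @volatile makes a write by one thread visible to a later lock-free read
    // by another thread, such as a shutdown-hook thread.
    @volatile var exitEventType: WorkerEventType = Immediately

    // Writers still serialize on the instance lock, as in the reviewed code.
    def exit(newType: WorkerEventType): Unit = this.synchronized {
      exitEventType = newType
    }
  }
}
```

A reader that does not take the lock then observes the most recent write rather than a possibly stale value.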

    "This is suitable for permanent scale-down scenarios where the worker will not restart. " +
    "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " +
    "(recovery state will not be saved since the worker is not expected to come back). " +
    "Operators should set the pod's terminationGracePeriodSeconds to " +

terminationGracePeriodSeconds guidance understates the budget. This tells operators to set it to forceExitTimeout + checkInterval + small buffer, but stop() teardown runs inside that same forceExitTimeout + checkInterval hook timeout, and its cost is unbounded (DFS deletes of all unreleased shuffle). When the wait loop runs close to forceExitTimeout, only ~checkInterval is left for stop(), so a "small buffer" can't cover it and the pod is SIGKILLed mid-cleanup. The in-code comments acknowledge stop() needs headroom; the operator-facing doc should say so too (and the buffer should scale with the worst-case teardown, not be "small").

    // Graceful shutdown only → Graceful
    val conf2 = new CelebornConf()
    conf2.set("celeborn.worker.graceful.shutdown.enabled", "true")

Non-defensive test setup. conf1 above explicitly sets both keys with the comment "so the assertion does not depend on system properties leaked from other tests." But conf2 here sets only graceful.shutdown.enabled=true (not decommission.shutdown.enabled=false). Since new CelebornConf() loads system properties, a leaked celeborn.worker.decommission.shutdown.enabled=true would make exitEventType=Decommission and this assertEquals(Graceful, mgr2.exitEventType) fail. Apply conf1's both-keys defensiveness to conf2/conf3.
… visibility
- Bound the decommission drain by clamping each sleep to the remaining timeout, so a forceExitTimeout smaller than checkInterval still drains instead of skipping the wait and dropping unconsumed shuffle
- Mark exitEventType @volatile for cross-thread visibility from the hook
- Register the worker hook with explicit 4-arg addShutdownHook timeout (no process-wide updateTimeout); extract decommissionHookTimeoutMs
- Skip the redundant InDecommission re-transition on the REST path
- Harden exitEventType test setup; clarify budget docs
* upstream/main: (26 commits)
  [CELEBORN-2363] Bump Flink version to 1.20.5, 2.0.2 and 2.1.3
  [MINOR] Configure GitHub workflows to use concurrency cancel-in-progress for pull requests
  [CELEBORN-2016] Fix the worker decommission and graceful shutdown condition
  [CELEBORN-2351] Partition file sorting should only be paused for PUSH_AND_REPLICATE_PAUSED
  [CELEBORN-2319] Standalone LifecycleManager && rust sdk
  [CELEBORN-2315][FOLLOWUP] Change assertIteratorFullyConsumed to throw CelebornIOException
  [CELEBORN-2315] Add iterator fully-consumed validation after shuffle write
  [MINOR] Fix PbSerDeUtilsTest failure
  [CELEBORN-2313] Extend E2E checked zone to batch assembly point
  [CELEBORN-2347] Change get reducer file group cache to expireAfterAccess
  [CELEBORN-2332] Fix self join deadlock in C++ WorkerPartitionReader fetch callbacks
  [CELEBORN-2348] Support end-to-end shuffle integrity check for Flink
  [CELEBORN-1577][FOLLOWUP] Fix backward compatiblity issue with interrupt shuffle
  [CELEBORN-2346] Add RequestSlots failure metrics
  [CELEBORN-2343] Fix timer leak in handlePushData
  [CELEBORN-2310][FOLLOWUP] Account actualUsableSpace while excluding workers
  [CELEBORN-2275][CIP-14] Add C++ merge-write and Java-read hybrid integration test
  [CELEBORN-2342] Fix object aliasing in LegacySkewHandlingPartitionValidator corrupting sub-range metadata
  [CELEBORN-2345] Fix allocation for rpcAskTimeout
  [CELEBORN-2339] Add tools.jar into classpath only for Java 8
  ...
# Conflicts:
# worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
The real bugs are fixed; the remaining points are intentional tradeoffs for an opt-in flag. One ask for future rounds: please reconcile suggestions against the full review history. The hook-timeout registration, for instance, has now round-tripped 4-arg →
What changes were proposed in this pull request?

Add `celeborn.worker.decommission.shutdown.enabled` configuration. When set to true, the worker will walk the decommission path (send `ReportWorkerDecommission` to master, wait for all shuffle data to be consumed or expired) upon receiving SIGTERM, instead of the graceful shutdown path.

When enabled, this overrides `celeborn.worker.graceful.shutdown.enabled`. The shutdown hook timeout is also extended to `celeborn.worker.decommission.forceExitTimeout` to match the decommission wait window.

Why are the changes needed?

In scheduled auto-scaling scenarios (e.g., scale up at peak hours, scale down at off-peak), operators want to simply shrink the node pool and let the PaaS layer send SIGTERM without writing custom preStop scripts or manually invoking the decommission REST API.

The existing graceful shutdown (`celeborn.worker.graceful.shutdown.enabled=true`) is designed for rolling upgrades. For permanent scale-down, the correct semantic is decommission. Previously this could only be triggered via REST API or master-pushed events, requiring additional scripting in the teardown workflow.

With this change, operators only need to set one config and align `celeborn.worker.decommission.forceExitTimeout` with the pod's `terminationGracePeriodSeconds`.

Does this PR resolve a correctness bug?

Does this PR introduce any user-facing change?

How was this patch tested?

`WorkerStatusManager.exitEventType` is set to `Decommission` and `workerGracefulShutdown` returns false (suppressing recovery DB initialization)
`decommission.forceExitTimeout` before calling `decommissionWorker()`