[CELEBORN-2362] Fix Flaky CI/CD#3737
Conversation
141ae29 to
8197efa
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3737 +/- ##
============================================
- Coverage 66.91% 57.38% -9.53%
- Complexity 0 214 +214
============================================
Files 358 395 +37
Lines 21986 27822 +5836
Branches 1946 2712 +766
============================================
+ Hits 14710 15962 +1252
- Misses 6262 10706 +4444
- Partials 1014 1154 +140 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
9937f86 to
dfa6397
Compare
6855de2 to
35123c3
Compare
|
@afterincomparableyum, is it ready? This fix is helpful for CI running. |
Address several independent root causes of CI/CD flakiness:
- build/mvn: default to archive.apache.org instead of the closer.lua
mirror redirector, validate each download is a real tarball (closer.lua
intermittently returns an HTML mirror page with HTTP 200), and retry up
to 3 times.
- pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's
agent rejects Netty 4.2's already-enhanced JFR event classes, tearing
down channels mid-write and flaking the Flink integration tests.
- MiniClusterFeature: cap randomly selected ports below the ephemeral
floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports
that look free at selection time but fail to bind.
- RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage
directory on every start attempt, since Ratis releases the directory
lock asynchronously and a retry would otherwise hit "directory is
already locked".
- spark-it suites: reduce worker count from 5 to 3 (the MiniCluster
default) to cut the CPU/thread footprint of the serial single-JVM
suites that otherwise starves RPC/fetch handlers past the 240s timeout.
- LifecycleManagerUnregisterShuffleSuite: shorten the expired-check
interval to 5s so the shuffle unregister runs within the eventually()
window with retry margin.
3d7bb21 to
3d8b984
Compare
|
@SteNicholas it is ready now, this should resolve a lot of the CI/CD flakes. There may be 1 failing every now and then, but this is an improvement from before. |
There was a problem hiding this comment.
Pull request overview
This PR targets CI/CD flakiness across the build toolchain, coverage instrumentation, and integration test stability (Spark/Flink) by reducing resource contention, eliminating known race conditions, and making startup/cleanup more deterministic.
Changes:
- Harden build tooling and coverage: make Maven download deterministic + validated/retried; exclude
io/netty/**from JaCoCo instrumentation to avoid Netty JFR class instrumentation failures. - Stabilize mini-cluster + integration tests: avoid ephemeral-port collisions, improve worker startup retry behavior, and enforce Spark/ShuffleClient cleanup between suites.
- Improve client/runtime correctness: adjust heartbeat scheduling, guard revive logic, and make the process-wide ShuffleClient singleton app-aware.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala | Bounds random port selection below ephemeral range; improves worker startup retry/cleanup behavior. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala | Reduces worker count; adds SparkSession + ShuffleClient cleanup between suites. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala | Reduces worker count used by the suite to lower contention. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/memory/MemorySparkTestBase.scala | Reduces worker count for memory-storage suites to reduce CPU/thread pressure. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | Adds per-test SparkSession + ShuffleClient reset to prevent cross-test interference. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala | Shortens shuffle expired-check interval to fit within eventually() window reliably. |
| tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala | Waits for async local flushing to complete before asserting on-disk file sizes. |
| pom.xml | Excludes Netty classes from JaCoCo agent instrumentation to prevent runtime channel failures. |
| master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java | Reconfigures Ratis storage dirs on retries to avoid async lock-release flakiness. |
| client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala | Delays first heartbeat by one interval to avoid firing before full registration. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Guards revive path against null old locations to avoid NPEs. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Makes static ShuffleClient singleton app-aware (rebuild on app id change, clear on reset). |
| build/mvn | Makes Maven download deterministic (archive), validates tarball bodies, and retries downloads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I addressed CoPilot comments @SteNicholas |
|
I’ll revert the CoPilot suggestions that broke CI in a few hours and ping when ready again |
62529b0 to
4283c1c
Compare
|
Ping @SteNicholas PR is ready for review again. |
SteNicholas
left a comment
There was a problem hiding this comment.
Code review — 6 findings (2 bugs, 2 cleanup, 2 minor).
Top issues:
- Resource leak — the new
appUniqueId-mismatch branch inShuffleClient.get()replaces_instancewithout callingshutdown()on the old one, leaking transport connections, thread pools, and RPC references. - NPE risk — the outer guard calls
appUniqueId.equals(…)which will NPE if the parameter is null.
Also noted (not in diff, no inline comment):
MasterClusterFeature.selectRandomPort()(line 53) andRatisMasterStatusSystemSuiteJ(line 134) still useselectRandomInt(1024, 65535)— the ephemeral-port collision fix applied toMiniClusterFeaturewas not propagated to these files.
See inline comments for details.
|
@afterincomparableyum, could you please take a look at above comments? |
…fleclient, adjust mvn build script, and also remove duplicate code in disk suite.
|
Ping @SteNicholas I've addressed comments. |
…nistically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure
bd8a3e4 to
e82023b
Compare
There was a problem hiding this comment.
@afterincomparableyum, thanks for udpates. I left some comments for updates.
| _instance.setExtension(extension); | ||
| _appUniqueId = appUniqueId; | ||
| initialized = true; | ||
| } else if (!Objects.equals(appUniqueId, _appUniqueId)) { |
There was a problem hiding this comment.
[blocker] get() can return another app's client — data race on the trailing return _instance.
Ordering the volatile writes (_instance before _appUniqueId) correctly stops the outer lock-free guard from handing back a stale instance. But the method still ends with return _instance outside the synchronized block. Between this thread releasing the lock and reaching that return, another thread with a different appUniqueId can re-enter this branch and overwrite _instance:
Thread A (app "A") Thread B (app "B")
_instance = newInstanceA
_appUniqueId = "A"
<exit synchronized>
<enter synchronized>
_instance = newInstanceB
return _instance // hands B's client to A
A then resolves celebornShuffleId against B's LifecycleManager → the ArrayIndexOutOfBoundsException / CommitMetadata CRC-mismatch / cross-app read this PR is trying to eliminate. Fix: return the instance you just built — capture it in a local and return that (or return from inside the lock) instead of re-reading the shared field.
Two lower-severity points on the same branch:
- Singleton ping-pong: with the outer guard now
|| !Objects.equals(appUniqueId, _appUniqueId), everyget()carrying a different id re-enters and swaps the static_instance. In a JVM with two concurrently-live apps (this test harness; also Flink session-cluster TaskManagers in prod), A's and B's tasks flip the shared singleton back and forth, so a holder can observe a client with the wrong/empty registration state. Single-app executors never reach this branch. - Orphan leak (intentional — flagging for sign-off): the replaced instance is deliberately not
shutdown()(per the comment, to avoidRejectedExecutionException), leaking oneRpcEnv+ thread pools per distinctappUniqueIdfor the JVM's life. Bounded and fine for single-app JVMs, but worth a reviewer explicitly ratifying.
| } | ||
| }, | ||
| 0, | ||
| appHeartbeatIntervalMs, |
There was a problem hiding this comment.
This is a production behavior change inside a "Fix Flaky CI/CD" PR, with no comment explaining it. The initial delay went 0 -> appHeartbeatIntervalMs, so the first app heartbeat — which also drives ReviseLostShuffles and app liveness on the master — now fires a full interval later for every real deployment, just to settle a test's timing. Prefer fixing this at the test layer (widen the test's wait/eventually window); if the delay really is needed in prod, please add a comment justifying it.
| Thread.currentThread().interrupt() | ||
| throw ie | ||
| } | ||
| worker = createWorker(workerConf) |
There was a problem hiding this comment.
Recreating the worker on every retry (var worker = createWorker(workerConf)) instead of re-initialize()-ing the same instance has two side effects:
- Leaked JVM shutdown hooks:
Worker's constructor unconditionally registers a JVM shutdown hook and nothing ever removes it, so each failed attempt permanently leaks a hook bound to a now-dead worker. In the shared serial spark-it/flink-it JVM these accumulate; at JVM exit they all fireaskSync(WorkerLost)against a dead master, each blocking on the RPC ask timeout (slow/noisy shutdown), and they keep the deadWorkerobjects reachable for the whole run. - A dead worker can be published: with
var workerreassigned, the registration-waiting thread mayworkerInfos.putthe first (torn-down) attempt before the replacement is assigned, leaving a dead worker in the map thatgetOneWorker()/shutdownMiniCluster()can later pick — NPE/teardown flakiness in the very retry path this PR targets.
Consider retrying initialize() on a single instance (as before), or removing the shutdown hook on teardown.
| throw ex | ||
| } | ||
| try { | ||
| TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong) |
There was a problem hiding this comment.
TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry)) on the bootstrap thread adds up to ~2+4+8 = 14s of pure sleep per flaky worker to suite setup (maxRetries=4). For a transient bind error a much smaller backoff suffices — consider capping it, e.g. min(2^n, 2) seconds.
| // ratis port collision) can leave the previous directory locked. Reusing the same | ||
| // directory on retry then fails with "directory is already locked"; allocating a clean | ||
| // directory each time avoids contending for a lock that has not been released yet. | ||
| configureServerConf(conf1, 1); |
There was a problem hiding this comment.
configureServerConf is already invoked by the callers when they build conf1/2/3 (e.g. init()), so on the first loop iteration it now runs twice per server on the same conf — the storage dir the caller just created is immediately overwritten and orphaned (an empty celeborn-ratis* temp dir, never cleaned), once per server on every resetRaftServer. The stated intent ("fresh dir only on retry") would be served by skipping this on the first attempt, e.g. guard on a retry counter.
| setupMiniClusterWithRandomPorts(workerConf = workerConfs, workerNum = 3) | ||
| } | ||
|
|
||
| override def afterAll(): Unit = { |
There was a problem hiding this comment.
This pre-existing afterAll calls only shutdownMiniCluster() — no super.afterAll() and no stopActiveSparkSessions(). The point of the new SparkTestBase.afterAll cleanup in this PR is to stop a leaked SparkSession/SparkContext (and run ShuffleClient.reset()) before the next suite runs in the shared serial JVM. Since this override isn't updated, memory-storage suites skip that cleanup and can still leave a stale LifecycleManager behind — exactly the flakiness the PR targets. Please call super.afterAll() (or stopActiveSparkSessions()) here.
| // trip the threshold, which could (and did) hang indefinitely when the runner had enough | ||
| // headroom that GC pauses never dominated runtime. | ||
| assert(!quake.heapDumped) | ||
| quake.checkAndDump(TimeUnit.SECONDS.toNanos(2), 0L) |
There was a problem hiding this comment.
Calling checkAndDump(...) directly fixes the hang, but drops all coverage of start()/stop() scheduling and the production run() path (jvmstat counter read + ticks->nanos conversion + the scheduler actually invoking run()). A regression there — bad delta, bad conversion, scheduler never firing — would no longer be caught by CI, and a worker stuck in GC thrash would silently never self-terminate. Consider keeping a lightweight test that exercises run() (or at least the scheduling wiring) alongside this deterministic checkAndDump assertion.
| } | ||
| def selectRandomPort(): Int = synchronized { | ||
| val port = Utils.selectRandomInt(1024, 65535) | ||
| val port = Utils.selectRandomInt(1024, 32768) |
There was a problem hiding this comment.
The ephemeral-port-floor fix lives only in MiniClusterFeature (the named maxSelectablePort constant + the rationale comment). This copy carries a bare 32768 literal, and RatisMasterStatusSystemSuiteJ carries yet another (32766). Three near-identical selectRandomPort blocks now diverge — the next person who tweaks the floor in one place will silently leave the others wrong and the BindException flakiness reappears in the master suites. Extract one shared helper/constant and call it from all three.
| // the physical file is grown asynchronously by the LocalFlusher. Right after the job finishes | ||
| // the flusher may not have drained the last buffers yet, so the on-disk length can lag (briefly | ||
| // even 0). Wait for the flush to catch up before asserting equality instead of reading mid-flush. | ||
| eventually(timeout(30.seconds), interval(500.milliseconds)) { |
There was a problem hiding this comment.
Minor: each 500ms poll re-walks every worker's full workingDirWriters map and issues a fresh File.length() syscall per file for up to 30s, so a single lagging file causes every already-matching file to be re-stat'd repeatedly. Could poll only the not-yet-matching writers, or drain/await the flush once before a single assertion pass.
What changes were proposed in this pull request?
Address several independent root causes of CI/CD flakiness, spanning the
build tooling, JaCoCo instrumentation, the test mini-cluster, and the
process-wide shuffle client.
Build & coverage:
build/mvn: default to archive.apache.org instead of the closer.lua
mirror redirector, validate each download is a real tarball (closer.lua
intermittently returns an HTML mirror page with HTTP 200), and retry up
to 3 times.
pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's
agent rejects Netty 4.2's already-enhanced JFR event classes, tearing
down channels mid-write and flaking the Flink integration tests.
Client correctness:
ShuffleClient: make the process-wide singleton app-aware. Track the
appUniqueId of the live instance and rebuild it when a later app/suite
requests a different id (and clear it in reset()), so a straggler from a
previous app can no longer bind to the wrong LifecycleManager through the
shared static client.
ShuffleClientImpl: guard the revive path against a null oldLoc before
removing it from pushExcludedWorkers, avoiding an NPE when no previous
location is mapped.
ApplicationHeartbeater: start the heartbeat timer after one interval
instead of at delay 0, so the first heartbeat does not fire before the
application is fully registered.
Mini-cluster & test harness:
MiniClusterFeature: cap randomly selected ports below the ephemeral
floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports
that look free at selection time but fail to bind. Also rework worker
startup: recreate the worker on each retry, handle InterruptedException
explicitly (stop + interrupt + rethrow), and back off exponentially
between attempts so a transient port collision retries cleanly.
RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage
directory on every start attempt, since Ratis releases the directory
lock asynchronously and a retry would otherwise hit "directory is
already locked".
Spark/Flink integration suites:
SparkTestBase / CelebornHashCheckDiskSuite: stop any active
SparkSession and reset the static ShuffleClient between suites/tests, so
a leaked SparkContext can no longer keep an old LifecycleManager alive
and corrupt a later suite's shuffle 0.
spark-it suites: reduce worker count from 5 to 3 (the MiniCluster
default) to cut the CPU/thread footprint of the serial single-JVM
suites that otherwise starves RPC/fetch handlers past the 240s timeout.
LifecycleManagerUnregisterShuffleSuite: shorten the expired-check
interval to 5s so the shuffle unregister runs within the eventually()
window with retry margin.
HybridShuffleWordCountTest: wait (eventually) for the async LocalFlusher
to drain before asserting on-disk file length equals the logical
length, instead of reading the file mid-flush.
Also, I fix flaky JVMQuakeSuite hang by driving the GC deficit bucket deterministically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure
Why are the changes needed?
CI/CD is always failing. With this, CI/CD rarely fails.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?
CI/CD