Skip to content

[CELEBORN-2362] Fix Flaky CI/CD#3737

Open
afterincomparableyum wants to merge 5 commits into
apache:mainfrom
afterincomparableyum:flaky-cicd
Open

[CELEBORN-2362] Fix Flaky CI/CD#3737
afterincomparableyum wants to merge 5 commits into
apache:mainfrom
afterincomparableyum:flaky-cicd

Conversation

@afterincomparableyum

@afterincomparableyum afterincomparableyum commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Address several independent root causes of CI/CD flakiness, spanning the
build tooling, JaCoCo instrumentation, the test mini-cluster, and the
process-wide shuffle client.

Build & coverage:

  • build/mvn: default to archive.apache.org instead of the closer.lua
    mirror redirector, validate each download is a real tarball (closer.lua
    intermittently returns an HTML mirror page with HTTP 200), and retry up
    to 3 times.

  • pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's
    agent rejects Netty 4.2's already-enhanced JFR event classes, tearing
    down channels mid-write and flaking the Flink integration tests.

Client correctness:

  • ShuffleClient: make the process-wide singleton app-aware. Track the
    appUniqueId of the live instance and rebuild it when a later app/suite
    requests a different id (and clear it in reset()), so a straggler from a
    previous app can no longer bind to the wrong LifecycleManager through the
    shared static client.

  • ShuffleClientImpl: guard the revive path against a null oldLoc before
    removing it from pushExcludedWorkers, avoiding an NPE when no previous
    location is mapped.

  • ApplicationHeartbeater: start the heartbeat timer after one interval
    instead of at delay 0, so the first heartbeat does not fire before the
    application is fully registered.

Mini-cluster & test harness:

  • MiniClusterFeature: cap randomly selected ports below the ephemeral
    floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports
    that look free at selection time but fail to bind. Also rework worker
    startup: recreate the worker on each retry, handle InterruptedException
    explicitly (stop + interrupt + rethrow), and back off exponentially
    between attempts so a transient port collision retries cleanly.

  • RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage
    directory on every start attempt, since Ratis releases the directory
    lock asynchronously and a retry would otherwise hit "directory is
    already locked".

Spark/Flink integration suites:

  • SparkTestBase / CelebornHashCheckDiskSuite: stop any active
    SparkSession and reset the static ShuffleClient between suites/tests, so
    a leaked SparkContext can no longer keep an old LifecycleManager alive
    and corrupt a later suite's shuffle 0.

  • spark-it suites: reduce worker count from 5 to 3 (the MiniCluster
    default) to cut the CPU/thread footprint of the serial single-JVM
    suites that otherwise starves RPC/fetch handlers past the 240s timeout.

  • LifecycleManagerUnregisterShuffleSuite: shorten the expired-check
    interval to 5s so the shuffle unregister runs within the eventually()
    window with retry margin.

  • HybridShuffleWordCountTest: wait (eventually) for the async LocalFlusher
    to drain before asserting on-disk file length equals the logical
    length, instead of reading the file mid-flush.

Also, I fix flaky JVMQuakeSuite hang by driving the GC deficit bucket deterministically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure

Why are the changes needed?

CI/CD is always failing. With this, CI/CD rarely fails.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

CI/CD

@codecov

codecov Bot commented Jun 14, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 0% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.38%. Comparing base (b4cb5a0) to head (b4d6dc4).
⚠️ Report is 69 commits behind head on main.

Files with missing lines Patch % Lines
...java/org/apache/celeborn/client/ShuffleClient.java 0.00% 11 Missing ⚠️
.../org/apache/celeborn/client/ShuffleClientImpl.java 0.00% 2 Missing ⚠️
...pache/celeborn/client/ApplicationHeartbeater.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3737      +/-   ##
============================================
- Coverage     66.91%   57.38%   -9.53%     
- Complexity        0      214     +214     
============================================
  Files           358      395      +37     
  Lines         21986    27822    +5836     
  Branches       1946     2712     +766     
============================================
+ Hits          14710    15962    +1252     
- Misses         6262    10706    +4444     
- Partials       1014     1154     +140     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@afterincomparableyum afterincomparableyum force-pushed the flaky-cicd branch 4 times, most recently from 9937f86 to dfa6397 Compare June 15, 2026 04:45
@afterincomparableyum afterincomparableyum changed the title [CELEBORN-XXXX][WIP] fix flaky CI/CD [CELEBORN-2362] Fix Flaky CI/CD Jun 15, 2026
@afterincomparableyum afterincomparableyum force-pushed the flaky-cicd branch 5 times, most recently from 6855de2 to 35123c3 Compare June 15, 2026 13:39
@SteNicholas

Copy link
Copy Markdown
Member

@afterincomparableyum, is it ready? This fix is helpful for CI running.

  Address several independent root causes of CI/CD flakiness:

  - build/mvn: default to archive.apache.org instead of the closer.lua
    mirror redirector, validate each download is a real tarball (closer.lua
    intermittently returns an HTML mirror page with HTTP 200), and retry up
    to 3 times.

  - pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's
    agent rejects Netty 4.2's already-enhanced JFR event classes, tearing
    down channels mid-write and flaking the Flink integration tests.

  - MiniClusterFeature: cap randomly selected ports below the ephemeral
    floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports
    that look free at selection time but fail to bind.

  - RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage
    directory on every start attempt, since Ratis releases the directory
    lock asynchronously and a retry would otherwise hit "directory is
    already locked".

  - spark-it suites: reduce worker count from 5 to 3 (the MiniCluster
    default) to cut the CPU/thread footprint of the serial single-JVM
    suites that otherwise starves RPC/fetch handlers past the 240s timeout.

  - LifecycleManagerUnregisterShuffleSuite: shorten the expired-check
    interval to 5s so the shuffle unregister runs within the eventually()
    window with retry margin.
@afterincomparableyum afterincomparableyum marked this pull request as ready for review June 16, 2026 13:47
@afterincomparableyum

Copy link
Copy Markdown
Contributor Author

@SteNicholas it is ready now, this should resolve a lot of the CI/CD flakes. There may be 1 failing every now and then, but this is an improvement from before.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR targets CI/CD flakiness across the build toolchain, coverage instrumentation, and integration test stability (Spark/Flink) by reducing resource contention, eliminating known race conditions, and making startup/cleanup more deterministic.

Changes:

  • Harden build tooling and coverage: make Maven download deterministic + validated/retried; exclude io/netty/** from JaCoCo instrumentation to avoid Netty JFR class instrumentation failures.
  • Stabilize mini-cluster + integration tests: avoid ephemeral-port collisions, improve worker startup retry behavior, and enforce Spark/ShuffleClient cleanup between suites.
  • Improve client/runtime correctness: adjust heartbeat scheduling, guard revive logic, and make the process-wide ShuffleClient singleton app-aware.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala Bounds random port selection below ephemeral range; improves worker startup retry/cleanup behavior.
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala Reduces worker count; adds SparkSession + ShuffleClient cleanup between suites.
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala Reduces worker count used by the suite to lower contention.
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/memory/MemorySparkTestBase.scala Reduces worker count for memory-storage suites to reduce CPU/thread pressure.
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala Adds per-test SparkSession + ShuffleClient reset to prevent cross-test interference.
tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala Shortens shuffle expired-check interval to fit within eventually() window reliably.
tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala Waits for async local flushing to complete before asserting on-disk file sizes.
pom.xml Excludes Netty classes from JaCoCo agent instrumentation to prevent runtime channel failures.
master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java Reconfigures Ratis storage dirs on retries to avoid async lock-release flakiness.
client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala Delays first heartbeat by one interval to avoid firing before full registration.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java Guards revive path against null old locations to avoid NPEs.
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Makes static ShuffleClient singleton app-aware (rebuild on app id change, clear on reset).
build/mvn Makes Maven download deterministic (archive), validates tarball bodies, and retries downloads.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Outdated
Comment thread worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala Outdated
Comment thread build/mvn Outdated
Comment thread build/mvn
@afterincomparableyum

Copy link
Copy Markdown
Contributor Author

I addressed CoPilot comments @SteNicholas

@afterincomparableyum

Copy link
Copy Markdown
Contributor Author

I’ll revert the CoPilot suggestions that broke CI in a few hours and ping when ready again

@afterincomparableyum

Copy link
Copy Markdown
Contributor Author

Ping @SteNicholas PR is ready for review again.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review — 6 findings (2 bugs, 2 cleanup, 2 minor).

Top issues:

  1. Resource leak — the new appUniqueId-mismatch branch in ShuffleClient.get() replaces _instance without calling shutdown() on the old one, leaking transport connections, thread pools, and RPC references.
  2. NPE risk — the outer guard calls appUniqueId.equals(…) which will NPE if the parameter is null.

Also noted (not in diff, no inline comment):

  • MasterClusterFeature.selectRandomPort() (line 53) and RatisMasterStatusSystemSuiteJ (line 134) still use selectRandomInt(1024, 65535) — the ephemeral-port collision fix applied to MiniClusterFeature was not propagated to these files.

See inline comments for details.

Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Outdated
Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Outdated
Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Comment thread build/mvn
@SteNicholas

Copy link
Copy Markdown
Member

@afterincomparableyum, could you please take a look at above comments?

…fleclient, adjust mvn build script, and also remove duplicate code in disk suite.
@afterincomparableyum

Copy link
Copy Markdown
Contributor Author

Ping @SteNicholas I've addressed comments.

…nistically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afterincomparableyum, thanks for udpates. I left some comments for updates.

_instance.setExtension(extension);
_appUniqueId = appUniqueId;
initialized = true;
} else if (!Objects.equals(appUniqueId, _appUniqueId)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[blocker] get() can return another app's client — data race on the trailing return _instance.

Ordering the volatile writes (_instance before _appUniqueId) correctly stops the outer lock-free guard from handing back a stale instance. But the method still ends with return _instance outside the synchronized block. Between this thread releasing the lock and reaching that return, another thread with a different appUniqueId can re-enter this branch and overwrite _instance:

Thread A (app "A")              Thread B (app "B")
  _instance = newInstanceA
  _appUniqueId = "A"
  <exit synchronized>
                                <enter synchronized>
                                _instance = newInstanceB
  return _instance  // hands B's client to A

A then resolves celebornShuffleId against B's LifecycleManager → the ArrayIndexOutOfBoundsException / CommitMetadata CRC-mismatch / cross-app read this PR is trying to eliminate. Fix: return the instance you just built — capture it in a local and return that (or return from inside the lock) instead of re-reading the shared field.

Two lower-severity points on the same branch:

  • Singleton ping-pong: with the outer guard now || !Objects.equals(appUniqueId, _appUniqueId), every get() carrying a different id re-enters and swaps the static _instance. In a JVM with two concurrently-live apps (this test harness; also Flink session-cluster TaskManagers in prod), A's and B's tasks flip the shared singleton back and forth, so a holder can observe a client with the wrong/empty registration state. Single-app executors never reach this branch.
  • Orphan leak (intentional — flagging for sign-off): the replaced instance is deliberately not shutdown() (per the comment, to avoid RejectedExecutionException), leaking one RpcEnv + thread pools per distinct appUniqueId for the JVM's life. Bounded and fine for single-app JVMs, but worth a reviewer explicitly ratifying.

}
},
0,
appHeartbeatIntervalMs,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a production behavior change inside a "Fix Flaky CI/CD" PR, with no comment explaining it. The initial delay went 0 -> appHeartbeatIntervalMs, so the first app heartbeat — which also drives ReviseLostShuffles and app liveness on the master — now fires a full interval later for every real deployment, just to settle a test's timing. Prefer fixing this at the test layer (widen the test's wait/eventually window); if the delay really is needed in prod, please add a comment justifying it.

Thread.currentThread().interrupt()
throw ie
}
worker = createWorker(workerConf)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recreating the worker on every retry (var worker = createWorker(workerConf)) instead of re-initialize()-ing the same instance has two side effects:

  • Leaked JVM shutdown hooks: Worker's constructor unconditionally registers a JVM shutdown hook and nothing ever removes it, so each failed attempt permanently leaks a hook bound to a now-dead worker. In the shared serial spark-it/flink-it JVM these accumulate; at JVM exit they all fire askSync(WorkerLost) against a dead master, each blocking on the RPC ask timeout (slow/noisy shutdown), and they keep the dead Worker objects reachable for the whole run.
  • A dead worker can be published: with var worker reassigned, the registration-waiting thread may workerInfos.put the first (torn-down) attempt before the replacement is assigned, leaving a dead worker in the map that getOneWorker() / shutdownMiniCluster() can later pick — NPE/teardown flakiness in the very retry path this PR targets.

Consider retrying initialize() on a single instance (as before), or removing the shutdown hook on teardown.

throw ex
}
try {
TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry)) on the bootstrap thread adds up to ~2+4+8 = 14s of pure sleep per flaky worker to suite setup (maxRetries=4). For a transient bind error a much smaller backoff suffices — consider capping it, e.g. min(2^n, 2) seconds.

// ratis port collision) can leave the previous directory locked. Reusing the same
// directory on retry then fails with "directory is already locked"; allocating a clean
// directory each time avoids contending for a lock that has not been released yet.
configureServerConf(conf1, 1);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configureServerConf is already invoked by the callers when they build conf1/2/3 (e.g. init()), so on the first loop iteration it now runs twice per server on the same conf — the storage dir the caller just created is immediately overwritten and orphaned (an empty celeborn-ratis* temp dir, never cleaned), once per server on every resetRaftServer. The stated intent ("fresh dir only on retry") would be served by skipping this on the first attempt, e.g. guard on a retry counter.

setupMiniClusterWithRandomPorts(workerConf = workerConfs, workerNum = 3)
}

override def afterAll(): Unit = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pre-existing afterAll calls only shutdownMiniCluster() — no super.afterAll() and no stopActiveSparkSessions(). The point of the new SparkTestBase.afterAll cleanup in this PR is to stop a leaked SparkSession/SparkContext (and run ShuffleClient.reset()) before the next suite runs in the shared serial JVM. Since this override isn't updated, memory-storage suites skip that cleanup and can still leave a stale LifecycleManager behind — exactly the flakiness the PR targets. Please call super.afterAll() (or stopActiveSparkSessions()) here.

// trip the threshold, which could (and did) hang indefinitely when the runner had enough
// headroom that GC pauses never dominated runtime.
assert(!quake.heapDumped)
quake.checkAndDump(TimeUnit.SECONDS.toNanos(2), 0L)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling checkAndDump(...) directly fixes the hang, but drops all coverage of start()/stop() scheduling and the production run() path (jvmstat counter read + ticks->nanos conversion + the scheduler actually invoking run()). A regression there — bad delta, bad conversion, scheduler never firing — would no longer be caught by CI, and a worker stuck in GC thrash would silently never self-terminate. Consider keeping a lightweight test that exercises run() (or at least the scheduling wiring) alongside this deterministic checkAndDump assertion.

}
def selectRandomPort(): Int = synchronized {
val port = Utils.selectRandomInt(1024, 65535)
val port = Utils.selectRandomInt(1024, 32768)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ephemeral-port-floor fix lives only in MiniClusterFeature (the named maxSelectablePort constant + the rationale comment). This copy carries a bare 32768 literal, and RatisMasterStatusSystemSuiteJ carries yet another (32766). Three near-identical selectRandomPort blocks now diverge — the next person who tweaks the floor in one place will silently leave the others wrong and the BindException flakiness reappears in the master suites. Extract one shared helper/constant and call it from all three.

// the physical file is grown asynchronously by the LocalFlusher. Right after the job finishes
// the flusher may not have drained the last buffers yet, so the on-disk length can lag (briefly
// even 0). Wait for the flush to catch up before asserting equality instead of reading mid-flush.
eventually(timeout(30.seconds), interval(500.milliseconds)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: each 500ms poll re-walks every worker's full workingDirWriters map and issues a fresh File.length() syscall per file for up to 30s, so a single lagging file causes every already-matching file to be re-stat'd repeatedly. Could poll only the not-yet-matching writers, or drain/await the flush once before a single assertion pass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants