From 2535f1c8e6031e7d1261eab30d1ddcb5a702dbf6 Mon Sep 17 00:00:00 2001 From: praagyajain Date: Sun, 14 Jun 2026 14:49:23 +0530 Subject: [PATCH 1/8] feat(dedup): TCP client transport for k8s (CoverageTcpClient) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In k8s the dedup coverage collector runs in the k8s-proxy Deployment pod while the app JVM is in a separate pod, so the shared-/tmp unix sockets can't reach it. Add a TCP transport selected by KEPLOY_COVERAGE_ENDPOINT (host:port): the SDK dials the collector and keeps one bidirectional connection for the whole replay. Wire protocol (newline-delimited), matching the k8s-proxy collector: collector -> SDK : "START <id>" | "END <id>" SDK -> collector : "ACK" (after START reset) "COV <compact-json>" + "ACK" (after END dump) - CoverageTcpClient mirrors CommandServer's dispatch but inverts roles (SDK is the client); reuses CoverageCollector reset/capture unchanged. COV precedes ACK so the collector records the payload before the ACK releases the caller. - Connect loop retries (collector listens only once replay begins). No read timeout on the long-lived idle-between-tests connection. - Unix transport stays the default for local/docker (unchanged); the worker field is generalized to Closeable so stop() is transport-agnostic. Validated end-to-end: real JVM (this SDK as -javaagent, TCP client) against the k8s-proxy DedupCoverage TCP server — coverage flows correctly; the score>=90 pair dedupes (identical line sets), the score=40 case differs. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../io/keploy/dedup/KeployDedupAgent.java | 202 +++++++++++++++++- 1 file changed, 194 insertions(+), 8 deletions(-) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index 30cd22f..5b7cbf6 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -68,7 +68,9 @@ public final class KeployDedupAgent { private static final AtomicBoolean STARTED = new AtomicBoolean(false); private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); - private static volatile CommandServer commandServer; + // The active transport worker: a CommandServer (unix, local/docker) or a + // CoverageTcpClient (TCP, k8s). Both are Closeable so stop() is transport-agnostic. + private static volatile Closeable coverageWorker; private KeployDedupAgent() { } @@ -109,10 +111,43 @@ public static boolean start() { CoverageCollector collector = new CoverageCollector( new JacocoClient(resolveHost(), resolvePort()), new CoverageIndex()); - CommandServer server = new CommandServer(collector, new CoveragePublisher(new File(DATA_SOCKET_PATH))); - Thread thread = new Thread(server, "keploy-java-dedup-control"); + + Runnable worker; + String threadName; + String endpoint = resolveEndpoint(); + if (endpoint != null) { + // k8s: the collector lives in a different pod, so there is no shared + // /tmp. Dial the collector's TCP endpoint and run the same protocol. 
+ int idx = endpoint.lastIndexOf(':'); + if (idx <= 0 || idx == endpoint.length() - 1) { + STARTED.set(false); + log(Level.SEVERE, "Invalid KEPLOY_COVERAGE_ENDPOINT '" + endpoint + "', expected host:port", null); + return false; + } + String host = endpoint.substring(0, idx); + int port; + try { + port = Integer.parseInt(endpoint.substring(idx + 1).trim()); + } catch (NumberFormatException e) { + STARTED.set(false); + log(Level.SEVERE, "Invalid port in KEPLOY_COVERAGE_ENDPOINT '" + endpoint + "'", e); + return false; + } + CoverageTcpClient client = new CoverageTcpClient(collector, host, port); + worker = client; + coverageWorker = client; + threadName = "keploy-java-dedup-tcp"; + } else { + // local/docker: SDK owns the unix control socket and pushes coverage + // back over the unix data socket on a pod-shared /tmp. + CommandServer server = new CommandServer(collector, new CoveragePublisher(new File(DATA_SOCKET_PATH))); + worker = server; + coverageWorker = server; + threadName = "keploy-java-dedup-control"; + } + + Thread thread = new Thread(worker, threadName); thread.setDaemon(true); - commandServer = server; thread.start(); registerShutdownHook(); return true; @@ -131,11 +166,15 @@ public static boolean isStarted() { * Stops the background control socket listener. */ public static void stop() { - CommandServer server = commandServer; - if (server != null) { - server.close(); + Closeable worker = coverageWorker; + if (worker != null) { + try { + worker.close(); + } catch (IOException e) { + log(Level.FINE, "Failed to close Java dedup coverage worker", e); + } } - commandServer = null; + coverageWorker = null; STARTED.set(false); } @@ -190,6 +229,16 @@ private static int resolvePort() { } } + /** + * Returns the collector's TCP endpoint ("host:port") for k8s mode, or {@code null} + * to use the unix-socket transport (local/docker). The collector advertises this + * to the SDK via KEPLOY_COVERAGE_ENDPOINT when app and collector are in different pods. 
+ */ + private static String resolveEndpoint() { + String value = envOrProperty("KEPLOY_COVERAGE_ENDPOINT", "keploy.coverage.endpoint", ""); + return value.trim().isEmpty() ? null : value.trim(); + } + private static String envOrProperty(String envKey, String propertyKey, String defaultValue) { String value = System.getenv(envKey); if (value == null || value.trim().isEmpty()) { @@ -361,6 +410,143 @@ public void close() { } } + /** + * TCP transport (k8s): the SDK dials the collector and keeps one bidirectional + * connection open for the whole replay. Mirrors {@link CommandServer}'s dispatch + * but inverts the roles — here the SDK is the client. Wire protocol: + *
+     *   collector -> SDK : "START <id>" | "END <id>"
+     *   SDK -> collector : "ACK"                        (after START reset)
+     *                      "COV <compact-json>" + "ACK" (after END dump)
+     * 
+ * The collector starts listening only when replay begins, so the connect loop + * retries until it is reachable. + */ + private static final class CoverageTcpClient implements Runnable, Closeable { + + private static final long RECONNECT_DELAY_MILLIS = 1000; + + private final CoverageCollector collector; + private final String host; + private final int port; + private final AtomicBoolean running = new AtomicBoolean(true); + private final Object testCaseLock = new Object(); + private volatile Socket socket; + private String activeTestId = ""; + + CoverageTcpClient(CoverageCollector collector, String host, int port) { + this.collector = collector; + this.host = host; + this.port = port; + } + + @Override + public void run() { + while (running.get()) { + try { + connectAndServe(); + } catch (IOException e) { + if (running.get()) { + log(Level.FINE, "Java dedup TCP connection to " + host + ":" + port + + " unavailable, retrying", e); + } + } + if (running.get()) { + try { + Thread.sleep(RECONNECT_DELAY_MILLIS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + private void connectAndServe() throws IOException { + Socket open = new Socket(); + // Bounded connect, but NO read timeout: the connection is long-lived and + // idles between tests waiting for the next START/END command. 
+ open.connect(new InetSocketAddress(InetAddress.getByName(host), port), SOCKET_TIMEOUT_MILLIS); + socket = open; + try (Socket active = open; + BufferedReader reader = new BufferedReader( + new InputStreamReader(active.getInputStream(), StandardCharsets.UTF_8))) { + OutputStream out = active.getOutputStream(); + String line; + while (running.get() && (line = reader.readLine()) != null) { + String trimmed = line.trim(); + if (trimmed.isEmpty()) { + continue; + } + CoverageCommand command = CoverageCommand.parse(trimmed); + if (command == null) { + continue; + } + dispatch(command, out); + } + } finally { + socket = null; + } + } + + private void dispatch(CoverageCommand command, OutputStream out) throws IOException { + synchronized (testCaseLock) { + if (command.action == CommandAction.START) { + activeTestId = command.testId; + collector.reset(); + writeLine(out, "ACK"); + return; + } + + if (command.action == CommandAction.END) { + if (!command.testId.equals(activeTestId)) { + log(Level.SEVERE, + "Ignoring mismatched END command. expected=" + activeTestId + ", actual=" + + command.testId, + null); + writeLine(out, "ACK"); + return; + } + + try { + Map> executedLinesByFile = collector.capture(); + if (executedLinesByFile.isEmpty()) { + log(Level.FINE, "No Java coverage lines collected for " + command.testId, null); + } else { + // COV must precede ACK: the collector reads lines sequentially, + // so the payload is recorded before the ACK releases the caller. 
+ writeLine(out, "COV " + GSON.toJson( + new DedupPayload(command.testId, executedLinesByFile))); + } + } catch (Exception e) { + log(Level.SEVERE, "Failed to collect Java coverage for " + command.testId, e); + } finally { + activeTestId = ""; + writeLine(out, "ACK"); + } + } + } + } + + private void writeLine(OutputStream out, String message) throws IOException { + out.write((message + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + @Override + public void close() { + running.set(false); + Socket open = socket; + if (open != null) { + try { + open.close(); + } catch (IOException e) { + log(Level.FINE, "Failed to close Java dedup TCP socket", e); + } + } + } + } + private enum CommandAction { START, END From cf69aced82951d7219b10f7368edba00c4c9e18e Mon Sep 17 00:00:00 2001 From: praagyajain Date: Mon, 22 Jun 2026 02:22:02 +0530 Subject: [PATCH 2/8] fix(dedup): warm up app classes before first test for stable coverage A fresh JVM runs each class's static initializer (<clinit>) on first use. JaCoCo charged those one-time lines to whichever test ran first, so a test could look different from its true duplicates by luck of timing -> the duplicate set flip-flopped run to run (e.g. 2 vs 4). On the first START command (app fully started), eagerly Class.forName(..., initialize=true) every indexed application class so all <clinit> lines run once, then the normal reset clears them. Every test window then captures only the lines its own request executes -> deterministic dedup. Best-effort per class; disable via KEPLOY_JAVA_DEDUP_WARMUP_DISABLED=true. Go is unaffected (AOT, init() runs once at startup before the first reset). 
Co-Authored-By: Claude Opus 4.8 --- .../io/keploy/dedup/KeployDedupAgent.java | 80 ++++++++++++++++++- 1 file changed, 78 insertions(+), 2 deletions(-) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index 5b7cbf6..0bf6271 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -137,6 +137,7 @@ public static boolean start() { worker = client; coverageWorker = client; threadName = "keploy-java-dedup-tcp"; + log(Level.INFO, "Keploy dedup: TCP transport enabled, will dial collector at " + host + ":" + port, null); } else { // local/docker: SDK owns the unix control socket and pushes coverage // back over the unix data socket on a pod-shared /tmp. @@ -355,6 +356,9 @@ private void dispatch(CoverageCommand command, OutputStream outputStream) { synchronized (testCaseLock) { if (command.action == CommandAction.START) { activeTestId = command.testId; + // Warm up app classes once before the first measured window so + // one-time lines aren't charged to the first test. + collector.warmup(); collector.reset(); writeAck(outputStream); return; @@ -447,8 +451,8 @@ public void run() { connectAndServe(); } catch (IOException e) { if (running.get()) { - log(Level.FINE, "Java dedup TCP connection to " + host + ":" + port - + " unavailable, retrying", e); + log(Level.INFO, "Keploy dedup: TCP connect to " + host + ":" + port + + " failed (" + e.getClass().getSimpleName() + ": " + e.getMessage() + "), retrying", null); } } if (running.get()) { @@ -468,6 +472,7 @@ private void connectAndServe() throws IOException { // idles between tests waiting for the next START/END command. 
open.connect(new InetSocketAddress(InetAddress.getByName(host), port), SOCKET_TIMEOUT_MILLIS); socket = open; + log(Level.INFO, "Keploy dedup: connected to collector at " + host + ":" + port, null); try (Socket active = open; BufferedReader reader = new BufferedReader( new InputStreamReader(active.getInputStream(), StandardCharsets.UTF_8))) { @@ -493,6 +498,9 @@ private void dispatch(CoverageCommand command, OutputStream out) throws IOExcept synchronized (testCaseLock) { if (command.action == CommandAction.START) { activeTestId = command.testId; + // Warm up app classes once before the first measured window so + // one-time lines aren't charged to the first test. + collector.warmup(); collector.reset(); writeLine(out, "ACK"); return; @@ -585,6 +593,7 @@ private static final class CoverageCollector { private final JacocoClient jacocoClient; private final CoverageIndex coverageIndex; + private final AtomicBoolean warmed = new AtomicBoolean(false); private CoverageCollector(JacocoClient jacocoClient, CoverageIndex coverageIndex) { this.jacocoClient = jacocoClient; @@ -599,6 +608,73 @@ private void reset() { } } + /** + * Eagerly initialize every indexed application class so their static + * initializers (<clinit>) run ONCE here, before the first test's + * coverage window. The very first request to a fresh JVM otherwise pays + * the one-time class-init cost, and JaCoCo charges those <clinit> + * lines to whichever test ran first — making the duplicate set + * non-deterministic run-to-run. Running them now (then letting the START + * reset clear the counters) means every test sees only the lines its own + * request executes. Called once, on the first START, when the app is + * fully started. Best-effort: per-class failures are ignored (the class + * just falls back to lazy init). Disable with + * KEPLOY_JAVA_DEDUP_WARMUP_DISABLED=true if a static initializer has + * harmful side effects. 
+ */ + private void warmup() { + if (isWarmupDisabled() || !warmed.compareAndSet(false, true)) { + return; + } + ClassLoader[] loaders = warmupLoaders(); + int initialized = 0; + int failed = 0; + for (ClassEntry entry : coverageIndex.entries()) { + if (initializeClass(entry.className.replace('/', '.'), loaders)) { + initialized++; + } else { + failed++; + } + } + log(Level.INFO, "Keploy dedup: warmed up application classes (initialized=" + + initialized + ", skipped=" + failed + ")", null); + } + + private boolean isWarmupDisabled() { + return isTruthy(envOrProperty("KEPLOY_JAVA_DEDUP_WARMUP_DISABLED", + "keploy.java.dedup.warmup.disabled", "")); + } + + private ClassLoader[] warmupLoaders() { + List loaders = new ArrayList<>(3); + ClassLoader ctx = Thread.currentThread().getContextClassLoader(); + if (ctx != null) { + loaders.add(ctx); + } + ClassLoader sys = ClassLoader.getSystemClassLoader(); + if (sys != null && !loaders.contains(sys)) { + loaders.add(sys); + } + ClassLoader own = KeployDedupAgent.class.getClassLoader(); + if (own != null && !loaders.contains(own)) { + loaders.add(own); + } + return loaders.toArray(new ClassLoader[0]); + } + + private boolean initializeClass(String binaryName, ClassLoader[] loaders) { + for (ClassLoader loader : loaders) { + try { + Class.forName(binaryName, true, loader); + return true; + } catch (Throwable ignored) { + // Try the next loader; a class that no loader can initialize + // (or whose throws) is simply left to lazy init. 
+ } + } + return false; + } + private Map> capture() throws IOException { byte[] dump = jacocoClient.dump(true, true); if (dump.length == 0) { From 2dd6aa064ea58acc3e24c4f13f3528b51c848ca6 Mon Sep 17 00:00:00 2001 From: praagyajain Date: Mon, 29 Jun 2026 14:52:43 +0530 Subject: [PATCH 3/8] feat(dedup): branch coverage via per-class JaCoCo probe fingerprints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fingerprint each test by the set of executed JaCoCo probes per class ({className -> [probeIdx]}) instead of executed source lines. Each branch is instrumented as a distinct probe, so the probe set distinguishes which branch a test took (true vs false) on a shared line — line status and even branch counts report identically for both paths. The probe set subsumes line coverage and uses canonical VM class-name keys (no path normalization). Only capture() changes; the wire payload, collector, store, and enterprise compute stay generic over map[string][]int. Removed the now-dead line-decode helpers (Analyzer/CoverageBuilder, executedLines, resolveSourcePath). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../io/keploy/dedup/KeployDedupAgent.java | 129 +++++------------- 1 file changed, 33 insertions(+), 96 deletions(-) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index 0bf6271..b273155 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -1,10 +1,6 @@ package io.keploy.dedup; import com.google.gson.Gson; -import org.jacoco.core.analysis.Analyzer; -import org.jacoco.core.analysis.CoverageBuilder; -import org.jacoco.core.analysis.IClassCoverage; -import org.jacoco.core.analysis.ICounter; import org.jacoco.core.data.ExecutionData; import org.jacoco.core.data.ExecutionDataStore; import org.jacoco.core.data.ExecutionDataWriter; @@ -687,73 +683,59 @@ private Map> capture() throws IOException { ExecFileLoader loader = new ExecFileLoader(); loader.load(new ByteArrayInputStream(dump)); ExecutionDataStore executionDataStore = loader.getExecutionDataStore(); - Set hitClasses = hitClassNames(executionDataStore); - if (hitClasses.isEmpty()) { + + // BRANCH coverage: fingerprint by the set of executed JaCoCo PROBES per + // class, NOT just executed lines. Each branch is instrumented as a + // distinct probe, so the executed-probe set distinguishes WHICH branch a + // test took (true vs false) — line status, and even branch counts, report + // identically for the true-path and false-path test. The probe set also + // subsumes line coverage (lines map to probes). Probe indices are stable + // for a given class bytecode, so the set is comparable across the run. + // Keyed by VM class name (canonical "com/foo/Bar"; no file-path + // normalization needed). CoverageIndex is used only to restrict to the + // app's own classes so JDK/library probes don't pollute the fingerprint. 
+ Set appClasses = indexedClassNames(); + if (appClasses.isEmpty()) { if (diagnosticsEnabled()) { - diagnostic("execution data had no hit classes"); + diagnostic("coverage index has no app classes"); } return Collections.emptyMap(); } - List indexedEntries = coverageIndex.entries(); - if (diagnosticsEnabled()) { - diagnostic("hitClasses=" + hitClasses.size() - + ", indexedEntries=" + indexedEntries.size() - + ", sampleHits=" + summarizeStrings(hitClasses, 5) - + ", sampleEntries=" + summarizeClassEntries(indexedEntries, 5)); - } - - CoverageBuilder coverageBuilder = new CoverageBuilder(); - Analyzer analyzer = new Analyzer(executionDataStore, coverageBuilder); - for (ClassEntry classEntry : indexedEntries) { - if (!hitClasses.contains(classEntry.className)) { - continue; - } - try { - analyzer.analyzeClass(classEntry.bytes, classEntry.location); - } catch (IOException | RuntimeException e) { - log(Level.FINE, "Skipping unreadable Java class " + classEntry.location, e); - } - } - - if (diagnosticsEnabled()) { - diagnostic("analyzedClasses=" + coverageBuilder.getClasses().size()); - } - Map> raw = new LinkedHashMap<>(); - for (IClassCoverage classCoverage : coverageBuilder.getClasses()) { - if (!classCoverage.containsCode()) { + for (ExecutionData executionData : executionDataStore.getContents()) { + if (!executionData.hasHits()) { continue; } - - List executedLines = executedLines(classCoverage); - if (executedLines.isEmpty()) { + if (!appClasses.contains(executionData.getName())) { continue; } - - String sourcePath = resolveSourcePath(classCoverage); - Set merged = raw.get(sourcePath); - if (merged == null) { - merged = new LinkedHashSet<>(); - raw.put(sourcePath, merged); + boolean[] probes = executionData.getProbes(); + Set fired = new LinkedHashSet<>(); + for (int i = 0; i < probes.length; i++) { + if (probes[i]) { + fired.add(i); + } + } + if (!fired.isEmpty()) { + raw.put(executionData.getName(), fired); } - merged.addAll(executedLines); } if 
(diagnosticsEnabled()) { - diagnostic("filesWithCoverage=" + raw.size() - + ", sampleFiles=" + summarizeStrings(raw.keySet(), 5)); + diagnostic("classesWithProbes=" + raw.size() + + ", sampleClasses=" + summarizeStrings(raw.keySet(), 5)); } return toSortedMap(raw); } - private Set hitClassNames(ExecutionDataStore executionDataStore) { + // indexedClassNames returns the VM names of the app's own classes (from the + // CoverageIndex) so probe collection can skip JDK/library classes. + private Set indexedClassNames() { Set names = new LinkedHashSet<>(); - for (ExecutionData executionData : executionDataStore.getContents()) { - if (executionData.hasHits()) { - names.add(executionData.getName()); - } + for (ClassEntry entry : coverageIndex.entries()) { + names.add(entry.className); } return names; } @@ -769,51 +751,6 @@ private String summarizeStrings(Iterable values, int limit) { return sample.toString(); } - private String summarizeClassEntries(List values, int limit) { - List sample = new ArrayList<>(); - for (ClassEntry value : values) { - sample.add(value.className); - if (sample.size() >= limit) { - break; - } - } - return sample.toString(); - } - - private List executedLines(IClassCoverage classCoverage) { - int firstLine = classCoverage.getFirstLine(); - int lastLine = classCoverage.getLastLine(); - if (firstLine < 0 || lastLine < firstLine) { - return Collections.emptyList(); - } - - List lines = new ArrayList<>(); - for (int line = firstLine; line <= lastLine; line++) { - int status = classCoverage.getLine(line).getStatus(); - if (status != ICounter.EMPTY && status != ICounter.NOT_COVERED) { - lines.add(line); - } - } - return lines; - } - - private String resolveSourcePath(IClassCoverage classCoverage) { - String sourceFile = classCoverage.getSourceFileName(); - if (sourceFile == null || sourceFile.trim().isEmpty()) { - return normalizePath(classCoverage.getName() + ".java"); - } - - String packageName = classCoverage.getPackageName(); - String relativePath = 
packageName == null || packageName.isEmpty() - ? sourceFile - : packageName + "/" + sourceFile; - File localSource = new File(System.getProperty("user.dir"), "src/main/java/" + relativePath); - if (localSource.exists()) { - return normalizePath(localSource.getAbsolutePath()); - } - return normalizePath(relativePath); - } - private Map> toSortedMap(Map> raw) { List files = new ArrayList<>(raw.keySet()); Collections.sort(files); From 9f91308c436326cfb892f1c3f2efd717687e3555 Mon Sep 17 00:00:00 2001 From: praagyajain Date: Mon, 29 Jun 2026 15:32:15 +0530 Subject: [PATCH 4/8] fix(dedup): address PR review + fix smoke test - TCP END always emits COV before ACK (even when empty), mirroring the unix transport, so the line-oriented collector reads exactly one COV per END and cannot desync (Copilot comment). - Reconnect failures log at INFO only on the first failure, then FINE, to avoid ~1/s log spam in k8s; reset on a successful connect (Copilot comment). - smoke-javaagent.sh asserts the VM class-name key ("smoke/Work") instead of the old source-file key ("Work.java"), matching the probe-based fingerprint contract; fixes the JDK 8/17/21 smoke jobs. --- README.md | 2 +- .../io/keploy/dedup/KeployDedupAgent.java | 28 ++++++++++++------- scripts/smoke-javaagent.sh | 4 ++- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index bc12f66..ab80bae 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Keploy Enterprise drives dynamic dedup per testcase. 2. The Java agent resets JaCoCo coverage counters for that testcase. 3. Enterprise replays the testcase. 4. Enterprise sends `END /` on `/tmp/coverage_control.sock`. -5. The Java agent dumps JaCoCo execution data, resolves executed Java lines, and sends them as JSON on `/tmp/coverage_data.sock`. +5. The Java agent dumps JaCoCo execution data and sends the executed probe indices per class (`{className -> [probeIdx]}`) as JSON on `/tmp/coverage_data.sock`. 
Probes capture branch-level coverage (which branch a test took), so they distinguish tests that run the same lines but take different branches. 6. Enterprise writes the result to `dedupData.yaml` and uses it to identify duplicates. Coverage is collected at per-testcase granularity, not process granularity. diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index b273155..4178c60 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -48,7 +48,8 @@ import java.util.stream.Stream; /** - * Collects per-testcase JaCoCo coverage and streams executed lines back to Keploy Enterprise. + * Collects per-testcase JaCoCo coverage and streams the executed probe set per + * class ({className -> [probeIdx]}) back to Keploy Enterprise. */ public final class KeployDedupAgent { @@ -433,6 +434,9 @@ private static final class CoverageTcpClient implements Runnable, Closeable { private final Object testCaseLock = new Object(); private volatile Socket socket; private String activeTestId = ""; + // Connect retries spin ~1/s until the collector is reachable; log the first + // failure at INFO and the rest at FINE so we don't spam k8s app logs. + private boolean connectFailureLogged = false; CoverageTcpClient(CoverageCollector collector, String host, int port) { this.collector = collector; @@ -447,7 +451,9 @@ public void run() { connectAndServe(); } catch (IOException e) { if (running.get()) { - log(Level.INFO, "Keploy dedup: TCP connect to " + host + ":" + port + Level level = connectFailureLogged ? 
Level.FINE : Level.INFO; + connectFailureLogged = true; + log(level, "Keploy dedup: TCP connect to " + host + ":" + port + " failed (" + e.getClass().getSimpleName() + ": " + e.getMessage() + "), retrying", null); } } @@ -468,6 +474,7 @@ private void connectAndServe() throws IOException { // idles between tests waiting for the next START/END command. open.connect(new InetSocketAddress(InetAddress.getByName(host), port), SOCKET_TIMEOUT_MILLIS); socket = open; + connectFailureLogged = false; log(Level.INFO, "Keploy dedup: connected to collector at " + host + ":" + port, null); try (Socket active = open; BufferedReader reader = new BufferedReader( @@ -513,15 +520,16 @@ private void dispatch(CoverageCommand command, OutputStream out) throws IOExcept } try { - Map> executedLinesByFile = collector.capture(); - if (executedLinesByFile.isEmpty()) { - log(Level.FINE, "No Java coverage lines collected for " + command.testId, null); - } else { - // COV must precede ACK: the collector reads lines sequentially, - // so the payload is recorded before the ACK releases the caller. - writeLine(out, "COV " + GSON.toJson( - new DedupPayload(command.testId, executedLinesByFile))); + Map> executedProbesByClass = collector.capture(); + if (executedProbesByClass.isEmpty()) { + log(Level.FINE, "No Java coverage collected for " + command.testId, null); } + // Always emit COV before ACK — even when empty — so the + // line-oriented collector reads exactly one COV per END (the + // unix transport likewise always publishes). The payload is + // recorded before the ACK releases the caller. 
+ writeLine(out, "COV " + GSON.toJson( + new DedupPayload(command.testId, executedProbesByClass))); } catch (Exception e) { log(Level.SEVERE, "Failed to collect Java coverage for " + command.testId, e); } finally { diff --git a/scripts/smoke-javaagent.sh b/scripts/smoke-javaagent.sh index 39058ee..29af495 100755 --- a/scripts/smoke-javaagent.sh +++ b/scripts/smoke-javaagent.sh @@ -108,9 +108,11 @@ public final class SmokeHarness { } String json = payload.get(); + // The fingerprint is keyed by VM class name (probe-based coverage), + // e.g. "smoke/Work" — not a source file like "Work.java". if (json == null || !json.contains("\"id\":\"test-set-0/" + mode + "\"") - || !json.contains("Work.java")) { + || !json.contains("smoke/Work")) { throw new IllegalStateException("unexpected coverage payload for " + mode + ": " + json); } System.out.println(mode + ": " + json); From d5cf474516f0dcd0f444abc11a340f86ac200e6d Mon Sep 17 00:00:00 2001 From: praagyajain Date: Thu, 2 Jul 2026 02:42:32 +0530 Subject: [PATCH 5/8] =?UTF-8?q?feat(dedup):=20CoverageReporter=20=E2=80=94?= =?UTF-8?q?=20line/branch=20coverage=20from=20probe-union=20+=20bytecode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Option B for whole-test-set coverage: a pure Java tool that reconstructs ExecutionData from a fired-probe union (the dedupFingerprints we already persist) plus a build-constant {classId, probeCount} manifest, runs JaCoCo Analyzer over the app bytecode, and sums line/branch/instr/method counters over all classes. Reuses the fingerprints verbatim as input; no JaCoCo internal APIs (id + probeCount come from the SDK's live ExecutionData). CoverageReporterSelfTest cross-checks the reconstruct-from-union path against JaCoCo's direct .exec analysis — exact match on the sample app (lines 78/105, branches 22/42, instr 452/525). 
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: praagyajain --- .../io/keploy/dedup/CoverageReporter.java | 226 ++++++++++++++++++ .../dedup/CoverageReporterSelfTest.java | 120 ++++++++++ 2 files changed, 346 insertions(+) create mode 100644 keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java create mode 100644 keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java new file mode 100644 index 0000000..c0db8ed --- /dev/null +++ b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java @@ -0,0 +1,226 @@ +package io.keploy.dedup; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.jacoco.core.analysis.Analyzer; +import org.jacoco.core.analysis.CoverageBuilder; +import org.jacoco.core.analysis.IClassCoverage; +import org.jacoco.core.analysis.ICounter; +import org.jacoco.core.data.ExecutionData; +import org.jacoco.core.data.ExecutionDataStore; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +/** + * Offline, one-shot line + branch coverage computation for the dynamic-dedup + * "Option B" flow. Runs OUTSIDE the app JVM (as a short-lived job invoked by + * k8s-proxy at recording stop), so it needs no live JaCoCo agent — only: + * + *
<ul>
+ *   <li>classes — the app's compiled {@code .class} files (or a jar),
+ *       i.e. the bytecode blob stored once per build tag;</li>
+ *   <li>union — {@code {vmClassName -> [firedProbeIdx]}}: the union of
+ *       every kept test-case fingerprint for the test set (across all
+ *       auto-replay cycles). Reuses the exact {@code dedupFingerprints} probe
+ *       sets already persisted for cross-cycle dedup;</li>
+ *   <li>manifest — {@code {vmClassName -> {id, probeCount}}}: the JaCoCo
+ *       class id (CRC64, as an unsigned-hex string) and total probe count per
+ *       class. These are build-constant and captured by the SDK from the live
+ *       {@code ExecutionData} at record time ({@code getId()} /
+ *       {@code getProbes().length}), so the reporter never has to touch JaCoCo
+ *       internal APIs to derive them.</li>
+ * </ul>
+ * + *

Reconstruction: for each class in the manifest we build an + * {@link ExecutionData} with the manifest's id + a {@code boolean[probeCount]} + * whose fired indices (looked up in the union; none if the class is absent from it) are set, put it in an {@link ExecutionDataStore}, then run + * the JaCoCo {@link Analyzer} over the classes. The analyzer computes each + * class's CRC64 id from its bytecode and matches it against the store; a class + * with no entry (never hit) is analyzed with all-missed probes so it still + * contributes to the DENOMINATOR. Summing {@code getLineCounter()} / + * {@code getBranchCounter()} across all classes yields the whole test set's + * line + branch coverage. + * + *

Because dedup is coverage-preserving (it only drops exact/subset + * duplicates), the union of the KEPT fingerprints equals the union of every + * replayed test, so this number is the true whole-test-set coverage. + * + *

Output ({@code --out}) is a JSON object: + * {@code {"lineCovered":N,"lineTotal":N,"branchCovered":N,"branchTotal":N, + * "instructionCovered":N,"instructionTotal":N,"methodCovered":N, + * "methodTotal":N,"classCount":N,"hitClassCount":N}}. + */ +public final class CoverageReporter { + + private static final Gson GSON = new Gson(); + + private CoverageReporter() { + } + + /** JaCoCo class id + total probe count for one class (build-constant). */ + static final class ClassMeta { + String id; // CRC64 class id, unsigned-hex (from ExecutionData.getId()) + int probeCount; // ExecutionData.getProbes().length + } + + /** Computed coverage counters, serialized to the --out file. */ + static final class CoverageResult { + long lineCovered, lineTotal; + long branchCovered, branchTotal; + long instructionCovered, instructionTotal; + long methodCovered, methodTotal; + int classCount; // classes analyzed (denominator basis) + int hitClassCount; // classes with at least one fired probe + } + + public static void main(String[] args) { + try { + Args parsed = Args.parse(args); + CoverageResult result = compute(parsed.classesPath, parsed.unionPath, parsed.manifestPath); + String json = GSON.toJson(result); + if (parsed.outPath != null) { + Files.write(parsed.outPath, json.getBytes(StandardCharsets.UTF_8)); + } else { + System.out.println(json); + } + } catch (Exception e) { + System.err.println("CoverageReporter failed: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } + + /** + * Computes coverage from a classes dir/jar, a fired-probe union, and the + * per-class id/probe-count manifest. Package-visible for unit testing. 
+ */ + static CoverageResult compute(Path classesPath, Path unionPath, Path manifestPath) throws IOException { + Map> union = readUnion(unionPath); + Map manifest = readManifest(manifestPath); + + ExecutionDataStore store = new ExecutionDataStore(); + int hitClasses = 0; + for (Map.Entry entry : manifest.entrySet()) { + String className = entry.getKey(); + ClassMeta meta = entry.getValue(); + if (meta == null || meta.id == null || meta.probeCount < 0) { + continue; + } + boolean[] probes = new boolean[meta.probeCount]; + List fired = union.get(className); + if (fired != null) { + for (Integer idx : fired) { + if (idx != null && idx >= 0 && idx < probes.length) { + probes[idx] = true; + } + } + if (!fired.isEmpty()) { + hitClasses++; + } + } + long id = Long.parseUnsignedLong(meta.id, 16); + store.put(new ExecutionData(id, className, probes)); + } + + CoverageBuilder builder = new CoverageBuilder(); + Analyzer analyzer = new Analyzer(store, builder); + File classesFile = classesPath.toFile(); + if (!classesFile.exists()) { + throw new IOException("classes path does not exist: " + classesPath); + } + // analyzeAll walks a dir tree (or a jar) and analyzes every .class, + // computing each class's CRC64 id and matching it against the store. 
+ analyzer.analyzeAll(classesFile); + + CoverageResult result = new CoverageResult(); + int classCount = 0; + for (IClassCoverage cc : builder.getClasses()) { + classCount++; + result.lineCovered += cc.getLineCounter().getCoveredCount(); + result.lineTotal += cc.getLineCounter().getTotalCount(); + result.branchCovered += cc.getBranchCounter().getCoveredCount(); + result.branchTotal += cc.getBranchCounter().getTotalCount(); + result.instructionCovered += cc.getInstructionCounter().getCoveredCount(); + result.instructionTotal += cc.getInstructionCounter().getTotalCount(); + result.methodCovered += cc.getMethodCounter().getCoveredCount(); + result.methodTotal += cc.getMethodCounter().getTotalCount(); + } + result.classCount = classCount; + result.hitClassCount = hitClasses; + return result; + } + + private static Map> readUnion(Path path) throws IOException { + Type type = new TypeToken>>() { + }.getType(); + Map> union = GSON.fromJson(readString(path), type); + return union == null ? java.util.Collections.>emptyMap() : union; + } + + private static Map readManifest(Path path) throws IOException { + Type type = new TypeToken>() { + }.getType(); + Map manifest = GSON.fromJson(readString(path), type); + if (manifest == null) { + throw new IOException("manifest is empty or invalid: " + path); + } + return manifest; + } + + private static String readString(Path path) throws IOException { + return new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + } + + /** Minimal --flag value arg parser. 
*/ + private static final class Args { + Path classesPath; + Path unionPath; + Path manifestPath; + Path outPath; + + static Args parse(String[] argv) { + Args a = new Args(); + for (int i = 0; i + 1 < argv.length; i += 2) { + String flag = argv[i]; + String val = argv[i + 1]; + switch (flag) { + case "--classes": + a.classesPath = Paths.get(val); + break; + case "--union": + a.unionPath = Paths.get(val); + break; + case "--manifest": + a.manifestPath = Paths.get(val); + break; + case "--out": + a.outPath = Paths.get(val); + break; + default: + throw new IllegalArgumentException("unknown flag: " + flag); + } + } + if (a.classesPath == null || a.unionPath == null || a.manifestPath == null) { + throw new IllegalArgumentException( + "usage: CoverageReporter --classes --union " + + "--manifest [--out ]"); + } + return a; + } + } + + // Referenced to keep the ICounter import meaningful for readers scanning + // deps; getLineCounter()/getBranchCounter() return ICounter instances. + @SuppressWarnings("unused") + private static long coveredOf(ICounter counter) { + return counter.getCoveredCount(); + } +} diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java new file mode 100644 index 0000000..7f51d19 --- /dev/null +++ b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java @@ -0,0 +1,120 @@ +package io.keploy.dedup; + +import org.jacoco.core.analysis.Analyzer; +import org.jacoco.core.analysis.CoverageBuilder; +import org.jacoco.core.analysis.IClassCoverage; +import org.jacoco.core.data.ExecutionData; +import org.jacoco.core.data.ExecutionDataStore; +import org.jacoco.core.tools.ExecFileLoader; + +import java.io.File; + +/** + * Standalone validation for {@link CoverageReporter}: proves that computing + * coverage by reconstructing {@link ExecutionData} from a fired-probe + * union + a per-class (id, probeCount) manifest produces the identical + * 
line/branch numbers as JaCoCo's own analysis of a real {@code .exec} dump. + * + *

Given a real {@code .exec} (produced by running the app under + * {@code -javaagent:jacocoagent.jar}) and the matching classes dir, it: + *

<ol>
+ *   <li>computes GROUND TRUTH: {@code Analyzer} over the loaded exec store;</li>
+ *   <li>derives a manifest (id hex + probeCount) and union (fired indices) from
+ *       that same exec — exactly what the SDK would persist;</li>
+ *   <li>REBUILDS an execution store from manifest + union (the
+ *       {@code CoverageReporter} path) and re-analyzes;</li>
+ *   <li>asserts the two coverage results are equal.</li>
+ * </ol>
+ * + *

Usage: {@code java -cp keploy-sdk.jar + * io.keploy.dedup.CoverageReporterSelfTest --exec <exec> --classes <dir>

} + */ +public final class CoverageReporterSelfTest { + + private CoverageReporterSelfTest() { + } + + public static void main(String[] args) throws Exception { + String execPath = null; + String classesPath = null; + for (int i = 0; i + 1 < args.length; i += 2) { + if ("--exec".equals(args[i])) { + execPath = args[i + 1]; + } else if ("--classes".equals(args[i])) { + classesPath = args[i + 1]; + } + } + if (execPath == null || classesPath == null) { + System.err.println("usage: CoverageReporterSelfTest --exec --classes "); + System.exit(2); + } + + ExecFileLoader loader = new ExecFileLoader(); + loader.load(new File(execPath)); + ExecutionDataStore realStore = loader.getExecutionDataStore(); + + // (1) GROUND TRUTH — analyze the real exec directly. + long[] truth = analyze(realStore, classesPath); + + // (2) Derive manifest + union from the real exec (what the SDK captures), + // then (3) REBUILD a fresh store from them (the CoverageReporter path). + ExecutionDataStore rebuilt = new ExecutionDataStore(); + int classes = 0; + int firedProbes = 0; + for (ExecutionData data : realStore.getContents()) { + classes++; + boolean[] src = data.getProbes(); + boolean[] copy = new boolean[src.length]; // manifest.probeCount = src.length + for (int i = 0; i < src.length; i++) { + if (src[i]) { // union = fired indices + copy[i] = true; + firedProbes++; + } + } + // manifest.id = data.getId() (round-tripped through unsigned-hex, like the real pipeline) + long id = Long.parseUnsignedLong(Long.toHexString(data.getId()), 16); + rebuilt.put(new ExecutionData(id, data.getName(), copy)); + } + long[] rebuiltCov = analyze(rebuilt, classesPath); + + System.out.println("classes in exec: " + classes + ", fired probes: " + firedProbes); + printRow("GROUND TRUTH (direct exec)", truth); + printRow("REBUILT (manifest+union) ", rebuiltCov); + + boolean equal = true; + for (int i = 0; i < truth.length; i++) { + if (truth[i] != rebuiltCov[i]) { + equal = false; + break; + } + } + if (equal) { 
+ System.out.println("RESULT: PASS — reconstruction matches JaCoCo's direct analysis exactly."); + System.exit(0); + } else { + System.out.println("RESULT: FAIL — reconstruction diverges from ground truth."); + System.exit(1); + } + } + + /** Returns {lineCov, lineTot, branchCov, branchTot, instrCov, instrTot}. */ + private static long[] analyze(ExecutionDataStore store, String classesPath) throws Exception { + CoverageBuilder builder = new CoverageBuilder(); + new Analyzer(store, builder).analyzeAll(new File(classesPath)); + long lc = 0, lt = 0, bc = 0, bt = 0, ic = 0, it = 0; + for (IClassCoverage cc : builder.getClasses()) { + lc += cc.getLineCounter().getCoveredCount(); + lt += cc.getLineCounter().getTotalCount(); + bc += cc.getBranchCounter().getCoveredCount(); + bt += cc.getBranchCounter().getTotalCount(); + ic += cc.getInstructionCounter().getCoveredCount(); + it += cc.getInstructionCounter().getTotalCount(); + } + return new long[]{lc, lt, bc, bt, ic, it}; + } + + private static void printRow(String label, long[] c) { + System.out.printf("%s lines %d/%d branches %d/%d instr %d/%d%n", + label, c[0], c[1], c[2], c[3], c[4], c[5]); + } +} From 566d8440a1f5fa5cf710c2bae9a568828cd56440 Mon Sep 17 00:00:00 2001 From: praagyajain Date: Thu, 2 Jul 2026 02:59:12 +0530 Subject: [PATCH 6/8] feat(dedup): SDK exports bytecode+manifest once per build for coverage (Option B) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After warmup (all app classes loaded), the SDK ships — best-effort, async, once-per-JVM — a zip of the indexed class bytecode plus a manifest {vmClassName -> {classId(hex), probeCount}} to k8s-proxy via HTTP multipart (KEPLOY_BYTECODE_UPLOAD_URL, tagged with KEPLOY_BUILD_TAG). classId/probeCount come straight from the live ExecutionData, so the offline CoverageReporter needs no JaCoCo internal APIs. HEAD exists-check uploads at most once per build tag. 
Gated by the two envs (webhook-injected); absent => no-op, so dedup-only deployments are unchanged. Fully non-fatal: never throws into the app or the per-test dedup path. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: praagyajain --- .../io/keploy/dedup/KeployDedupAgent.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index 4178c60..f3661b7 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -23,9 +23,12 @@ import java.lang.instrument.Instrumentation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.URL; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -42,6 +45,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.jar.JarEntry; import java.util.jar.JarFile; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; @@ -252,6 +257,76 @@ private static String normalizePath(String path) { return path.replace(File.separatorChar, '/'); } + // Build-constant coverage metadata for one class, serialized into the + // manifest the offline CoverageReporter consumes. id = CRC64 class id + // (unsigned-hex), probeCount = JaCoCo probe array length. + private static final class ManifestEntry { + private String id; + private int probeCount; + } + + // bytecodeAlreadyStored HEAD-checks whether k8s-proxy already holds the + // bytecode blob for this build tag, so each ephemeral replay pod uploads + // at most once per build. 
Best-effort: any error => treat as absent and + // attempt the upload (the server dedupes idempotently by buildTag anyway). + private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { + try { + URL u = new URL(baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag)); + HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + conn.setRequestMethod("HEAD"); + conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); + conn.setReadTimeout(SOCKET_TIMEOUT_MILLIS); + int code = conn.getResponseCode(); + conn.disconnect(); + return code == HttpURLConnection.HTTP_OK; + } catch (Exception e) { + return false; + } + } + + // postBytecode uploads the manifest JSON + classes zip as a multipart form + // to k8s-proxy's bytecode endpoint, tagged with the build tag. + private static void postBytecode(String baseUrl, String buildTag, String manifestJson, byte[] zipBytes) + throws IOException { + String boundary = "keployBytecodeBoundary" + Integer.toHexString(System.identityHashCode(zipBytes)); + URL u = new URL(baseUrl + (baseUrl.contains("?") ? 
"&" : "?") + "buildTag=" + urlEncode(buildTag)); + HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + conn.setDoOutput(true); + conn.setRequestMethod("POST"); + conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); + conn.setReadTimeout(30000); + conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary); + try (OutputStream out = conn.getOutputStream()) { + writeAscii(out, "--" + boundary + "\r\n"); + writeAscii(out, "Content-Disposition: form-data; name=\"manifest\"; filename=\"manifest.json\"\r\n"); + writeAscii(out, "Content-Type: application/json\r\n\r\n"); + out.write(manifestJson.getBytes(StandardCharsets.UTF_8)); + writeAscii(out, "\r\n--" + boundary + "\r\n"); + writeAscii(out, "Content-Disposition: form-data; name=\"classes\"; filename=\"classes.zip\"\r\n"); + writeAscii(out, "Content-Type: application/zip\r\n\r\n"); + out.write(zipBytes); + writeAscii(out, "\r\n--" + boundary + "--\r\n"); + out.flush(); + } + int code = conn.getResponseCode(); + conn.disconnect(); + if (code / 100 != 2) { + throw new IOException("bytecode upload returned HTTP " + code); + } + } + + private static void writeAscii(OutputStream out, String s) throws IOException { + out.write(s.getBytes(StandardCharsets.US_ASCII)); + } + + private static String urlEncode(String s) { + try { + return URLEncoder.encode(s, StandardCharsets.UTF_8.name()); + } catch (Exception e) { + return s; + } + } + private static void deleteSocketFile(File file) { if (file.exists() && !file.delete()) { log(Level.FINE, "Failed to delete socket file " + file.getAbsolutePath(), null); @@ -356,6 +431,9 @@ private void dispatch(CoverageCommand command, OutputStream outputStream) { // Warm up app classes once before the first measured window so // one-time lines aren't charged to the first test. collector.warmup(); + // Best-effort, async, once-per-build: ship bytecode + manifest so + // k8s-proxy can compute line/branch coverage offline (Option B). 
+ collector.exportBuildArtifactsOnce(); collector.reset(); writeAck(outputStream); return; @@ -504,6 +582,9 @@ private void dispatch(CoverageCommand command, OutputStream out) throws IOExcept // Warm up app classes once before the first measured window so // one-time lines aren't charged to the first test. collector.warmup(); + // Best-effort, async, once-per-build: ship bytecode + manifest so + // k8s-proxy can compute line/branch coverage offline (Option B). + collector.exportBuildArtifactsOnce(); collector.reset(); writeLine(out, "ACK"); return; @@ -598,6 +679,8 @@ private static final class CoverageCollector { private final JacocoClient jacocoClient; private final CoverageIndex coverageIndex; private final AtomicBoolean warmed = new AtomicBoolean(false); + // Once-per-JVM guard for the build-artifact (bytecode+manifest) upload. + private static final AtomicBoolean ARTIFACTS_EXPORTED = new AtomicBoolean(false); private CoverageCollector(JacocoClient jacocoClient, CoverageIndex coverageIndex) { this.jacocoClient = jacocoClient; @@ -771,6 +854,91 @@ private Map> toSortedMap(Map> raw) { } return sorted; } + + // exportBuildArtifactsOnce uploads (ONCE per JVM/build) the app's class + // bytecode + a {className -> (classId, probeCount)} manifest to k8s-proxy, + // so the offline CoverageReporter can later reconstruct line/branch + // coverage from the persisted dedup-fingerprint probe union (Option B). + // Best-effort and fully async: coverage reporting must never affect the + // app or the per-test dedup path. Gated by KEPLOY_BYTECODE_UPLOAD_URL + + // KEPLOY_BUILD_TAG (both injected by the webhook); absent => no-op, so + // existing dedup-only deployments are unchanged. Triggered after warmup() + // so every app class is loaded and appears in the manifest. 
+ void exportBuildArtifactsOnce() { + final String url = envOrProperty("KEPLOY_BYTECODE_UPLOAD_URL", "keploy.bytecode.upload.url", ""); + final String buildTag = envOrProperty("KEPLOY_BUILD_TAG", "keploy.build.tag", ""); + if (url.isEmpty() || buildTag.isEmpty()) { + return; + } + if (!ARTIFACTS_EXPORTED.compareAndSet(false, true)) { + return; + } + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + if (bytecodeAlreadyStored(url, buildTag)) { + log(Level.FINE, "Keploy dedup: bytecode already stored for buildTag " + buildTag, null); + return; + } + Map manifest = buildBuildManifest(); + if (manifest.isEmpty()) { + // No app classes visible yet — allow a later START to retry. + ARTIFACTS_EXPORTED.set(false); + return; + } + byte[] zip = zipIndexedClasses(); + postBytecode(url, buildTag, GSON.toJson(manifest), zip); + log(Level.INFO, "Keploy dedup: uploaded bytecode+manifest for buildTag " + + buildTag + " (" + manifest.size() + " classes)", null); + } catch (Throwable e) { + log(Level.FINE, "Keploy dedup: bytecode export failed (non-fatal)", null); + } + } + }, "keploy-dedup-bytecode-export"); + t.setDaemon(true); + t.start(); + } + + // buildBuildManifest reads a non-resetting JaCoCo dump and records each + // app class's runtime classId (== the CRC64 the offline Analyzer computes + // for the same bytecode) and probe count. These are build-constant, so a + // single post-warmup dump captures the whole manifest — and it means the + // reporter never needs JaCoCo internal APIs to derive id/probeCount. 
+ private Map buildBuildManifest() throws IOException { + Map manifest = new LinkedHashMap<>(); + byte[] dump = jacocoClient.dump(true, false); + if (dump.length == 0) { + return manifest; + } + ExecFileLoader loader = new ExecFileLoader(); + loader.load(new ByteArrayInputStream(dump)); + Set appClasses = indexedClassNames(); + for (ExecutionData data : loader.getExecutionDataStore().getContents()) { + if (!appClasses.contains(data.getName())) { + continue; + } + ManifestEntry entry = new ManifestEntry(); + entry.id = Long.toHexString(data.getId()); + entry.probeCount = data.getProbes().length; + manifest.put(data.getName(), entry); + } + return manifest; + } + + // zipIndexedClasses packs every indexed app .class (already in memory as + // ClassEntry.bytes) into a zip keyed by ".class". + private byte[] zipIndexedClasses() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(256 * 1024); + try (ZipOutputStream zos = new ZipOutputStream(baos)) { + for (ClassEntry entry : coverageIndex.entries()) { + zos.putNextEntry(new ZipEntry(entry.className + ".class")); + zos.write(entry.bytes); + zos.closeEntry(); + } + } + return baos.toByteArray(); + } } private static final class JacocoClient { From 89c27b99e6324940e6d0518b509b6ab0ad3e6770 Mon Sep 17 00:00:00 2001 From: praagyajain Date: Thu, 2 Jul 2026 03:48:03 +0530 Subject: [PATCH 7/8] feat(dedup): tolerate proxy self-signed cert on bytecode upload The bytecode/manifest upload targets k8s-proxy's in-cluster HTTPS control port (:8080), whose cert is self-signed and may not chain to the app JVM's truststore. Since the upload is a best-effort, cluster-internal data-plane call (like the raw-TCP coverage collector), install a trust-all SSLSocketFactory for that HTTPS call only. No effect on plain-HTTP endpoints. 
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: praagyajain --- .../io/keploy/dedup/KeployDedupAgent.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index f3661b7..d16ada5 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -273,6 +273,7 @@ private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { try { URL u = new URL(baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag)); HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + relaxTlsIfHttps(conn); conn.setRequestMethod("HEAD"); conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); conn.setReadTimeout(SOCKET_TIMEOUT_MILLIS); @@ -291,6 +292,7 @@ private static void postBytecode(String baseUrl, String buildTag, String manifes String boundary = "keployBytecodeBoundary" + Integer.toHexString(System.identityHashCode(zipBytes)); URL u = new URL(baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag)); HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + relaxTlsIfHttps(conn); conn.setDoOutput(true); conn.setRequestMethod("POST"); conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); @@ -319,6 +321,43 @@ private static void writeAscii(OutputStream out, String s) throws IOException { out.write(s.getBytes(StandardCharsets.US_ASCII)); } + // relaxTlsIfHttps makes the bytecode upload tolerate k8s-proxy's self-signed + // in-cluster cert. The upload is a best-effort, cluster-internal data-plane + // call (like the raw-TCP coverage collector), so trust-all here is acceptable + // and avoids depending on the app JVM's truststore chaining to the proxy CA. 
+ private static volatile javax.net.ssl.SSLSocketFactory trustAllFactory; + + private static void relaxTlsIfHttps(HttpURLConnection conn) { + if (!(conn instanceof javax.net.ssl.HttpsURLConnection)) { + return; + } + try { + if (trustAllFactory == null) { + javax.net.ssl.SSLContext ctx = javax.net.ssl.SSLContext.getInstance("TLS"); + ctx.init(null, new javax.net.ssl.TrustManager[]{new javax.net.ssl.X509TrustManager() { + @Override + public void checkClientTrusted(java.security.cert.X509Certificate[] c, String a) { + } + + @Override + public void checkServerTrusted(java.security.cert.X509Certificate[] c, String a) { + } + + @Override + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return new java.security.cert.X509Certificate[0]; + } + }}, null); + trustAllFactory = ctx.getSocketFactory(); + } + javax.net.ssl.HttpsURLConnection https = (javax.net.ssl.HttpsURLConnection) conn; + https.setSSLSocketFactory(trustAllFactory); + https.setHostnameVerifier((hostname, session) -> true); + } catch (Exception e) { + // Leave the default factory; the upload is best-effort. + } + } + private static String urlEncode(String s) { try { return URLEncoder.encode(s, StandardCharsets.UTF_8.name()); From 4d03ce112cfebd82a3fc283979e82d7424f7934b Mon Sep 17 00:00:00 2001 From: praagyajain Date: Fri, 3 Jul 2026 00:35:27 +0530 Subject: [PATCH 8/8] feat(dedup): send bytecode manifest+zip over the collector channel Build a live manifest ({vmClassName -> {classId, probeCount}}) from the proven per-test capture() path and, when it grows, send the app class bytecode as a base64 "CLASSES ..." frame on the existing collector TCP connection (:36340) right after the COV frame. This survives the replay agent's outbound mock interception, which 502'd the earlier HTTPS upload. k8s-proxy uses this bytecode + the unioned fingerprints to compute whole-test-set coverage offline. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../io/keploy/dedup/KeployDedupAgent.java | 169 ++++++++++-------- 1 file changed, 92 insertions(+), 77 deletions(-) diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index d16ada5..2d24b5c 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -270,8 +270,10 @@ private static final class ManifestEntry { // at most once per build. Best-effort: any error => treat as absent and // attempt the upload (the server dedupes idempotently by buildTag anyway). private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { + String target = baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag); try { - URL u = new URL(baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag)); + log(Level.INFO, "Keploy dedup: exists-check HEAD " + target, null); + URL u = new URL(target); HttpURLConnection conn = (HttpURLConnection) u.openConnection(); relaxTlsIfHttps(conn); conn.setRequestMethod("HEAD"); @@ -279,8 +281,11 @@ private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { conn.setReadTimeout(SOCKET_TIMEOUT_MILLIS); int code = conn.getResponseCode(); conn.disconnect(); + log(Level.INFO, "Keploy dedup: exists-check HEAD -> HTTP " + code, null); return code == HttpURLConnection.HTTP_OK; } catch (Exception e) { + log(Level.WARNING, "Keploy dedup: exists-check HEAD failed (treating as absent): " + + e.getClass().getName() + ": " + e.getMessage(), null); return false; } } @@ -290,7 +295,10 @@ private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { private static void postBytecode(String baseUrl, String buildTag, String manifestJson, byte[] zipBytes) throws IOException { String boundary = "keployBytecodeBoundary" + 
Integer.toHexString(System.identityHashCode(zipBytes)); - URL u = new URL(baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag)); + String target = baseUrl + (baseUrl.contains("?") ? "&" : "?") + "buildTag=" + urlEncode(buildTag); + log(Level.INFO, "Keploy dedup: POST bytecode -> " + target + + " (manifest=" + manifestJson.length() + "B, zip=" + zipBytes.length + "B)", null); + URL u = new URL(target); HttpURLConnection conn = (HttpURLConnection) u.openConnection(); relaxTlsIfHttps(conn); conn.setDoOutput(true); @@ -299,6 +307,7 @@ private static void postBytecode(String baseUrl, String buildTag, String manifes conn.setReadTimeout(30000); conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary); try (OutputStream out = conn.getOutputStream()) { + log(Level.INFO, "Keploy dedup: POST bytecode connection open; streaming body", null); writeAscii(out, "--" + boundary + "\r\n"); writeAscii(out, "Content-Disposition: form-data; name=\"manifest\"; filename=\"manifest.json\"\r\n"); writeAscii(out, "Content-Type: application/json\r\n\r\n"); @@ -311,9 +320,25 @@ private static void postBytecode(String baseUrl, String buildTag, String manifes out.flush(); } int code = conn.getResponseCode(); + String body = readStream(code >= 400 ? conn.getErrorStream() : conn.getInputStream()); conn.disconnect(); + log(Level.INFO, "Keploy dedup: POST bytecode -> HTTP " + code + + (body.isEmpty() ? "" : (" body=" + body)), null); if (code / 100 != 2) { - throw new IOException("bytecode upload returned HTTP " + code); + throw new IOException("bytecode upload returned HTTP " + code + (body.isEmpty() ? "" : (": " + body))); + } + } + + private static String readStream(InputStream in) { + if (in == null) { + return ""; + } + try { + byte[] b = readAllBytes(in); + String s = new String(b, StandardCharsets.UTF_8).trim(); + return s.length() > 300 ? 
s.substring(0, 300) : s; + } catch (Exception e) { + return ""; } } @@ -353,8 +378,9 @@ public java.security.cert.X509Certificate[] getAcceptedIssuers() { javax.net.ssl.HttpsURLConnection https = (javax.net.ssl.HttpsURLConnection) conn; https.setSSLSocketFactory(trustAllFactory); https.setHostnameVerifier((hostname, session) -> true); + log(Level.INFO, "Keploy dedup: relaxed TLS (trust-all) for bytecode upload", null); } catch (Exception e) { - // Leave the default factory; the upload is best-effort. + log(Level.WARNING, "Keploy dedup: failed to relax TLS: " + e.getMessage(), null); } } @@ -470,9 +496,6 @@ private void dispatch(CoverageCommand command, OutputStream outputStream) { // Warm up app classes once before the first measured window so // one-time lines aren't charged to the first test. collector.warmup(); - // Best-effort, async, once-per-build: ship bytecode + manifest so - // k8s-proxy can compute line/branch coverage offline (Option B). - collector.exportBuildArtifactsOnce(); collector.reset(); writeAck(outputStream); return; @@ -621,9 +644,6 @@ private void dispatch(CoverageCommand command, OutputStream out) throws IOExcept // Warm up app classes once before the first measured window so // one-time lines aren't charged to the first test. collector.warmup(); - // Best-effort, async, once-per-build: ship bytecode + manifest so - // k8s-proxy can compute line/branch coverage offline (Option B). - collector.exportBuildArtifactsOnce(); collector.reset(); writeLine(out, "ACK"); return; @@ -650,6 +670,13 @@ private void dispatch(CoverageCommand command, OutputStream out) throws IOExcept // recorded before the ACK releases the caller. writeLine(out, "COV " + GSON.toJson( new DedupPayload(command.testId, executedProbesByClass))); + // Ride the bytecode+manifest over this same raw-TCP channel + // (the only app->proxy path the replay agent doesn't mock- + // intercept), when the manifest has grown. 
+ String classesFrame = collector.pollBytecodeFrame(); + if (classesFrame != null) { + writeLine(out, classesFrame); + } } catch (Exception e) { log(Level.SEVERE, "Failed to collect Java coverage for " + command.testId, e); } finally { @@ -718,8 +745,17 @@ private static final class CoverageCollector { private final JacocoClient jacocoClient; private final CoverageIndex coverageIndex; private final AtomicBoolean warmed = new AtomicBoolean(false); - // Once-per-JVM guard for the build-artifact (bytecode+manifest) upload. - private static final AtomicBoolean ARTIFACTS_EXPORTED = new AtomicBoolean(false); + // Coverage manifest ({vmClassName -> {classId, probeCount}}) accumulated + // from the SAME per-test capture() path that produces dedup fingerprints — + // so its class matching is proven correct (a separate post-warmup dump + // raced the START reset and matched 0 classes). Build-constant, grows as + // new app classes are hit. Uploaded (overwrite) whenever it grows, so the + // final upload covers every class in the coverage union. + private final java.util.concurrent.ConcurrentMap liveManifest = + new java.util.concurrent.ConcurrentHashMap<>(); + private final java.util.concurrent.atomic.AtomicInteger lastUploadedManifestSize = + new java.util.concurrent.atomic.AtomicInteger(-1); + private final AtomicBoolean uploadInFlight = new AtomicBoolean(false); private CoverageCollector(JacocoClient jacocoClient, CoverageIndex coverageIndex) { this.jacocoClient = jacocoClient; @@ -841,6 +877,17 @@ private Map> capture() throws IOException { continue; } boolean[] probes = executionData.getProbes(); + // Record this class's build-constant (classId, probeCount) for the + // coverage manifest. Same matching as the fingerprint below, so the + // manifest can never miss a class that appears in the union. 
+ final int probeCount = probes.length; + final long classId = executionData.getId(); + liveManifest.computeIfAbsent(executionData.getName(), k -> { + ManifestEntry m = new ManifestEntry(); + m.id = Long.toHexString(classId); + m.probeCount = probeCount; + return m; + }); Set fired = new LinkedHashSet<>(); for (int i = 0; i < probes.length; i++) { if (probes[i]) { @@ -894,88 +941,56 @@ private Map> toSortedMap(Map> raw) { return sorted; } - // exportBuildArtifactsOnce uploads (ONCE per JVM/build) the app's class - // bytecode + a {className -> (classId, probeCount)} manifest to k8s-proxy, - // so the offline CoverageReporter can later reconstruct line/branch - // coverage from the persisted dedup-fingerprint probe union (Option B). - // Best-effort and fully async: coverage reporting must never affect the - // app or the per-test dedup path. Gated by KEPLOY_BYTECODE_UPLOAD_URL + - // KEPLOY_BUILD_TAG (both injected by the webhook); absent => no-op, so - // existing dedup-only deployments are unchanged. Triggered after warmup() - // so every app class is loaded and appears in the manifest. - void exportBuildArtifactsOnce() { - final String url = envOrProperty("KEPLOY_BYTECODE_UPLOAD_URL", "keploy.bytecode.upload.url", ""); + // pollBytecodeFrame returns a "CLASSES " + // line to send over the collector channel (:36340) IF the coverage manifest + // has grown since the last send, else null. The raw-TCP collector channel + // is the ONLY app->proxy path that survives the replay agent's mock + // interception (an HTTPS upload gets 502'd as an unmatched outbound mock), + // so the bytecode rides it alongside the COV frames. buildTag/manifest/zip + // are base64'd to keep the whole frame on one line. Called after each + // capture; a no-op until the manifest grows or when KEPLOY_BUILD_TAG unset. 
+ String pollBytecodeFrame() { final String buildTag = envOrProperty("KEPLOY_BUILD_TAG", "keploy.build.tag", ""); - if (url.isEmpty() || buildTag.isEmpty()) { - return; - } - if (!ARTIFACTS_EXPORTED.compareAndSet(false, true)) { - return; + if (buildTag.isEmpty()) { + return null; } - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - if (bytecodeAlreadyStored(url, buildTag)) { - log(Level.FINE, "Keploy dedup: bytecode already stored for buildTag " + buildTag, null); - return; - } - Map manifest = buildBuildManifest(); - if (manifest.isEmpty()) { - // No app classes visible yet — allow a later START to retry. - ARTIFACTS_EXPORTED.set(false); - return; - } - byte[] zip = zipIndexedClasses(); - postBytecode(url, buildTag, GSON.toJson(manifest), zip); - log(Level.INFO, "Keploy dedup: uploaded bytecode+manifest for buildTag " - + buildTag + " (" + manifest.size() + " classes)", null); - } catch (Throwable e) { - log(Level.FINE, "Keploy dedup: bytecode export failed (non-fatal)", null); - } - } - }, "keploy-dedup-bytecode-export"); - t.setDaemon(true); - t.start(); - } - - // buildBuildManifest reads a non-resetting JaCoCo dump and records each - // app class's runtime classId (== the CRC64 the offline Analyzer computes - // for the same bytecode) and probe count. These are build-constant, so a - // single post-warmup dump captures the whole manifest — and it means the - // reporter never needs JaCoCo internal APIs to derive id/probeCount. 
- private Map buildBuildManifest() throws IOException { - Map manifest = new LinkedHashMap<>(); - byte[] dump = jacocoClient.dump(true, false); - if (dump.length == 0) { - return manifest; + final int size = liveManifest.size(); + if (size == 0 || size <= lastUploadedManifestSize.get()) { + return null; // nothing new since the last send } - ExecFileLoader loader = new ExecFileLoader(); - loader.load(new ByteArrayInputStream(dump)); - Set appClasses = indexedClassNames(); - for (ExecutionData data : loader.getExecutionDataStore().getContents()) { - if (!appClasses.contains(data.getName())) { - continue; - } - ManifestEntry entry = new ManifestEntry(); - entry.id = Long.toHexString(data.getId()); - entry.probeCount = data.getProbes().length; - manifest.put(data.getName(), entry); + try { + Map snapshot = new LinkedHashMap<>(liveManifest); + byte[] zip = zipIndexedClasses(); + String manifestJson = GSON.toJson(snapshot); + java.util.Base64.Encoder enc = java.util.Base64.getEncoder(); + String frame = "CLASSES " + + enc.encodeToString(buildTag.getBytes(StandardCharsets.UTF_8)) + " " + + enc.encodeToString(manifestJson.getBytes(StandardCharsets.UTF_8)) + " " + + enc.encodeToString(zip); + lastUploadedManifestSize.set(size); + log(Level.INFO, "Keploy dedup: sending CLASSES frame over collector channel (classes=" + + size + ", zipBytes=" + zip.length + ")", null); + return frame; + } catch (Throwable e) { + log(Level.WARNING, "Keploy dedup: failed to build CLASSES frame: " + e.getMessage(), null); + return null; } - return manifest; } // zipIndexedClasses packs every indexed app .class (already in memory as // ClassEntry.bytes) into a zip keyed by ".class". 
private byte[] zipIndexedClasses() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(256 * 1024); + int n = 0; try (ZipOutputStream zos = new ZipOutputStream(baos)) { for (ClassEntry entry : coverageIndex.entries()) { zos.putNextEntry(new ZipEntry(entry.className + ".class")); zos.write(entry.bytes); zos.closeEntry(); + n++; } } + log(Level.INFO, "Keploy dedup: zipIndexedClasses entries=" + n + " bytes=" + baos.size(), null); return baos.toByteArray(); } }