diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java index 547db349e2fa7..7353be0c1b777 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java @@ -332,7 +332,9 @@ public void testZstdCompressorLevel() throws Exception { } final List showPipeResult = - client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList; + client.showPipe( + new TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER)) + .pipeInfoList; showPipeResult.removeIf(i -> i.getId().startsWith("__consensus")); Assert.assertEquals( 3, diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java index 3900506a658b5..e7afe65ab81f0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java @@ -221,105 +221,17 @@ public void testZstdCompressorLevel() throws Exception { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "insert into root.db.d1(time, s1) values (1, 1)", - "insert into root.db.d1(time, s2) values (1, 1)", - "insert into root.db.d1(time, s3) values (1, 1)", - "insert into root.db.d1(time, s4) values (1, 1)", - "insert into root.db.d1(time, s5) values (1, 1)", - "flush"), - null); - - // Create 5 pipes with different zstd compression levels, p4 and p5 should fail. - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p1" - + " with source ('source.pattern'='root.db.d1.s1')" - + " with sink (" - + "'sink.ip'='%s'," - + "'sink.port'='%s'," - + "'sink.compressor'='zstd, zstd'," - + "'sink.compressor.zstd.level'='3')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p2" - + " with source ('source.pattern'='root.db.d1.s2')" - + " with sink (" - + "'sink.ip'='%s'," - + "'sink.port'='%s'," - + "'sink.compressor'='zstd, zstd'," - + "'sink.compressor.zstd.level'='22')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p3" - + " with source ('source.pattern'='root.db.d1.s3')" - + " with sink (" - + "'sink.ip'='%s'," - + "'sink.port'='%s'," - + "'sink.compressor'='zstd, zstd'," - + "'sink.compressor.zstd.level'='-131072')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p4" - + " with source ('source.pattern'='root.db.d1.s4')" - + " with sink (" - + "'sink.ip'='%s'," - + "'sink.port'='%s'," - + "'sink.compressor'='zstd, zstd'," - + "'sink.compressor.zstd.level'='-131073')", - receiverIp, receiverPort)); - fail(); - } catch (SQLException e) { - // Make sure the error message in IoTDBConnector.java is returned - Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p5" - + " with source ('source.pattern'='root.db.d1.s5')" - + " with sink (" - + "'sink.ip'='%s'," - + "'sink.port'='%s'," - + "'sink.compressor'='zstd, zstd'," - + "'sink.compressor.zstd.level'='23')", - receiverIp, receiverPort)); - fail(); - } catch (SQLException e) { - // Make sure the error message in IoTDBConnector.java is returned - Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); - } + // Create legal zstd level pipes one by one, so the assertion identifies the exact level + // that fails and avoids concurrent historical TsFile splitting for this level test. + createZstdPipeAndAssertData( + "p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure); + createZstdPipeAndAssertData( + "p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure); + createZstdPipeAndAssertData( + "p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure); + + assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort); + assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort); final List showPipeResult = client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList; @@ -329,13 +241,71 @@ public void testZstdCompressorLevel() throws Exception { showPipeResult.stream() .filter(info -> !info.id.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) .count()); + } + } - TestUtils.assertDataEventuallyOnEnv( - receiverEnv, - "count timeseries root.db.**", - "count(timeseries),", - Collections.singleton("3,"), - handleFailure); + private void createZstdPipeAndAssertData( + final String pipeName, + final String sourcePattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort, + final String measurement, + final Consumer handleFailure) { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + String.format("insert into root.db.d1(time, %s) values (1, 1)", measurement), "flush"), + null); + + try { + createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp, receiverPort); + } catch (final SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + String.format("select count(%s) from root.db.d1", measurement), + String.format("count(root.db.d1.%s),", measurement), + Collections.singleton("1,"), + handleFailure); + } + + private void assertCreateZstdPipeFailed( + final String pipeName, + final String sourcePattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort) { + try { + createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp, receiverPort); + fail(); + } catch (final SQLException e) { + Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); + } + } + + private void createZstdPipe( + final String pipeName, + final String sourcePattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort) + throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe %s" + + " with source ('source.pattern'='%s')" + + " with sink (" + + "'sink.ip'='%s'," + + "'sink.port'='%s'," + + "'sink.compressor'='zstd, zstd'," + + "'sink.compressor.zstd.level'='%s')", + pipeName, sourcePattern, receiverIp, receiverPort, zstdLevel)); } } }