From 00d64d6f37b0864e1b8e8822f6eb7da584c65752 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 4 Mar 2026 22:44:47 +0800 Subject: [PATCH 1/4] add basic it --- .../IoTDBIoTConsensusV23C3DBasicIT.java | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java new file mode 100644 index 000000000000..ab8f5266fa4e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2; + +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +/** + * Basic 3C3D (3 ConfigNode, 3 DataNode) integration test for IoTConsensusV2. + * + *

This test verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write + * data 3. Execute flush on cluster 4. Query and verify data was written successfully + * + *

Additionally tests replica consistency: after stopping the leader DataNode, the follower + * should be elected as new leader and serve the same data. + */ +@Category({ClusterIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBIoTConsensusV23C3DBasicIT extends IoTDBRegionOperationReliabilityITFramework { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicIT.class); + + private static final int CONFIG_NODE_NUM = 3; + private static final int DATA_NODE_NUM = 3; + private static final int DATA_REPLICATION_FACTOR = 2; + private static final int SCHEMA_REPLICATION_FACTOR = 3; + + /** Timeout in seconds for 3C3D cluster init. */ + private static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; + + private static final String INSERTION1 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; + private static final String INSERTION2 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)"; + private static final String INSERTION3 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)"; + private static final String FLUSH_COMMAND = "flush on cluster"; + private static final String COUNT_QUERY = "select count(*) from root.sg.**"; + private static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(DATA_REPLICATION_FACTOR) + .setSchemaReplicationFactor(SCHEMA_REPLICATION_FACTOR) + .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE); + + EnvFactory.getEnv() + .initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS); + } + + @Test + public void test3C3DWriteFlushAndQuery() throws Exception { + try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + Statement statement = makeItCloseQuietly(connection.createStatement())) { + + // Write data + LOGGER.info("Writing data to 3C3D cluster..."); + statement.execute(INSERTION1); + statement.execute(INSERTION2); + statement.execute(INSERTION3); + + // Flush on cluster + LOGGER.info("Executing flush on cluster..."); + statement.execute(FLUSH_COMMAND); + + // Query and verify data was written successfully + verifyDataConsistency(statement); + + LOGGER.info("3C3D IoTConsensusV2 basic test passed"); + } + } + + /** + * Test replica consistency: with replication factor 2, stop the leader DataNode and verify the + * follower serves the same data. + */ + @Test + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + Statement statement = makeItCloseQuietly(connection.createStatement())) { + + // 1. Write data and flush + LOGGER.info("Writing data to 3C3D cluster..."); + statement.execute(INSERTION1); + statement.execute(INSERTION2); + statement.execute(INSERTION3); + statement.execute(FLUSH_COMMAND); + + // 2. Query to verify initial write + verifyDataConsistency(statement); + + LOGGER.info("Sleeping 2 seconds to wait replicate ..."); + Thread.sleep(1000 * 2); + + // 3. Get data region distribution and find the leader for root.sg + Map>> dataRegionMap = + getDataRegionMapWithLeader(statement); + + // Find a data region that has our data (root.sg) - typically region 3 for first user db + int targetRegionId = -1; + int leaderDataNodeId = -1; + int followerDataNodeId = -1; + for (Map.Entry>> entry : dataRegionMap.entrySet()) { + Pair> leaderAndReplicas = entry.getValue(); + if (leaderAndReplicas.getRight().size() > 1 + && leaderAndReplicas.getRight().size() <= DATA_REPLICATION_FACTOR + && leaderAndReplicas.getLeft() > 0) { + targetRegionId = entry.getKey(); + leaderDataNodeId = leaderAndReplicas.getLeft(); + final int lambdaLeaderDataNodeId = leaderDataNodeId; + followerDataNodeId = + leaderAndReplicas.getRight().stream() + .filter(i -> i != lambdaLeaderDataNodeId) + .findAny() + .orElse(-1); + break; + } + } + + Assert.assertTrue( + "Should find a data region with leader for root.sg", + targetRegionId > 0 && leaderDataNodeId > 0 && followerDataNodeId > 0); + + DataNodeWrapper leaderNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(leaderDataNodeId) + .orElseThrow(() -> new AssertionError("DataNode not found in cluster")); + LOGGER.info( + "Stopping leader DataNode {} (region {}) for replica consistency test", + leaderDataNodeId, + targetRegionId); + + // 4. Stop the leader DataNode + leaderNode.stopForcibly(); + Assert.assertFalse("Leader should be stopped", leaderNode.isAlive()); + + // 5. Wait for follower to be elected as new leader and query - verify same data Query the + // exact follower instead of picking randomly from available nodes. This ensures we verify the + // former follower has replicated data correctly. + DataNodeWrapper followerNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(followerDataNodeId) + .orElseThrow(() -> new AssertionError("Follower DataNode not found in cluster")); + LOGGER.info( + "Waiting for follower DataNode {} to be elected as new leader and verifying replica consistency...", + followerDataNodeId); + Awaitility.await() + .pollDelay(2, TimeUnit.SECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + try (Connection followerConn = + makeItCloseQuietly( + EnvFactory.getEnv() + .getConnection( + followerNode, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT)); + Statement followerStmt = makeItCloseQuietly(followerConn.createStatement())) { + verifyDataConsistency(followerStmt); + } + }); + + LOGGER.info( + "Replica consistency verified: follower has same data as former leader after failover"); + } + } + + private void verifyDataConsistency(Statement statement) throws Exception { + LOGGER.info("Querying data to verify write success..."); + try (ResultSet countResult = statement.executeQuery(COUNT_QUERY)) { + Assert.assertTrue("Count query should return results", countResult.next()); + + int columnCount = countResult.getMetaData().getColumnCount(); + long totalCount = 0; + for (int i = 1; i <= columnCount; i++) { + totalCount += parseLongFromString(countResult.getString(i)); + } + Assert.assertEquals( + "Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount); + } + + int rowCount = 0; + try (ResultSet selectResult = statement.executeQuery(SELECT_ALL_QUERY)) { + while (selectResult.next()) { + rowCount++; + // Use getString() for IoTDB JDBC compatibility (may return "1" or "1.0") + long timestamp = parseLongFromString(selectResult.getString(1)); + long speed = parseLongFromString(selectResult.getString(2)); + long temperature = parseLongFromString(selectResult.getString(3)); + if (timestamp == 100) { + Assert.assertEquals(1, speed); + Assert.assertEquals(2, temperature); + } else if (timestamp == 101) { + Assert.assertEquals(3, speed); + Assert.assertEquals(4, temperature); + } else if (timestamp == 102) { + Assert.assertEquals(5, speed); + Assert.assertEquals(6, temperature); + } + } + } + Assert.assertEquals("Expected 3 rows from select *", 3, rowCount); + } + + /** Parse long from IoTDB result string (handles both "1" and "1.0" formats). */ + private static long parseLongFromString(String s) { + if (s == null || s.isEmpty()) { + return 0; + } + try { + return Long.parseLong(s); + } catch (NumberFormatException e) { + return (long) Double.parseDouble(s); + } + } +} From ad1299d5a3f9788d2b21547debad78e50ced1948 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 4 Mar 2026 22:51:07 +0800 Subject: [PATCH 2/4] add batch and stream --- ...> IoTDBIoTConsensusV23C3DBasicITBase.java} | 63 +++++++++---------- .../IoTDBIoTConsensusV2Batch3C3DBasicIT.java | 32 ++++++++++ .../IoTDBIoTConsensusV2Stream3C3DBasicIT.java | 32 ++++++++++ 3 files changed, 93 insertions(+), 34 deletions(-) rename integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/{IoTDBIoTConsensusV23C3DBasicIT.java => IoTDBIoTConsensusV23C3DBasicITBase.java} (79%) create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java similarity index 79% rename from integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index ab8f5266fa4e..7c26f14d4e85 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -48,38 +48,45 @@ import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; /** - * Basic 3C3D (3 ConfigNode, 3 DataNode) integration test for IoTConsensusV2. + * Abstract base for IoTConsensusV2 3C3D integration tests. Subclasses specify batch or stream mode. * - *

This test verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write - * data 3. Execute flush on cluster 4. Query and verify data was written successfully + *

Verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write data 3. + * Execute flush on cluster 4. Query and verify data was written successfully * *

Additionally tests replica consistency: after stopping the leader DataNode, the follower * should be elected as new leader and serve the same data. */ @Category({ClusterIT.class}) @RunWith(IoTDBTestRunner.class) -public class IoTDBIoTConsensusV23C3DBasicIT extends IoTDBRegionOperationReliabilityITFramework { +public abstract class IoTDBIoTConsensusV23C3DBasicITBase + extends IoTDBRegionOperationReliabilityITFramework { private static final Logger LOGGER = - LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicIT.class); + LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicITBase.class); - private static final int CONFIG_NODE_NUM = 3; - private static final int DATA_NODE_NUM = 3; - private static final int DATA_REPLICATION_FACTOR = 2; - private static final int SCHEMA_REPLICATION_FACTOR = 3; + protected static final int CONFIG_NODE_NUM = 3; + protected static final int DATA_NODE_NUM = 3; + protected static final int DATA_REPLICATION_FACTOR = 2; + protected static final int SCHEMA_REPLICATION_FACTOR = 3; /** Timeout in seconds for 3C3D cluster init. */ - private static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; + protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; - private static final String INSERTION1 = + protected static final String INSERTION1 = "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; - private static final String INSERTION2 = + protected static final String INSERTION2 = "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)"; - private static final String INSERTION3 = + protected static final String INSERTION3 = "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)"; - private static final String FLUSH_COMMAND = "flush on cluster"; - private static final String COUNT_QUERY = "select count(*) from root.sg.**"; - private static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; + protected static final String FLUSH_COMMAND = "flush on cluster"; + protected static final String COUNT_QUERY = "select count(*) from root.sg.**"; + protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; + + /** + * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link + * ConsensusFactory#IOT_CONSENSUS_V2_STREAM_MODE}. + */ + protected abstract String getIoTConsensusV2Mode(); @Override @Before @@ -90,7 +97,7 @@ public void setUp() throws Exception { .getCommonConfig() .setDataReplicationFactor(DATA_REPLICATION_FACTOR) .setSchemaReplicationFactor(SCHEMA_REPLICATION_FACTOR) - .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE); + .setIoTConsensusV2Mode(getIoTConsensusV2Mode()); EnvFactory.getEnv() .initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS); @@ -101,20 +108,17 @@ public void test3C3DWriteFlushAndQuery() throws Exception { try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); Statement statement = makeItCloseQuietly(connection.createStatement())) { - // Write data - LOGGER.info("Writing data to 3C3D cluster..."); + LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode()); statement.execute(INSERTION1); statement.execute(INSERTION2); statement.execute(INSERTION3); - // Flush on cluster LOGGER.info("Executing flush on cluster..."); statement.execute(FLUSH_COMMAND); - // Query and verify data was written successfully verifyDataConsistency(statement); - LOGGER.info("3C3D IoTConsensusV2 basic test passed"); + LOGGER.info("3C3D IoTConsensusV2 {} basic test passed", getIoTConsensusV2Mode()); } } @@ -127,24 +131,20 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); Statement statement = makeItCloseQuietly(connection.createStatement())) { - // 1. Write data and flush - LOGGER.info("Writing data to 3C3D cluster..."); + LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode()); statement.execute(INSERTION1); statement.execute(INSERTION2); statement.execute(INSERTION3); statement.execute(FLUSH_COMMAND); - // 2. Query to verify initial write verifyDataConsistency(statement); LOGGER.info("Sleeping 2 seconds to wait replicate ..."); Thread.sleep(1000 * 2); - // 3. Get data region distribution and find the leader for root.sg Map>> dataRegionMap = getDataRegionMapWithLeader(statement); - // Find a data region that has our data (root.sg) - typically region 3 for first user db int targetRegionId = -1; int leaderDataNodeId = -1; int followerDataNodeId = -1; @@ -178,13 +178,9 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { leaderDataNodeId, targetRegionId); - // 4. Stop the leader DataNode leaderNode.stopForcibly(); Assert.assertFalse("Leader should be stopped", leaderNode.isAlive()); - // 5. Wait for follower to be elected as new leader and query - verify same data Query the - // exact follower instead of picking randomly from available nodes. This ensures we verify the - // former follower has replicated data correctly. DataNodeWrapper followerNode = EnvFactory.getEnv() .dataNodeIdToWrapper(followerDataNodeId) @@ -215,7 +211,7 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { } } - private void verifyDataConsistency(Statement statement) throws Exception { + protected void verifyDataConsistency(Statement statement) throws Exception { LOGGER.info("Querying data to verify write success..."); try (ResultSet countResult = statement.executeQuery(COUNT_QUERY)) { Assert.assertTrue("Count query should return results", countResult.next()); @@ -233,7 +229,6 @@ private void verifyDataConsistency(Statement statement) throws Exception { try (ResultSet selectResult = statement.executeQuery(SELECT_ALL_QUERY)) { while (selectResult.next()) { rowCount++; - // Use getString() for IoTDB JDBC compatibility (may return "1" or "1.0") long timestamp = parseLongFromString(selectResult.getString(1)); long speed = parseLongFromString(selectResult.getString(2)); long temperature = parseLongFromString(selectResult.getString(3)); @@ -253,7 +248,7 @@ private void verifyDataConsistency(Statement statement) throws Exception { } /** Parse long from IoTDB result string (handles both "1" and "1.0" formats). */ - private static long parseLongFromString(String s) { + protected static long parseLongFromString(String s) { if (s == null || s.isEmpty()) { return 0; } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java new file mode 100644 index 000000000000..d14396ae7d09 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2.batch; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; + +/** IoTConsensusV2 3C3D integration test with batch mode. */ +public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { + + @Override + protected String getIoTConsensusV2Mode() { + return ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java new file mode 100644 index 000000000000..40798a28cc0d --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2.stream; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; + +/** IoTConsensusV2 3C3D integration test with stream mode. */ +public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { + + @Override + protected String getIoTConsensusV2Mode() { + return ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE; + } +} From 897b4d89e27b055a54dee5edf88a17e422b09b12 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 4 Mar 2026 22:55:21 +0800 Subject: [PATCH 3/4] refactor to DailyIT --- .../it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index 7c26f14d4e85..97c6b15ae463 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -26,6 +26,7 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.tsfile.utils.Pair; @@ -56,7 +57,7 @@ *

Additionally tests replica consistency: after stopping the leader DataNode, the follower * should be elected as new leader and serve the same data. */ -@Category({ClusterIT.class}) +@Category({DailyIT.class}) @RunWith(IoTDBTestRunner.class) public abstract class IoTDBIoTConsensusV23C3DBasicITBase extends IoTDBRegionOperationReliabilityITFramework { From 5c809ba54275d5611a6386a16c11fcadfcd7a6ab Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 5 Mar 2026 09:51:19 +0800 Subject: [PATCH 4/4] refactor @Test --- .../IoTDBIoTConsensusV23C3DBasicITBase.java | 10 ---------- .../IoTDBIoTConsensusV2Batch3C3DBasicIT.java | 20 +++++++++++++++++++ .../IoTDBIoTConsensusV2Stream3C3DBasicIT.java | 20 +++++++++++++++++++ 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index 97c6b15ae463..78ea84991202 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -24,18 +24,12 @@ import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.ClusterIT; -import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.tsfile.utils.Pair; import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +51,6 @@ *

Additionally tests replica consistency: after stopping the leader DataNode, the follower * should be elected as new leader and serve the same data. */ -@Category({DailyIT.class}) -@RunWith(IoTDBTestRunner.class) public abstract class IoTDBIoTConsensusV23C3DBasicITBase extends IoTDBRegionOperationReliabilityITFramework { @@ -104,7 +96,6 @@ public void setUp() throws Exception { .initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS); } - @Test public void test3C3DWriteFlushAndQuery() throws Exception { try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); Statement statement = makeItCloseQuietly(connection.createStatement())) { @@ -127,7 +118,6 @@ public void test3C3DWriteFlushAndQuery() throws Exception { * Test replica consistency: with replication factor 2, stop the leader DataNode and verify the * follower serves the same data. */ - @Test public void testReplicaConsistencyAfterLeaderStop() throws Exception { try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); Statement statement = makeItCloseQuietly(connection.createStatement())) { diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java index d14396ae7d09..f71462fa470a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java @@ -21,12 +21,32 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; /** IoTConsensusV2 3C3D integration test with batch mode. */ +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { @Override protected String getIoTConsensusV2Mode() { return ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; } + + @Override + @Test + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + super.testReplicaConsistencyAfterLeaderStop(); + } + + @Override + @Test + public void test3C3DWriteFlushAndQuery() throws Exception { + super.test3C3DWriteFlushAndQuery(); + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java index 40798a28cc0d..856d3624bf18 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java @@ -21,12 +21,32 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; /** IoTConsensusV2 3C3D integration test with stream mode. */ +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { @Override protected String getIoTConsensusV2Mode() { return ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE; } + + @Override + @Test + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + super.testReplicaConsistencyAfterLeaderStop(); + } + + @Override + @Test + public void test3C3DWriteFlushAndQuery() throws Exception { + super.test3C3DWriteFlushAndQuery(); + } }