From 00d64d6f37b0864e1b8e8822f6eb7da584c65752 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 4 Mar 2026 22:44:47 +0800 Subject: [PATCH 1/4] add basic it --- .../IoTDBIoTConsensusV23C3DBasicIT.java | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java new file mode 100644 index 000000000000..ab8f5266fa4e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicIT.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2; + +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +/** + * Basic 3C3D (3 ConfigNode, 3 DataNode) integration test for IoTConsensusV2. + * + *
This test verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write + * data 3. Execute flush on cluster 4. Query and verify data was written successfully + * + *
Additionally tests replica consistency: after stopping the leader DataNode, the follower
+ * should be elected as new leader and serve the same data.
+ */
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBIoTConsensusV23C3DBasicIT extends IoTDBRegionOperationReliabilityITFramework {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicIT.class);
+
+ private static final int CONFIG_NODE_NUM = 3;
+ private static final int DATA_NODE_NUM = 3;
+ private static final int DATA_REPLICATION_FACTOR = 2;
+ private static final int SCHEMA_REPLICATION_FACTOR = 3;
+
+ /** Timeout in seconds for 3C3D cluster init. */
+ private static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
+
+ private static final String INSERTION1 =
+ "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
+ private static final String INSERTION2 =
+ "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
+ private static final String INSERTION3 =
+ "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
+ private static final String FLUSH_COMMAND = "flush on cluster";
+ private static final String COUNT_QUERY = "select count(*) from root.sg.**";
+ private static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(DATA_REPLICATION_FACTOR)
+ .setSchemaReplicationFactor(SCHEMA_REPLICATION_FACTOR)
+ .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE);
+
+ EnvFactory.getEnv()
+ .initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS);
+ }
+
+ @Test
+ public void test3C3DWriteFlushAndQuery() throws Exception {
+ try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement = makeItCloseQuietly(connection.createStatement())) {
+
+ // Write data
+ LOGGER.info("Writing data to 3C3D cluster...");
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(INSERTION3);
+
+ // Flush on cluster
+ LOGGER.info("Executing flush on cluster...");
+ statement.execute(FLUSH_COMMAND);
+
+ // Query and verify data was written successfully
+ verifyDataConsistency(statement);
+
+ LOGGER.info("3C3D IoTConsensusV2 basic test passed");
+ }
+ }
+
+ /**
+ * Test replica consistency: with replication factor 2, stop the leader DataNode and verify the
+ * follower serves the same data.
+ */
+ @Test
+ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
+ try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement = makeItCloseQuietly(connection.createStatement())) {
+
+ // 1. Write data and flush
+ LOGGER.info("Writing data to 3C3D cluster...");
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(INSERTION3);
+ statement.execute(FLUSH_COMMAND);
+
+ // 2. Query to verify initial write
+ verifyDataConsistency(statement);
+
+ LOGGER.info("Sleeping 2 seconds to wait replicate ...");
+ Thread.sleep(1000 * 2);
+
+ // 3. Get data region distribution and find the leader for root.sg
+ Map This test verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write
- * data 3. Execute flush on cluster 4. Query and verify data was written successfully
+ * Verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write data 3.
+ * Execute flush on cluster 4. Query and verify data was written successfully
*
* Additionally tests replica consistency: after stopping the leader DataNode, the follower
* should be elected as new leader and serve the same data.
*/
@Category({ClusterIT.class})
@RunWith(IoTDBTestRunner.class)
-public class IoTDBIoTConsensusV23C3DBasicIT extends IoTDBRegionOperationReliabilityITFramework {
+public abstract class IoTDBIoTConsensusV23C3DBasicITBase
+ extends IoTDBRegionOperationReliabilityITFramework {
private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicIT.class);
+ LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicITBase.class);
- private static final int CONFIG_NODE_NUM = 3;
- private static final int DATA_NODE_NUM = 3;
- private static final int DATA_REPLICATION_FACTOR = 2;
- private static final int SCHEMA_REPLICATION_FACTOR = 3;
+ protected static final int CONFIG_NODE_NUM = 3;
+ protected static final int DATA_NODE_NUM = 3;
+ protected static final int DATA_REPLICATION_FACTOR = 2;
+ protected static final int SCHEMA_REPLICATION_FACTOR = 3;
/** Timeout in seconds for 3C3D cluster init. */
- private static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
+ protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
- private static final String INSERTION1 =
+ protected static final String INSERTION1 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
- private static final String INSERTION2 =
+ protected static final String INSERTION2 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
- private static final String INSERTION3 =
+ protected static final String INSERTION3 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
- private static final String FLUSH_COMMAND = "flush on cluster";
- private static final String COUNT_QUERY = "select count(*) from root.sg.**";
- private static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
+ protected static final String FLUSH_COMMAND = "flush on cluster";
+ protected static final String COUNT_QUERY = "select count(*) from root.sg.**";
+ protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
+
+ /**
+ * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
+ * ConsensusFactory#IOT_CONSENSUS_V2_STREAM_MODE}.
+ */
+ protected abstract String getIoTConsensusV2Mode();
@Override
@Before
@@ -90,7 +97,7 @@ public void setUp() throws Exception {
.getCommonConfig()
.setDataReplicationFactor(DATA_REPLICATION_FACTOR)
.setSchemaReplicationFactor(SCHEMA_REPLICATION_FACTOR)
- .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE);
+ .setIoTConsensusV2Mode(getIoTConsensusV2Mode());
EnvFactory.getEnv()
.initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS);
@@ -101,20 +108,17 @@ public void test3C3DWriteFlushAndQuery() throws Exception {
try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
Statement statement = makeItCloseQuietly(connection.createStatement())) {
- // Write data
- LOGGER.info("Writing data to 3C3D cluster...");
+ LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode());
statement.execute(INSERTION1);
statement.execute(INSERTION2);
statement.execute(INSERTION3);
- // Flush on cluster
LOGGER.info("Executing flush on cluster...");
statement.execute(FLUSH_COMMAND);
- // Query and verify data was written successfully
verifyDataConsistency(statement);
- LOGGER.info("3C3D IoTConsensusV2 basic test passed");
+ LOGGER.info("3C3D IoTConsensusV2 {} basic test passed", getIoTConsensusV2Mode());
}
}
@@ -127,24 +131,20 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
Statement statement = makeItCloseQuietly(connection.createStatement())) {
- // 1. Write data and flush
- LOGGER.info("Writing data to 3C3D cluster...");
+ LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode());
statement.execute(INSERTION1);
statement.execute(INSERTION2);
statement.execute(INSERTION3);
statement.execute(FLUSH_COMMAND);
- // 2. Query to verify initial write
verifyDataConsistency(statement);
LOGGER.info("Sleeping 2 seconds to wait replicate ...");
Thread.sleep(1000 * 2);
- // 3. Get data region distribution and find the leader for root.sg
Map Additionally tests replica consistency: after stopping the leader DataNode, the follower
* should be elected as new leader and serve the same data.
*/
-@Category({ClusterIT.class})
+@Category({DailyIT.class})
@RunWith(IoTDBTestRunner.class)
public abstract class IoTDBIoTConsensusV23C3DBasicITBase
extends IoTDBRegionOperationReliabilityITFramework {
From 5c809ba54275d5611a6386a16c11fcadfcd7a6ab Mon Sep 17 00:00:00 2001
From: Peng Junzhi <201250214@smail.nju.edu.cn>
Date: Thu, 5 Mar 2026 09:51:19 +0800
Subject: [PATCH 4/4] refactor @Test
---
.../IoTDBIoTConsensusV23C3DBasicITBase.java | 10 ----------
.../IoTDBIoTConsensusV2Batch3C3DBasicIT.java | 20 +++++++++++++++++++
.../IoTDBIoTConsensusV2Stream3C3DBasicIT.java | 20 +++++++++++++++++++
3 files changed, 40 insertions(+), 10 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
index 97c6b15ae463..78ea84991202 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
@@ -24,18 +24,12 @@
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.ClusterIT;
-import org.apache.iotdb.itbase.category.DailyIT;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.tsfile.utils.Pair;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,8 +51,6 @@
* Additionally tests replica consistency: after stopping the leader DataNode, the follower
* should be elected as new leader and serve the same data.
*/
-@Category({DailyIT.class})
-@RunWith(IoTDBTestRunner.class)
public abstract class IoTDBIoTConsensusV23C3DBasicITBase
extends IoTDBRegionOperationReliabilityITFramework {
@@ -104,7 +96,6 @@ public void setUp() throws Exception {
.initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS);
}
- @Test
public void test3C3DWriteFlushAndQuery() throws Exception {
try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
Statement statement = makeItCloseQuietly(connection.createStatement())) {
@@ -127,7 +118,6 @@ public void test3C3DWriteFlushAndQuery() throws Exception {
* Test replica consistency: with replication factor 2, stop the leader DataNode and verify the
* follower serves the same data.
*/
- @Test
public void testReplicaConsistencyAfterLeaderStop() throws Exception {
try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
Statement statement = makeItCloseQuietly(connection.createStatement())) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
index d14396ae7d09..f71462fa470a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
@@ -21,12 +21,32 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.DailyIT;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
/** IoTConsensusV2 3C3D integration test with batch mode. */
+@Category({DailyIT.class})
+@RunWith(IoTDBTestRunner.class)
public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase {
@Override
protected String getIoTConsensusV2Mode() {
return ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
}
+
+ @Override
+ @Test
+ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
+ super.testReplicaConsistencyAfterLeaderStop();
+ }
+
+ @Override
+ @Test
+ public void test3C3DWriteFlushAndQuery() throws Exception {
+ super.test3C3DWriteFlushAndQuery();
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
index 40798a28cc0d..856d3624bf18 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
@@ -21,12 +21,32 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.DailyIT;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
/** IoTConsensusV2 3C3D integration test with stream mode. */
+@Category({DailyIT.class})
+@RunWith(IoTDBTestRunner.class)
public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase {
@Override
protected String getIoTConsensusV2Mode() {
return ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE;
}
+
+ @Override
+ @Test
+ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
+ super.testReplicaConsistencyAfterLeaderStop();
+ }
+
+ @Override
+ @Test
+ public void test3C3DWriteFlushAndQuery() throws Exception {
+ super.test3C3DWriteFlushAndQuery();
+ }
}