diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java new file mode 100644 index 000000000000..78ea84991202 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2; + +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +/** + * Abstract base for IoTConsensusV2 3C3D integration tests. Subclasses specify batch or stream mode. + * + *

Verifies that a 3C3D cluster with IoTConsensusV2 can: 1. Start successfully 2. Write data 3. + * Execute flush on cluster 4. Query and verify data was written successfully + * + *

Additionally tests replica consistency: after stopping the leader DataNode, the follower + * should be elected as new leader and serve the same data. + */ +public abstract class IoTDBIoTConsensusV23C3DBasicITBase + extends IoTDBRegionOperationReliabilityITFramework { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBIoTConsensusV23C3DBasicITBase.class); + + protected static final int CONFIG_NODE_NUM = 3; + protected static final int DATA_NODE_NUM = 3; + protected static final int DATA_REPLICATION_FACTOR = 2; + protected static final int SCHEMA_REPLICATION_FACTOR = 3; + + /** Timeout in seconds for 3C3D cluster init. */ + protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; + + protected static final String INSERTION1 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; + protected static final String INSERTION2 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)"; + protected static final String INSERTION3 = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)"; + protected static final String FLUSH_COMMAND = "flush on cluster"; + protected static final String COUNT_QUERY = "select count(*) from root.sg.**"; + protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; + + /** + * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link + * ConsensusFactory#IOT_CONSENSUS_V2_STREAM_MODE}. + */ + protected abstract String getIoTConsensusV2Mode(); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(DATA_REPLICATION_FACTOR) + .setSchemaReplicationFactor(SCHEMA_REPLICATION_FACTOR) + .setIoTConsensusV2Mode(getIoTConsensusV2Mode()); + + EnvFactory.getEnv() + .initClusterEnvironment(CONFIG_NODE_NUM, DATA_NODE_NUM, CLUSTER_INIT_TIMEOUT_SECONDS); + } + + public void test3C3DWriteFlushAndQuery() throws Exception { + try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + Statement statement = makeItCloseQuietly(connection.createStatement())) { + + LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode()); + statement.execute(INSERTION1); + statement.execute(INSERTION2); + statement.execute(INSERTION3); + + LOGGER.info("Executing flush on cluster..."); + statement.execute(FLUSH_COMMAND); + + verifyDataConsistency(statement); + + LOGGER.info("3C3D IoTConsensusV2 {} basic test passed", getIoTConsensusV2Mode()); + } + } + + /** + * Test replica consistency: with replication factor 2, stop the leader DataNode and verify the + * follower serves the same data. + */ + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + Statement statement = makeItCloseQuietly(connection.createStatement())) { + + LOGGER.info("Writing data to 3C3D cluster (mode: {})...", getIoTConsensusV2Mode()); + statement.execute(INSERTION1); + statement.execute(INSERTION2); + statement.execute(INSERTION3); + statement.execute(FLUSH_COMMAND); + + verifyDataConsistency(statement); + + LOGGER.info("Sleeping 2 seconds to wait replicate ..."); + Thread.sleep(1000 * 2); + + Map>> dataRegionMap = + getDataRegionMapWithLeader(statement); + + int targetRegionId = -1; + int leaderDataNodeId = -1; + int followerDataNodeId = -1; + for (Map.Entry>> entry : dataRegionMap.entrySet()) { + Pair> leaderAndReplicas = entry.getValue(); + if (leaderAndReplicas.getRight().size() > 1 + && leaderAndReplicas.getRight().size() <= DATA_REPLICATION_FACTOR + && leaderAndReplicas.getLeft() > 0) { + targetRegionId = entry.getKey(); + leaderDataNodeId = leaderAndReplicas.getLeft(); + final int lambdaLeaderDataNodeId = leaderDataNodeId; + followerDataNodeId = + leaderAndReplicas.getRight().stream() + .filter(i -> i != lambdaLeaderDataNodeId) + .findAny() + .orElse(-1); + break; + } + } + + Assert.assertTrue( + "Should find a data region with leader for root.sg", + targetRegionId > 0 && leaderDataNodeId > 0 && followerDataNodeId > 0); + + DataNodeWrapper leaderNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(leaderDataNodeId) + .orElseThrow(() -> new AssertionError("DataNode not found in cluster")); + LOGGER.info( + "Stopping leader DataNode {} (region {}) for replica consistency test", + leaderDataNodeId, + targetRegionId); + + leaderNode.stopForcibly(); + Assert.assertFalse("Leader should be stopped", leaderNode.isAlive()); + + DataNodeWrapper followerNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(followerDataNodeId) + .orElseThrow(() -> new AssertionError("Follower DataNode not found in cluster")); + LOGGER.info( + "Waiting for follower DataNode {} to be elected as new leader and verifying replica consistency...", + followerDataNodeId); + Awaitility.await() + .pollDelay(2, TimeUnit.SECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + try (Connection followerConn = + makeItCloseQuietly( + EnvFactory.getEnv() + .getConnection( + followerNode, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT)); + Statement followerStmt = makeItCloseQuietly(followerConn.createStatement())) { + verifyDataConsistency(followerStmt); + } + }); + + LOGGER.info( + "Replica consistency verified: follower has same data as former leader after failover"); + } + } + + protected void verifyDataConsistency(Statement statement) throws Exception { + LOGGER.info("Querying data to verify write success..."); + try (ResultSet countResult = statement.executeQuery(COUNT_QUERY)) { + Assert.assertTrue("Count query should return results", countResult.next()); + + int columnCount = countResult.getMetaData().getColumnCount(); + long totalCount = 0; + for (int i = 1; i <= columnCount; i++) { + totalCount += parseLongFromString(countResult.getString(i)); + } + Assert.assertEquals( + "Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount); + } + + int rowCount = 0; + try (ResultSet selectResult = statement.executeQuery(SELECT_ALL_QUERY)) { + while (selectResult.next()) { + rowCount++; + long timestamp = parseLongFromString(selectResult.getString(1)); + long speed = parseLongFromString(selectResult.getString(2)); + long temperature = parseLongFromString(selectResult.getString(3)); + if (timestamp == 100) { + Assert.assertEquals(1, speed); + Assert.assertEquals(2, temperature); + } else if (timestamp == 101) { + Assert.assertEquals(3, speed); + Assert.assertEquals(4, temperature); + } else if (timestamp == 102) { + Assert.assertEquals(5, speed); + Assert.assertEquals(6, temperature); + } + } + } + Assert.assertEquals("Expected 3 rows from select *", 3, rowCount); + } + + /** Parse long from IoTDB result string (handles both "1" and "1.0" formats). */ + protected static long parseLongFromString(String s) { + if (s == null || s.isEmpty()) { + return 0; + } + try { + return Long.parseLong(s); + } catch (NumberFormatException e) { + return (long) Double.parseDouble(s); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java new file mode 100644 index 000000000000..f71462fa470a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2.batch; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +/** IoTConsensusV2 3C3D integration test with batch mode. */ +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { + + @Override + protected String getIoTConsensusV2Mode() { + return ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + } + + @Override + @Test + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + super.testReplicaConsistencyAfterLeaderStop(); + } + + @Override + @Test + public void test3C3DWriteFlushAndQuery() throws Exception { + super.test3C3DWriteFlushAndQuery(); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java new file mode 100644 index 000000000000..856d3624bf18 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.iotconsensusv2.stream; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.iotconsensusv2.IoTDBIoTConsensusV23C3DBasicITBase; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +/** IoTConsensusV2 3C3D integration test with stream mode. */ +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends IoTDBIoTConsensusV23C3DBasicITBase { + + @Override + protected String getIoTConsensusV2Mode() { + return ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE; + } + + @Override + @Test + public void testReplicaConsistencyAfterLeaderStop() throws Exception { + super.testReplicaConsistencyAfterLeaderStop(); + } + + @Override + @Test + public void test3C3DWriteFlushAndQuery() throws Exception { + super.test3C3DWriteFlushAndQuery(); + } +}