diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java new file mode 100644 index 0000000000000..edfaa57a9f9de --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBChangePointIT { + private static final String DATABASE_NAME = "test"; + + private static final String[] setupSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + // Table with consecutive duplicate values for change-point detection + "CREATE TABLE cp_data (device STRING TAG, measurement INT64 FIELD)", + // d1: values 10,10,20,20,20,30 -> change points at rows with values 10,20,30 (last of + // each run) + "INSERT INTO cp_data VALUES (1, 'd1', 10)", + "INSERT INTO cp_data VALUES (2, 'd1', 10)", + "INSERT INTO cp_data VALUES (3, 'd1', 20)", + "INSERT INTO cp_data VALUES (4, 'd1', 20)", + "INSERT INTO cp_data VALUES (5, 'd1', 20)", + "INSERT INTO cp_data VALUES (6, 'd1', 30)", + // d2: values 100,200,200,300 -> change points at rows with values 100,200,300 + "INSERT INTO cp_data VALUES (1, 'd2', 100)", + "INSERT INTO cp_data VALUES (2, 'd2', 200)", + "INSERT INTO cp_data VALUES (3, 'd2', 200)", + "INSERT INTO cp_data VALUES (4, 'd2', 300)", + // Table for all-same-values test + "CREATE TABLE cp_same (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_same VALUES (1, 'd1', 42)", + "INSERT INTO cp_same VALUES (2, 'd1', 42)", + "INSERT INTO cp_same VALUES (3, 'd1', 42)", + // Table for 
all-different-values test + "CREATE TABLE cp_diff (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_diff VALUES (1, 'd1', 1)", + "INSERT INTO cp_diff VALUES (2, 'd1', 2)", + "INSERT INTO cp_diff VALUES (3, 'd1', 3)", + // Table for single row test + "CREATE TABLE cp_single (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_single VALUES (1, 'd1', 99)", + "FLUSH", + "CLEAR ATTRIBUTE CACHE", + }; + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024); + EnvFactory.getEnv().initClusterEnvironment(); + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : setupSqls) { + statement.execute(sql); + } + } catch (Exception e) { + e.printStackTrace(); + fail("setUp failed: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testChangePointDetection() { + // The change-point SQL pattern: + // SELECT * FROM (SELECT *, LEAD(measurement) OVER (...) AS next FROM t) WHERE measurement <> + // next OR next IS NULL + // This detects the last row of each consecutive run of identical values. 
+ // + // d1 values: 10,10,20,20,20,30 + // row 1 (10): next=10, 10!=10 false -> not emitted + // row 2 (10): next=20, 10!=20 true -> emitted (last 10 before 20) + // row 3 (20): next=20, 20!=20 false -> not emitted + // row 4 (20): next=20, 20!=20 false -> not emitted + // row 5 (20): next=30, 20!=30 true -> emitted (last 20 before 30) + // row 6 (30): next=NULL -> emitted (last row) + // + // d2 values: 100,200,200,300 + // row 1 (100): next=200, true -> emitted + // row 2 (200): next=200, false -> not emitted + // row 3 (200): next=300, true -> emitted + // row 4 (300): next=NULL -> emitted + + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_data) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.002Z,d1,10,20,", + "1970-01-01T00:00:00.005Z,d1,20,30,", + "1970-01-01T00:00:00.006Z,d1,30,null,", + "1970-01-01T00:00:00.001Z,d2,100,200,", + "1970-01-01T00:00:00.003Z,d2,200,300,", + "1970-01-01T00:00:00.004Z,d2,300,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointAllSameValues() { + // All values are the same -> only the last row should be emitted (next IS NULL) + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_same) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.003Z,d1,42,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointAllDifferentValues() { + 
// All values are different -> every row should be emitted + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_diff) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,1,2,", + "1970-01-01T00:00:00.002Z,d1,2,3,", + "1970-01-01T00:00:00.003Z,d1,3,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointSingleRow() { + // Single row -> always emitted (next IS NULL) + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_single) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,99,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java new file mode 100644 index 0000000000000..8a037bbd8d24f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java @@ -0,0 +1,634 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source; + +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MeasurementToTableViewAdaptorUtils; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; 
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.constructAlignedPath; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.MeasurementToTableViewAdaptorUtils.toTableBlock; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; + +/** + * ChangePointOperator performs "first of run" change-point detection pushed down to the table scan + * level, leveraging TsFile statistics (min == max) at file/chunk/page level to skip uniform + * segments and avoid unnecessary I/O. + * + *
<p>
For each consecutive run of identical values in the monitored column, only the first row is + * emitted, along with the next different value (or NULL at end of partition). This replaces a + * Filter(Window(LEAD(...))) pattern. NOTE(review): the LEAD-based SQL pattern flags the last + * row of each run (see the IT's comments), while this operator buffers and emits the first row + * of each run — confirm the intended timestamp semantics before relying on plan-rewrite + * equivalence. + * + *
<p>
The output includes all table columns plus a "next" column. + */ +@SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) +public class ChangePointOperator extends AbstractDataSourceOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ChangePointOperator.class); + + private final List columnSchemas; + private final int[] columnsIndexArray; + private final List deviceEntries; + private final int deviceCount; + private final Ordering scanOrder; + private final SeriesScanOptions seriesScanOptions; + private final List measurementColumnNames; + private final Set allSensors; + private final List measurementSchemas; + private final List measurementColumnTSDataTypes; + + private final int monitoredMeasurementIndex; + private final TSDataType monitoredDataType; + private final boolean canUseStatistics; + private final MeasurementToTableViewAdaptorUtils.GetNthIdColumnValueFunc idColumnValueFunc; + + private int currentDeviceIndex; + private boolean finished = false; + + private boolean hasBufferedRow = false; + private long bufferedTime; + private Object[] bufferedMeasurementValues; + private Object cachedMonitoredValue; + + private TsBlockBuilder changePointBuilder; + private List nextValues; + + private QueryDataSource queryDataSource; + + public ChangePointOperator(ChangePointOperatorParameter parameter) { + this.sourceId = parameter.sourceId; + this.operatorContext = parameter.context; + this.columnSchemas = parameter.columnSchemas; + this.columnsIndexArray = parameter.columnsIndexArray; + this.deviceEntries = parameter.deviceEntries; + this.deviceCount = parameter.deviceEntries.size(); + this.scanOrder = parameter.scanOrder; + this.seriesScanOptions = parameter.seriesScanOptions; + this.measurementColumnNames = parameter.measurementColumnNames; + this.allSensors = parameter.allSensors; + this.measurementSchemas = parameter.measurementSchemas; + this.measurementColumnTSDataTypes = + 
parameter.measurementSchemas.stream() + .map(IMeasurementSchema::getType) + .collect(Collectors.toList()); + this.monitoredMeasurementIndex = parameter.monitoredMeasurementIndex; + this.monitoredDataType = measurementColumnTSDataTypes.get(monitoredMeasurementIndex); + this.canUseStatistics = measurementColumnNames.size() == 1; + this.idColumnValueFunc = parameter.idColumnValueFunc; + this.currentDeviceIndex = 0; + this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(deviceCount)); + this.maxReturnSize = + Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); + this.nextValues = new ArrayList<>(); + + constructAlignedSeriesScanUtil(); + } + + // ======================== Operator interface ======================== + + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + this.queryDataSource = (QueryDataSource) dataSource; + if (this.seriesScanUtil != null) { + this.seriesScanUtil.initQueryDataSource(queryDataSource); + } + List changePointOutputTypes = new ArrayList<>(measurementColumnTSDataTypes); + changePointOutputTypes.add(monitoredDataType); + this.changePointBuilder = new TsBlockBuilder(changePointOutputTypes); + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + return getResultFromRetainedTsBlock(); + } + + try { + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + boolean currentDeviceNoMoreData = false; + + do { + if (readPageData()) { + continue; + } + Optional b = readChunkData(); + if (!b.isPresent() || b.get()) { + continue; + } + b = readFileData(); + if (!b.isPresent() || b.get()) { + continue; + } + currentDeviceNoMoreData = true; + break; + } while (System.nanoTime() - start < maxRuntime && !changePointBuilder.isFull()); + + if (currentDeviceNoMoreData) { + flushBufferedRow(); + 
currentDeviceIndex++; + prepareForNextDevice(); + } + } catch (IOException e) { + throw new RuntimeException("Error happened while scanning the file", e); + } + + if (changePointBuilder.isEmpty()) { + return null; + } + + TsBlock measurementBlock = changePointBuilder.build(); + changePointBuilder.reset(); + + int rowCount = measurementBlock.getPositionCount(); + int measurementCount = measurementColumnTSDataTypes.size(); + Column nextColumn = measurementBlock.getColumn(measurementCount); + + Column[] measurementCols = new Column[measurementCount]; + for (int i = 0; i < measurementCount; i++) { + measurementCols[i] = measurementBlock.getColumn(i); + } + TsBlock pureMeasurementBlock = + new TsBlock(rowCount, measurementBlock.getTimeColumn(), measurementCols); + + DeviceEntry currentDeviceEntry = deviceEntries.get(Math.max(0, currentDeviceIndex - 1)); + TsBlock tableBlock = + toTableBlock( + pureMeasurementBlock, + columnsIndexArray, + columnSchemas, + currentDeviceEntry, + idColumnIndex -> getNthIdColumnValue(currentDeviceEntry, idColumnIndex)); + + Column[] finalColumns = new Column[tableBlock.getValueColumnCount() + 1]; + for (int i = 0; i < tableBlock.getValueColumnCount(); i++) { + finalColumns[i] = tableBlock.getColumn(i); + } + finalColumns[tableBlock.getValueColumnCount()] = nextColumn; + + resultTsBlock = + new TsBlock( + rowCount, + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, rowCount), + finalColumns); + nextValues.clear(); + return checkTsBlockSizeAndGetResult(); + } + + private String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return ((String) deviceEntry.getNthSegment(idColumnIndex + 1)); + } + + @Override + public boolean hasNext() throws Exception { + return !isFinished(); + } + + @Override + public boolean isFinished() throws Exception { + if (!finished) { + finished = + (retainedTsBlock == null) + && currentDeviceIndex >= deviceCount + && changePointBuilder.isEmpty(); + } + return finished; + } + + @Override + public 
void close() throws Exception { + // no additional resources + } + + @Override + public long calculateMaxPeekMemory() { + return Math.max(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) + + (changePointBuilder == null ? 0 : changePointBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries); + } + + @Override + protected List getResultDataTypes() { + List result = new ArrayList<>(); + for (ColumnSchema schema : columnSchemas) { + result.add( + org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType( + schema.getType())); + } + result.add(monitoredDataType); + return result; + } + + // ======================== Device iteration ======================== + + private void constructAlignedSeriesScanUtil() { + DeviceEntry deviceEntry; + if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) { + deviceEntry = new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new Binary[0]); + } else { + deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); + } + + AlignedFullPath alignedPath = + constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas, allSensors); + this.seriesScanUtil = + new AlignedSeriesScanUtil( + alignedPath, + scanOrder, + seriesScanOptions, + operatorContext.getInstanceContext(), + true, + measurementColumnTSDataTypes); + } + + private void prepareForNextDevice() { + 
hasBufferedRow = false; + cachedMonitoredValue = null; + if (currentDeviceIndex < deviceCount) { + constructAlignedSeriesScanUtil(); + if (queryDataSource != null) { + seriesScanUtil.initQueryDataSource(queryDataSource); + } + } + } + + // ======================== Hierarchical scanning with statistics ======================== + + private Optional readFileData() throws IOException { + Optional b = seriesScanUtil.hasNextFile(); + if (!b.isPresent() || !b.get()) { + return b; + } + + if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) { + Statistics fileStats = seriesScanUtil.currentFileStatistics(monitoredMeasurementIndex); + if (fileStats != null && isUniformSegment(fileStats)) { + handleUniformSegment( + seriesScanUtil.currentFileTimeStatistics().getStartTime(), + seriesScanUtil.currentFileTimeStatistics().getEndTime(), + fileStats); + seriesScanUtil.skipCurrentFile(); + return Optional.of(true); + } + } + + b = readChunkData(); + if (!b.isPresent() || b.get()) { + return b; + } + return Optional.empty(); + } + + private Optional readChunkData() throws IOException { + Optional b = seriesScanUtil.hasNextChunk(); + if (!b.isPresent() || !b.get()) { + return b; + } + + if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) { + Statistics chunkStats = seriesScanUtil.currentChunkStatistics(monitoredMeasurementIndex); + if (chunkStats != null && isUniformSegment(chunkStats)) { + handleUniformSegment( + seriesScanUtil.currentChunkTimeStatistics().getStartTime(), + seriesScanUtil.currentChunkTimeStatistics().getEndTime(), + chunkStats); + seriesScanUtil.skipCurrentChunk(); + return Optional.of(true); + } + } + + if (readPageData()) { + return Optional.of(true); + } + return Optional.empty(); + } + + private boolean readPageData() throws IOException { + if (!seriesScanUtil.hasNextPage()) { + return false; + } + + if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) { + Statistics pageStats = 
seriesScanUtil.currentPageStatistics(monitoredMeasurementIndex); + if (pageStats != null && isUniformSegment(pageStats)) { + handleUniformSegment( + seriesScanUtil.currentPageTimeStatistics().getStartTime(), + seriesScanUtil.currentPageTimeStatistics().getEndTime(), + pageStats); + seriesScanUtil.skipCurrentPage(); + return true; + } + } + + TsBlock tsBlock = seriesScanUtil.nextPage(); + if (tsBlock == null || tsBlock.isEmpty()) { + return true; + } + processRawData(tsBlock); + return true; + } + + // ======================== Statistics-based optimization ======================== + + private boolean isUniformSegment(Statistics statistics) { + return statistics.getMinValue().equals(statistics.getMaxValue()); + } + + /** + * Handles a uniform segment. For "first of run": if the uniform value differs from cached, emit + * the buffered row, then buffer the first row from this segment (reconstructed from statistics). + * If same as cached, do nothing — the first row of the run is already buffered. 
+ */ + private void handleUniformSegment(long startTime, long endTime, Statistics statistics) { + Object uniformValue = statistics.getMinValue(); + + if (!hasBufferedRow) { + hasBufferedRow = true; + bufferedTime = startTime; + bufferedMeasurementValues = new Object[] {uniformValue}; + cachedMonitoredValue = uniformValue; + } else if (!valuesEqual(cachedMonitoredValue, uniformValue)) { + emitChangePointFromStatistics(uniformValue); + bufferedTime = startTime; + bufferedMeasurementValues = new Object[] {uniformValue}; + cachedMonitoredValue = uniformValue; + } + } + + private void emitChangePointFromStatistics(Object nextValue) { + int measurementCount = measurementColumnTSDataTypes.size(); + changePointBuilder.getTimeColumnBuilder().writeLong(bufferedTime); + writeValueToBuilder( + changePointBuilder.getColumnBuilder(monitoredMeasurementIndex), + monitoredDataType, + bufferedMeasurementValues[0]); + writeValueToBuilder( + changePointBuilder.getColumnBuilder(measurementCount), monitoredDataType, nextValue); + changePointBuilder.declarePosition(); + } + + // ======================== Raw data processing ======================== + + private void processRawData(TsBlock tsBlock) { + int size = tsBlock.getPositionCount(); + Column timeColumn = tsBlock.getTimeColumn(); + Column monitoredColumn = tsBlock.getColumn(monitoredMeasurementIndex); + int measurementCount = measurementColumnTSDataTypes.size(); + + for (int i = 0; i < size; i++) { + if (monitoredColumn.isNull(i)) { + continue; + } + + Object currentValue = getColumnValue(monitoredColumn, monitoredDataType, i); + + if (!hasBufferedRow) { + bufferRow(tsBlock, i, currentValue); + continue; + } + + if (!valuesEqual(cachedMonitoredValue, currentValue)) { + emitChangePointRow(tsBlock, i, currentValue); + bufferRow(tsBlock, i, currentValue); + } + } + } + + private void bufferRow(TsBlock tsBlock, int position, Object monitoredValue) { + hasBufferedRow = true; + bufferedTime = tsBlock.getTimeColumn().getLong(position); + 
bufferedMeasurementValues = new Object[measurementColumnTSDataTypes.size()]; + for (int col = 0; col < measurementColumnTSDataTypes.size(); col++) { + Column c = tsBlock.getColumn(col); + if (c.isNull(position)) { + bufferedMeasurementValues[col] = null; + } else { + bufferedMeasurementValues[col] = + getColumnValue(c, measurementColumnTSDataTypes.get(col), position); + } + } + cachedMonitoredValue = monitoredValue; + } + + private void emitChangePointRow(TsBlock tsBlock, int nextPosition, Object nextValue) { + int measurementCount = measurementColumnTSDataTypes.size(); + changePointBuilder.getTimeColumnBuilder().writeLong(bufferedTime); + + for (int col = 0; col < measurementCount; col++) { + ColumnBuilder builder = changePointBuilder.getColumnBuilder(col); + Object val = bufferedMeasurementValues[col]; + if (val == null) { + builder.appendNull(); + } else { + writeValueToBuilder(builder, measurementColumnTSDataTypes.get(col), val); + } + } + + ColumnBuilder nextBuilder = changePointBuilder.getColumnBuilder(measurementCount); + writeValueToBuilder(nextBuilder, monitoredDataType, nextValue); + changePointBuilder.declarePosition(); + } + + private void flushBufferedRow() { + if (!hasBufferedRow) { + return; + } + int measurementCount = measurementColumnTSDataTypes.size(); + changePointBuilder.getTimeColumnBuilder().writeLong(bufferedTime); + + for (int col = 0; col < measurementCount; col++) { + ColumnBuilder builder = changePointBuilder.getColumnBuilder(col); + Object val = bufferedMeasurementValues[col]; + if (val == null) { + builder.appendNull(); + } else { + writeValueToBuilder(builder, measurementColumnTSDataTypes.get(col), val); + } + } + + changePointBuilder.getColumnBuilder(measurementCount).appendNull(); + changePointBuilder.declarePosition(); + hasBufferedRow = false; + } + + // ======================== Value utilities ======================== + + private Object getColumnValue(Column column, TSDataType dataType, int position) { + switch (dataType) { + 
      // --- tail of a value-extraction switch: maps the column's TSDataType to the
      // boxed Java value at `position` (the method head is above this chunk).
      case BOOLEAN:
        return column.getBoolean(position);
      case INT32:
        return column.getInt(position);
      case INT64:
      case TIMESTAMP:
        // INT64 and TIMESTAMP share the long representation.
        return column.getLong(position);
      case FLOAT:
        return column.getFloat(position);
      case DOUBLE:
        return column.getDouble(position);
      case TEXT:
      case STRING:
      case BLOB:
        // All string-like and binary types are carried as Binary.
        return column.getBinary(position);
      default:
        // Unknown data type: treated as an absent value.
        return null;
    }
  }

  /**
   * Null-safe equality for the boxed values produced by the extraction above. Two nulls are
   * equal; a null and a non-null are not. FLOAT/DOUBLE use Float.compare/Double.compare, so NaN
   * compares equal to NaN (boxed equals() behaves the same way; compare() makes the intent
   * explicit).
   */
  private boolean valuesEqual(Object a, Object b) {
    if (a == null || b == null) {
      // Reference comparison: true only when both are null.
      return a == b;
    }
    if (a instanceof Float && b instanceof Float) {
      return Float.compare((Float) a, (Float) b) == 0;
    }
    if (a instanceof Double && b instanceof Double) {
      return Double.compare((Double) a, (Double) b) == 0;
    }
    return a.equals(b);
  }

  /**
   * Writes a previously extracted boxed value back into a ColumnBuilder, dispatching on the
   * declared data type. A null value — or an unsupported type — is appended as a null cell,
   * mirroring the read side.
   */
  private void writeValueToBuilder(ColumnBuilder builder, TSDataType dataType, Object value) {
    if (value == null) {
      builder.appendNull();
      return;
    }
    switch (dataType) {
      case BOOLEAN:
        builder.writeBoolean((Boolean) value);
        break;
      case INT32:
        // Number-based narrowing tolerates any boxed numeric source.
        builder.writeInt(((Number) value).intValue());
        break;
      case INT64:
      case TIMESTAMP:
        builder.writeLong(((Number) value).longValue());
        break;
      case FLOAT:
        builder.writeFloat(((Number) value).floatValue());
        break;
      case DOUBLE:
        builder.writeDouble(((Number) value).doubleValue());
        break;
      case TEXT:
      case STRING:
      case BLOB:
        builder.writeBinary((Binary) value);
        break;
      default:
        // Defensive: emit null for unknown types instead of throwing.
        builder.appendNull();
        break;
    }
  }

  // ======================== Parameter class ========================

  /**
   * Immutable bundle of everything needed to construct a ChangePointOperator.
   *
   * <p>NOTE(review): generic type parameters (e.g. {@code List<ColumnSchema>},
   * {@code Set<String>}) appear to have been stripped from this chunk by diff extraction —
   * restore them from the original source.
   */
  public static class ChangePointOperatorParameter {
    public final OperatorContext context;
    public final PlanNodeId sourceId;
    public final List columnSchemas;
    public final int[] columnsIndexArray;
    public final List deviceEntries;
    public final Ordering scanOrder;
    public final SeriesScanOptions seriesScanOptions;
    public final List measurementColumnNames;
    public final Set allSensors;
    public final List measurementSchemas;
    // Index (within the measurement columns) of the column monitored for changes.
    public final int monitoredMeasurementIndex;
    // Maps the n-th tag/id column index to its string value for the scanned device.
    public final MeasurementToTableViewAdaptorUtils.GetNthIdColumnValueFunc idColumnValueFunc;

    public ChangePointOperatorParameter(
        OperatorContext context,
        PlanNodeId sourceId,
        List columnSchemas,
        int[] columnsIndexArray,
        List deviceEntries,
        Ordering scanOrder,
        SeriesScanOptions seriesScanOptions,
        List measurementColumnNames,
        Set allSensors,
        List measurementSchemas,
        int monitoredMeasurementIndex,
        MeasurementToTableViewAdaptorUtils.GetNthIdColumnValueFunc idColumnValueFunc) {
      this.context = context;
      this.sourceId = sourceId;
      this.columnSchemas = columnSchemas;
      this.columnsIndexArray = columnsIndexArray;
      this.deviceEntries = deviceEntries;
      this.scanOrder = scanOrder;
      this.seriesScanOptions = seriesScanOptions;
      this.measurementColumnNames = measurementColumnNames;
      this.allSensors = allSensors;
      this.measurementSchemas = measurementSchemas;
      this.monitoredMeasurementIndex = monitoredMeasurementIndex;
      this.idColumnValueFunc = idColumnValueFunc;
    }
  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1bc788251a764..7081a23ff2033 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -56,6 +56,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.PatternRecognitionOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.PreviousFillWithGroupOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.ChangePointOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TableFillOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.TableIntoOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TableLinearFillOperator;
@@ -186,6 +187,8 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
@@ -4306,6 +4309,81 @@ public Operator visitRowNumber(RowNumberNode node, LocalExecutionPlanContext con
          10_000);
  }

  // A bare ChangePointNode must never reach execution planning: the optimizer
  // rule PushChangePointIntoTableScan is responsible for merging it into the scan.
  @Override
  public Operator visitChangePoint(ChangePointNode node, LocalExecutionPlanContext context) {
    throw new UnsupportedOperationException(
        "ChangePointNode should have been pushed into TableScan by PushChangePointIntoTableScan rule");
  }

  /**
   * Creates the source operator for a change-point-aware device table scan: a
   * DeviceTableScanNode fused with change-point detection on one monitored column.
   */
  @Override
  public Operator visitChangePointTableScan(
      ChangePointTableScanNode node, LocalExecutionPlanContext context) {

    CommonTableScanOperatorParameters commonParameter =
        new CommonTableScanOperatorParameters(node, Collections.emptyMap(), false);

    // Reuse the standard scan-option plumbing (time predicate, pushed-down
    // limit/offset/predicate) so the fused scan behaves like a normal scan.
    SeriesScanOptions seriesScanOptions =
        buildSeriesScanOptions(
            context,
            commonParameter.columnSchemaMap,
            commonParameter.measurementColumnNames,
            commonParameter.measurementColumnsIndexMap,
            commonParameter.timeColumnName,
            node.getTimePredicate(),
            node.getPushDownLimit(),
            node.getPushDownOffset(),
            node.isPushLimitToEachDevice(),
            node.getPushDownPredicate());

    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                ChangePointOperator.class.getSimpleName());

    Set allSensors = new HashSet<>(commonParameter.measurementColumnNames);
    // "" is included alongside the measurement names, as in the other table scans.
    allSensors.add("");

    // Resolve the monitored symbol to its physical measurement-column index.
    String monitoredColumnName =
        node.getAssignments().get(node.getMeasurementSymbol()).getName();
    int monitoredMeasurementIndex =
        commonParameter.measurementColumnNames.indexOf(monitoredColumnName);
    if (monitoredMeasurementIndex < 0) {
      throw new IllegalStateException(
          "Monitored column not found in measurement columns: " + monitoredColumnName);
    }

    ChangePointOperator.ChangePointOperatorParameter parameter =
        new ChangePointOperator.ChangePointOperatorParameter(
            operatorContext,
            node.getPlanNodeId(),
            commonParameter.columnSchemas,
            commonParameter.columnsIndexArray,
            node.getDeviceEntries(),
            node.getScanOrder(),
            seriesScanOptions,
            commonParameter.measurementColumnNames,
            allSensors,
            commonParameter.measurementSchemas,
            monitoredMeasurementIndex,
            // NOTE(review): always reads device entry 0 regardless of which device a
            // row belongs to — presumably each such scan node carries exactly one
            // device entry at this point; confirm, otherwise tag/id column values
            // would be wrong for other devices.
            idColumnIndex ->
                ((String) node.getDeviceEntries().get(0).getNthSegment(idColumnIndex + 1)));

    ChangePointOperator changePointOperator = new ChangePointOperator(parameter);

    context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
    addSource(
        changePointOperator,
        context,
        node,
        commonParameter.measurementColumnNames,
        commonParameter.measurementSchemas,
        allSensors,
        ChangePointTableScanNode.class.getSimpleName());

    return changePointOperator;
  }

  @Override
  public Operator visitTopKRanking(TopKRankingNode node, LocalExecutionPlanContext context) {
    Operator child = node.getChild().accept(this, context);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 55aaefe8be618..45279c618da77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -119,6 +119,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExceptNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; @@ -323,6 +324,8 @@ public enum PlanNodeType { TABLE_TOPK_RANKING_NODE((short) 1037), TABLE_ROW_NUMBER_NODE((short) 1038), TABLE_VALUES_NODE((short) 1039), + TABLE_CHANGE_POINT_NODE((short) 1040), + CHANGE_POINT_TABLE_SCAN_NODE((short) 1041), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -727,6 +730,11 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { return RowNumberNode.deserialize(buffer); case 1039: return ValuesNode.deserialize(buffer); + case 1040: + return ChangePointNode.deserialize(buffer); + case 1041: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node + .ChangePointTableScanNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 44f1cd8bc1f67..19db1d547061d 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -123,6 +123,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExceptNode;
@@ -799,6 +800,16 @@ public R visitRowNumber(RowNumberNode node, C context) {
    return visitSingleChildProcess(node, context);
  }

  // Default visitor hook: a ChangePointNode is treated as a generic
  // single-child process node unless a visitor overrides it.
  public R visitChangePoint(ChangePointNode node, C context) {
    return visitSingleChildProcess(node, context);
  }

  // Default visitor hook: a ChangePointTableScanNode is a specialized device
  // table scan, so fall back to the device-table-scan behavior. (Fully
  // qualified class name used here instead of an import, as in the diff.)
  public R visitChangePointTableScan(
      org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointTableScanNode node,
      C context) {
    return visitDeviceTableScan(node, context);
  }

  public R visitValuesNode(ValuesNode node, C context) {
    return visitPlan(node, context);
  }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f73..253dd6b1fab51 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -58,6 +58,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -1877,6 +1878,21 @@ public List visitRowNumber(RowNumberNode node, PlanContext context) {
    }
  }

  /**
   * Distributes a ChangePointNode over its (possibly split) child plans.
   *
   * <p>NOTE(review): when the child splits into several fragments, one ChangePointNode is
   * created per fragment via splitForEachChild; a value run that crosses a fragment boundary
   * would then emit a row in each fragment — confirm a downstream merge handles this, or that
   * runs cannot cross fragment boundaries here.
   */
  @Override
  public List visitChangePoint(ChangePointNode node, PlanContext context) {
    if (node.getChildren().isEmpty()) {
      // Leaf form (e.g. after clone): nothing to distribute.
      return Collections.singletonList(node);
    }

    List childrenNodes = node.getChild().accept(this, context);
    if (childrenNodes.size() == 1) {
      node.setChild(childrenNodes.get(0));
      return Collections.singletonList(node);
    } else {
      return splitForEachChild(node, childrenNodes);
    }
  }

  @Override
  public List visitTopKRanking(TopKRankingNode node, PlanContext context) {
    Optional orderingScheme = node.getSpecification().getOrderingScheme();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushChangePointIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushChangePointIntoTableScan.java
new file mode 100644
index 0000000000000..940111d1b026e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushChangePointIntoTableScan.java
@@ -0,0 +1,71 @@
/*
 * Licensed
to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;

import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;

import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.changePoint;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern.typeOf;

/**
 * Pushes a ChangePointNode into a DeviceTableScanNode, creating a ChangePointTableScanNode that
 * can leverage TsFile statistics for optimization.
 *
 * <p>NOTE(review): generic type parameters (e.g. {@code Rule<ChangePointNode>},
 * {@code Capture<DeviceTableScanNode>}) appear to have been stripped from this chunk by diff
 * extraction — restore them from the original source.
 */
public class PushChangePointIntoTableScan implements Rule {

  private static final Capture TABLE_SCAN_CAPTURE = newCapture();

  private final Pattern pattern;

  public PushChangePointIntoTableScan() {
    // Shape: ChangePoint whose only source is a plain DeviceTableScanNode.
    // The instanceof guard prevents re-matching an already-merged
    // ChangePointTableScanNode (which is itself a DeviceTableScanNode).
    this.pattern =
        changePoint()
            .with(
                source()
                    .matching(
                        typeOf(DeviceTableScanNode.class)
                            .matching(
                                scan -> !(scan instanceof ChangePointTableScanNode))
                            .capturedAs(TABLE_SCAN_CAPTURE)));
  }

  @Override
  public Pattern getPattern() {
    return pattern;
  }

  @Override
  public Result apply(ChangePointNode changePointNode, Captures captures, Context context) {
    DeviceTableScanNode scanNode = captures.get(TABLE_SCAN_CAPTURE);

    // Fuse the two nodes; the merged node keeps the ChangePointNode's plan-node id.
    PlanNode merged =
        ChangePointTableScanNode.combineChangePointAndTableScan(changePointNode, scanNode);
    return Result.ofPlanNode(merged);
  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java
new file mode 100644
index 0000000000000..3f88c588fee6f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java
@@ -0,0 +1,187 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;

import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.IsNullPredicate;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;

import java.util.List;
import java.util.Map;

import static com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;

/**
 * Replaces a Filter(Window(LEAD)) change-point detection pattern with a single ChangePointNode.
 *
 * <p>Matches the SQL pattern: SELECT * FROM (SELECT *, LEAD(col) OVER (PARTITION BY ... ORDER BY
 * ...) AS next FROM t) WHERE col != next OR next IS NULL
 *
 * <p>NOTE(review): generic type parameters (e.g. {@code Rule<FilterNode>},
 * {@code Capture<WindowNode>}) appear to have been stripped from this chunk by diff extraction —
 * restore them from the original source. The rule also does not inspect the window's PARTITION
 * BY / ORDER BY clauses; presumably any specification is acceptable for the rewrite — confirm.
 */
public class ReplaceFilterWindowLeadWithChangePoint implements Rule {

  private static final Capture WINDOW_CAPTURE = newCapture();

  private final Pattern pattern;

  public ReplaceFilterWindowLeadWithChangePoint() {
    // Shape: Filter whose only source is a Window computing a single LEAD.
    this.pattern =
        filter()
            .with(
                source()
                    .matching(
                        window()
                            .matching(ReplaceFilterWindowLeadWithChangePoint::isLeadWindow)
                            .capturedAs(WINDOW_CAPTURE)));
  }

  @Override
  public Pattern getPattern() {
    return pattern;
  }

  @Override
  public Result apply(FilterNode filterNode, Captures captures, Context context) {
    WindowNode windowNode = captures.get(WINDOW_CAPTURE);

    // isLeadWindow guarantees exactly one window function here.
    Map.Entry entry =
        getOnlyElement(windowNode.getWindowFunctions().entrySet());
    Symbol nextSymbol = entry.getKey();
    WindowNode.Function function = entry.getValue();

    // The LEAD argument must be a plain column reference; anything else
    // (a computed expression) is not the change-point pattern.
    List arguments = function.getArguments();
    if (arguments.isEmpty() || !(arguments.get(0) instanceof SymbolReference)) {
      return Result.empty();
    }

    String measurementName = ((SymbolReference) arguments.get(0)).getName();
    Symbol measurementSymbol = new Symbol(measurementName);

    if (!isChangePointPredicate(filterNode.getPredicate(), measurementName, nextSymbol.getName())) {
      return Result.empty();
    }

    // Collapse Filter + Window into one node over the window's child; the
    // replacement keeps the filter's plan-node id.
    return Result.ofPlanNode(
        new ChangePointNode(
            filterNode.getPlanNodeId(), windowNode.getChild(), measurementSymbol, nextSymbol));
  }

  /** True when the window computes exactly one function: LEAD with default or explicit offset 1. */
  private static boolean isLeadWindow(WindowNode window) {
    if (window.getWindowFunctions().size() != 1) {
      return false;
    }

    WindowNode.Function function = getOnlyElement(window.getWindowFunctions().values());
    String functionName = function.getResolvedFunction().getSignature().getName();
    if (!"lead".equals(functionName)) {
      return false;
    }

    List arguments = function.getArguments();
    if (arguments.isEmpty()) {
      return false;
    }

    // LEAD with default offset (1 argument) or explicit offset=1 (2 arguments)
    if (arguments.size() == 1) {
      return arguments.get(0) instanceof SymbolReference;
    }
    if (arguments.size() == 2) {
      if (!(arguments.get(0) instanceof SymbolReference)) {
        return false;
      }
      Expression offsetExpr = arguments.get(1);
      if (offsetExpr instanceof Literal) {
        // Only a literal offset of exactly 1 qualifies.
        Object val = ((Literal) offsetExpr).getTsValue();
        return val instanceof Number && ((Number) val).longValue() == 1;
      }
      return false;
    }
    // Three or more arguments (e.g. LEAD with a default value) are not supported.
    return false;
  }

  /**
   * Checks if the predicate matches: col != next OR next IS NULL, in either order of the OR terms.
   */
  private static boolean isChangePointPredicate(
      Expression predicate, String measurementName, String nextName) {
    if (!(predicate instanceof LogicalExpression)) {
      return false;
    }

    LogicalExpression logical = (LogicalExpression) predicate;
    if (logical.getOperator() != LogicalExpression.Operator.OR) {
      return false;
    }

    List terms = logical.getTerms();
    if (terms.size() != 2) {
      return false;
    }

    Expression first = terms.get(0);
    Expression second = terms.get(1);

    // Accept both term orders of the OR.
    return (isNotEqualComparison(first, measurementName, nextName) && isNullCheck(second, nextName))
        || (isNullCheck(first, nextName)
            && isNotEqualComparison(second, measurementName, nextName));
  }

  /** True for {@code col != next} with the operands in either order. */
  private static boolean isNotEqualComparison(
      Expression expr, String measurementName, String nextName) {
    if (!(expr instanceof ComparisonExpression)) {
      return false;
    }
    ComparisonExpression comparison = (ComparisonExpression) expr;
    if (comparison.getOperator() != ComparisonExpression.Operator.NOT_EQUAL) {
      return false;
    }

    Expression left = comparison.getLeft();
    Expression right = comparison.getRight();
    return (isSymbolRef(left, measurementName) && isSymbolRef(right, nextName))
        || (isSymbolRef(left, nextName) && isSymbolRef(right, measurementName));
  }

  /** True for {@code symbolName IS NULL}. */
  private static boolean isNullCheck(Expression expr, String symbolName) {
    if (!(expr instanceof IsNullPredicate)) {
      return false;
    }
    return isSymbolRef(((IsNullPredicate) expr).getValue(), symbolName);
  }

  /** True when the expression is a reference to exactly the named symbol. */
  private static boolean isSymbolRef(Expression expr, String name) {
    return expr instanceof SymbolReference && ((SymbolReference) expr).getName().equals(name);
  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
new file mode 100644
index 0000000000000..18e0d40652426
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
@@ -0,0 +1,139 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.plan.relational.planner.node;

import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;

import com.google.common.collect.ImmutableList;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * ChangePointNode detects value changes in a monitored column within sorted, partitioned data. It
 * emits the last row of each consecutive run of identical values (plus the final row), replacing a
 * Filter(Window(LEAD(...))) pattern.
 *
 * <p>NOTE(review): generic type parameters (e.g. {@code PlanVisitor<R, C>},
 * {@code List<Symbol>}) appear to have been stripped from this chunk by diff extraction —
 * restore them from the original source.
 */
public class ChangePointNode extends SingleChildProcessNode {

  // Column whose consecutive duplicate values define the runs.
  private final Symbol measurementSymbol;
  // Output symbol carrying the LEAD-equivalent "next value" column.
  private final Symbol nextSymbol;

  public ChangePointNode(
      PlanNodeId id, PlanNode child, Symbol measurementSymbol, Symbol nextSymbol) {
    super(id, child);
    this.measurementSymbol = measurementSymbol;
    this.nextSymbol = nextSymbol;
  }

  // Childless form, used by clone() and deserialization.
  public ChangePointNode(PlanNodeId id, Symbol measurementSymbol, Symbol nextSymbol) {
    super(id);
    this.measurementSymbol = measurementSymbol;
    this.nextSymbol = nextSymbol;
  }

  public Symbol getMeasurementSymbol() {
    return measurementSymbol;
  }

  public Symbol getNextSymbol() {
    return nextSymbol;
  }

  // Creates a copy WITHOUT the child; the caller is responsible for re-attaching children.
  @Override
  public PlanNode clone() {
    return new ChangePointNode(getPlanNodeId(), measurementSymbol, nextSymbol);
  }

  @Override
  public R accept(PlanVisitor visitor, C context) {
    return visitor.visitChangePoint(this, context);
  }

  @Override
  public List getOutputColumnNames() {
    throw new UnsupportedOperationException();
  }

  // Output = child's columns plus the appended "next value" column.
  @Override
  public List getOutputSymbols() {
    return ImmutableList.builder()
        .addAll(getChild().getOutputSymbols())
        .add(nextSymbol)
        .build();
  }

  @Override
  public PlanNode replaceChildren(List newChildren) {
    checkArgument(newChildren.size() == 1, "wrong number of new children");
    return new ChangePointNode(id, newChildren.get(0), measurementSymbol, nextSymbol);
  }

  @Override
  protected void serializeAttributes(ByteBuffer byteBuffer) {
    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(byteBuffer);
    Symbol.serialize(measurementSymbol, byteBuffer);
    Symbol.serialize(nextSymbol, byteBuffer);
  }

  @Override
  protected void serializeAttributes(DataOutputStream stream) throws IOException {
    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(stream);
    Symbol.serialize(measurementSymbol, stream);
    Symbol.serialize(nextSymbol, stream);
  }

  // Reads the symbols in write order; the PlanNodeId is read last, after the attributes.
  public static ChangePointNode deserialize(ByteBuffer buffer) {
    Symbol measurementSymbol = Symbol.deserialize(buffer);
    Symbol nextSymbol = Symbol.deserialize(buffer);
    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
    return new ChangePointNode(planNodeId, measurementSymbol, nextSymbol);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    if (!super.equals(o)) return false;
    ChangePointNode that = (ChangePointNode) o;
    return Objects.equals(measurementSymbol, that.measurementSymbol)
        && Objects.equals(nextSymbol, that.nextSymbol);
  }

  @Override
  public int hashCode() {
    return Objects.hash(super.hashCode(), measurementSymbol, nextSymbol);
  }

  @Override
  public String toString() {
    return "ChangePoint-" + this.getPlanNodeId();
  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointTableScanNode.java
new file mode 100644
index 0000000000000..803d49b6a4743
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointTableScanNode.java
@@ -0,0 +1,193 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.plan.relational.planner.node;

import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;

import com.google.common.collect.ImmutableList;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * ChangePointTableScanNode combines DeviceTableScanNode with change-point detection. It pushes
 * change-point logic (last-of-run with next column) into the scan level so that TsFile statistics
 * can be leveraged to skip uniform segments.
 *
 * <p>NOTE(review): generic type parameters (e.g. {@code List<Symbol>},
 * {@code Map<Symbol, ColumnSchema>}) appear to have been stripped from this chunk by diff
 * extraction — restore them from the original source.
 */
public class ChangePointTableScanNode extends DeviceTableScanNode {

  // Non-final so the no-arg deserialization constructor can populate them.
  private Symbol measurementSymbol;
  private Symbol nextSymbol;

  public ChangePointTableScanNode(
      PlanNodeId id,
      QualifiedObjectName qualifiedObjectName,
      List outputSymbols,
      Map assignments,
      List deviceEntries,
      Map tagAndAttributeIndexMap,
      Ordering scanOrder,
      Expression timePredicate,
      Expression pushDownPredicate,
      long pushDownLimit,
      long pushDownOffset,
      boolean pushLimitToEachDevice,
      boolean containsNonAlignedDevice,
      Symbol measurementSymbol,
      Symbol nextSymbol) {
    super(
        id,
        qualifiedObjectName,
        outputSymbols,
        assignments,
        deviceEntries,
        tagAndAttributeIndexMap,
        scanOrder,
        timePredicate,
        pushDownPredicate,
        pushDownLimit,
        pushDownOffset,
        pushLimitToEachDevice,
        containsNonAlignedDevice);
    this.measurementSymbol = measurementSymbol;
    this.nextSymbol = nextSymbol;
  }

  // Uninitialized instance, filled in by deserialize().
  protected ChangePointTableScanNode() {}

  public Symbol getMeasurementSymbol() {
    return measurementSymbol;
  }

  public Symbol getNextSymbol() {
    return nextSymbol;
  }

  // Output = the scan's columns plus the appended "next value" column
  // (a fresh list is built on every call).
  @Override
  public List getOutputSymbols() {
    return ImmutableList.builder().addAll(super.getOutputSymbols()).add(nextSymbol).build();
  }

  @Override
  public R accept(PlanVisitor visitor, C context) {
    return visitor.visitChangePointTableScan(this, context);
  }

  @Override
  public ChangePointTableScanNode clone() {
    return new ChangePointTableScanNode(
        getPlanNodeId(),
        qualifiedObjectName,
        outputSymbols,
        assignments,
        deviceEntries,
        tagAndAttributeIndexMap,
        scanOrder,
        timePredicate,
        pushDownPredicate,
        pushDownLimit,
        pushDownOffset,
        pushLimitToEachDevice,
        containsNonAlignedDevice,
        measurementSymbol,
        nextSymbol);
  }

  @Override
  protected void serializeAttributes(ByteBuffer byteBuffer) {
    PlanNodeType.CHANGE_POINT_TABLE_SCAN_NODE.serialize(byteBuffer);
    // Parent scan state first, then the two change-point symbols.
    DeviceTableScanNode.serializeMemberVariables(this, byteBuffer, true);
    Symbol.serialize(measurementSymbol, byteBuffer);
    Symbol.serialize(nextSymbol, byteBuffer);
  }

  @Override
  protected void serializeAttributes(DataOutputStream stream) throws IOException {
    PlanNodeType.CHANGE_POINT_TABLE_SCAN_NODE.serialize(stream);
    DeviceTableScanNode.serializeMemberVariables(this, stream, true);
    Symbol.serialize(measurementSymbol, stream);
    Symbol.serialize(nextSymbol, stream);
  }

  // Reads fields in write order; the PlanNodeId comes last, after the attributes.
  public static ChangePointTableScanNode deserialize(ByteBuffer byteBuffer) {
    ChangePointTableScanNode node = new ChangePointTableScanNode();
    DeviceTableScanNode.deserializeMemberVariables(byteBuffer, node, true);
    node.measurementSymbol = Symbol.deserialize(byteBuffer);
    node.nextSymbol = Symbol.deserialize(byteBuffer);
    node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
    return node;
  }

  /**
   * Factory method: merges a ChangePointNode with a DeviceTableScanNode into a single
   * ChangePointTableScanNode. The merged node inherits the ChangePointNode's plan-node id and
   * all scan state from the scan node.
   */
  public static ChangePointTableScanNode combineChangePointAndTableScan(
      ChangePointNode changePointNode, DeviceTableScanNode scanNode) {
    return new ChangePointTableScanNode(
        changePointNode.getPlanNodeId(),
        scanNode.getQualifiedObjectName(),
        scanNode.getOutputSymbols(),
        scanNode.getAssignments(),
        scanNode.getDeviceEntries(),
        scanNode.getTagAndAttributeIndexMap(),
        scanNode.getScanOrder(),
        scanNode.getTimePredicate().orElse(null),
        scanNode.getPushDownPredicate(),
        scanNode.getPushDownLimit(),
        scanNode.getPushDownOffset(),
        scanNode.isPushLimitToEachDevice(),
        scanNode.containsNonAlignedDevice(),
        changePointNode.getMeasurementSymbol(),
        changePointNode.getNextSymbol());
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    if (!super.equals(o)) return false;
    ChangePointTableScanNode that = (ChangePointTableScanNode) o;
    return Objects.equals(measurementSymbol, that.measurementSymbol)
        && Objects.equals(nextSymbol, that.nextSymbol);
  }

  @Override
  public int hashCode() {
    return Objects.hash(super.hashCode(), measurementSymbol, nextSymbol);
  }

  @Override
  public String toString() {
    return "ChangePointTableScan-" + this.getPlanNodeId();
  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index c9bf429b1833f..a7e5f8b810b9d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -235,6 +235,14 @@ public static Pattern window() {
    return typeOf(WindowNode.class);
  }

  // Match any ChangePointNode.
  public static Pattern changePoint() {
    return typeOf(ChangePointNode.class);
  }

  // Match any ChangePointTableScanNode.
  public static Pattern changePointTableScan() {
    return typeOf(ChangePointTableScanNode.class);
  }

  public static Pattern groupNode() {
    return typeOf(GroupNode.class);
  }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
index 3d688610f0111..215dcfda6f7d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
@@ -26,7 +26,9 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.EliminateLimitWithTableScan;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithMergeSort;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithMergeSort; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushChangePointIntoTableScan; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownOffsetIntoTableScan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ReplaceFilterWindowLeadWithChangePoint; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -41,6 +43,16 @@ public DistributedOptimizeFactory(PlannerContext plannerContext) { this.planOptimizers = ImmutableList.of( + // replace Filter(Window(LEAD)) change-point pattern with ChangePointNode + new IterativeOptimizer( + plannerContext, + ruleStats, + ImmutableSet.of(new ReplaceFilterWindowLeadWithChangePoint())), + // push ChangePointNode into DeviceTableScanNode for TsFile statistics optimization + new IterativeOptimizer( + plannerContext, + ruleStats, + ImmutableSet.of(new PushChangePointIntoTableScan())), // transfer Limit+Sort to TopK new IterativeOptimizer( plannerContext, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java new file mode 100644 index 0000000000000..21d9e2e05e0bb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.source.ChangePointOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; +import org.apache.iotdb.db.storageengine.buffer.ChunkCache; +import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; 
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; + +import io.airlift.units.Duration; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.type.TypeFactory; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.TSRecord; +import org.apache.tsfile.write.record.datapoint.IntDataPoint; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ChangePointOperatorTest { + + private static final String SG_NAME = "root.ChangePointOperatorTest"; + private static final String DEVICE_ID = SG_NAME + ".device0"; + private static final String MEASUREMENT = "sensor0"; + private static final long FLUSH_INTERVAL = 20; + + private final List seqResources = new ArrayList<>(); + private final List unSeqResources = new ArrayList<>(); + 
private ExecutorService instanceNotificationExecutor; + + @Before + public void setUp() { + instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + } + + @After + public void tearDown() throws IOException { + for (TsFileResource r : seqResources) { + r.remove(); + } + for (TsFileResource r : unSeqResources) { + r.remove(); + } + seqResources.clear(); + unSeqResources.clear(); + FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + BloomFilterCache.getInstance().clear(); + EnvironmentUtils.cleanAllDir(); + instanceNotificationExecutor.shutdown(); + } + + /** + * All values are distinct (0, 1, 2, ..., 99). Every point is a change point, so the operator + * should output all 100 points. + */ + @Test + public void testAllDistinctValues() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = i; + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(false); + List result = collectResults(operator); + + assertEquals(100, result.size()); + for (int i = 0; i < 100; i++) { + assertEquals(i, result.get(i)[0]); + assertEquals(i, result.get(i)[1]); + } + operator.close(); + } + + /** + * All values are the same constant (42). Only the first point should be emitted as a change + * point. + */ + @Test + public void testAllSameValues() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = 42; + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(false); + List result = collectResults(operator); + + assertEquals(1, result.size()); + assertEquals(0, result.get(0)[0]); + assertEquals(42, result.get(0)[1]); + operator.close(); + } + + /** + * All values are the same constant (42), with statistics optimization enabled. 
Should produce the + * same result as without statistics: only the first point. + */ + @Test + public void testAllSameValuesWithStatistics() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = 42; + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(true); + List result = collectResults(operator); + + assertEquals(1, result.size()); + assertEquals(0, result.get(0)[0]); + assertEquals(42, result.get(0)[1]); + operator.close(); + } + + /** + * Values form runs of consecutive duplicates: 10 x value_A, 10 x value_B, 10 x value_C, ... The + * operator should output only the first point of each run. + */ + @Test + public void testConsecutiveDuplicateRuns() throws Exception { + int runLength = 10; + int numRuns = 10; + int totalPoints = runLength * numRuns; + int[] values = new int[totalPoints]; + for (int run = 0; run < numRuns; run++) { + for (int j = 0; j < runLength; j++) { + values[run * runLength + j] = (run + 1) * 100; + } + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(false); + List result = collectResults(operator); + + assertEquals(numRuns, result.size()); + for (int run = 0; run < numRuns; run++) { + assertEquals(run * runLength, result.get(run)[0]); + assertEquals((run + 1) * 100, result.get(run)[1]); + } + operator.close(); + } + + /** + * Same data as testConsecutiveDuplicateRuns but with statistics optimization enabled. The result + * must be identical. 
+ */ + @Test + public void testConsecutiveDuplicateRunsWithStatistics() throws Exception { + int runLength = 10; + int numRuns = 10; + int totalPoints = runLength * numRuns; + int[] values = new int[totalPoints]; + for (int run = 0; run < numRuns; run++) { + for (int j = 0; j < runLength; j++) { + values[run * runLength + j] = (run + 1) * 100; + } + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(true); + List result = collectResults(operator); + + assertEquals(numRuns, result.size()); + for (int run = 0; run < numRuns; run++) { + assertEquals(run * runLength, result.get(run)[0]); + assertEquals((run + 1) * 100, result.get(run)[1]); + } + operator.close(); + } + + /** + * Alternating pattern: each point differs from the previous one (1, 2, 1, 2, ...). All points are + * change points. + */ + @Test + public void testAlternatingValues() throws Exception { + int[] values = new int[60]; + for (int i = 0; i < 60; i++) { + values[i] = (i % 2 == 0) ? 1 : 2; + } + prepareSeqFile(0, values); + + ChangePointOperator operator = createOperator(false); + List result = collectResults(operator); + + assertEquals(60, result.size()); + for (int i = 0; i < 60; i++) { + assertEquals(i, result.get(i)[0]); + assertEquals((i % 2 == 0) ? 1 : 2, result.get(i)[1]); + } + operator.close(); + } + + /** A single data point. The operator should emit exactly one change point. */ + @Test + public void testSinglePoint() throws Exception { + prepareSeqFile(0, new int[] {99}); + + ChangePointOperator operator = createOperator(false); + List result = collectResults(operator); + + assertEquals(1, result.size()); + assertEquals(0, result.get(0)[0]); + assertEquals(99, result.get(0)[1]); + operator.close(); + } + + /** + * Data spans multiple TsFile pages (flush every 20 rows). Each page has a constant value, but + * value changes across pages. With statistics enabled, entire pages should be skipped or emit a + * single point. + * + *

Page 0 (time 0-19): all 100, Page 1 (time 20-39): all 100, Page 2 (time 40-59): all 200 + * + *

Expected: 2 change points: (0, 100) and (40, 200) + */ + @Test + public void testStatisticsSkipAcrossPages() throws Exception { + int[] values = new int[60]; + for (int i = 0; i < 40; i++) { + values[i] = 100; + } + for (int i = 40; i < 60; i++) { + values[i] = 200; + } + prepareSeqFile(0, values); + + ChangePointOperator operatorWithStats = createOperator(true); + List resultWithStats = collectResults(operatorWithStats); + operatorWithStats.close(); + + assertEquals(2, resultWithStats.size()); + assertEquals(0, resultWithStats.get(0)[0]); + assertEquals(100, resultWithStats.get(0)[1]); + assertEquals(40, resultWithStats.get(1)[0]); + assertEquals(200, resultWithStats.get(1)[1]); + } + + /** + * Same as testStatisticsSkipAcrossPages but without statistics. Verifies the raw-data path + * produces the same result. + */ + @Test + public void testNoStatisticsAcrossPages() throws Exception { + int[] values = new int[60]; + for (int i = 0; i < 40; i++) { + values[i] = 100; + } + for (int i = 40; i < 60; i++) { + values[i] = 200; + } + prepareSeqFile(0, values); + + ChangePointOperator operatorNoStats = createOperator(false); + List resultNoStats = collectResults(operatorNoStats); + operatorNoStats.close(); + + assertEquals(2, resultNoStats.size()); + assertEquals(0, resultNoStats.get(0)[0]); + assertEquals(100, resultNoStats.get(0)[1]); + assertEquals(40, resultNoStats.get(1)[0]); + assertEquals(200, resultNoStats.get(1)[1]); + } + + /** + * Tests that statistics and non-statistics paths yield identical results for mixed data where + * some pages are uniform and others are not. + * + *

Page 0 (0-19): all 5 (uniform), Page 1 (20-39): values 5,6,5,6,... (non-uniform), Page 2 + * (40-59): all 6 (uniform) + */ + @Test + public void testStatisticsAndRawPathConsistency() throws Exception { + int[] values = new int[60]; + for (int i = 0; i < 20; i++) { + values[i] = 5; + } + for (int i = 20; i < 40; i++) { + values[i] = (i % 2 == 0) ? 5 : 6; + } + for (int i = 40; i < 60; i++) { + values[i] = 6; + } + prepareSeqFile(0, values); + + ChangePointOperator opWithStats = createOperator(true); + List resultWithStats = collectResults(opWithStats); + opWithStats.close(); + + tearDownResources(); + + // Re-create the same data + for (int i = 0; i < 20; i++) { + values[i] = 5; + } + for (int i = 20; i < 40; i++) { + values[i] = (i % 2 == 0) ? 5 : 6; + } + for (int i = 40; i < 60; i++) { + values[i] = 6; + } + prepareSeqFile(0, values); + + ChangePointOperator opNoStats = createOperator(false); + List resultNoStats = collectResults(opNoStats); + opNoStats.close(); + + assertEquals(resultNoStats.size(), resultWithStats.size()); + for (int i = 0; i < resultNoStats.size(); i++) { + assertEquals( + "Mismatch at index " + i + " timestamp", + resultNoStats.get(i)[0], + resultWithStats.get(i)[0]); + assertEquals( + "Mismatch at index " + i + " value", resultNoStats.get(i)[1], resultWithStats.get(i)[1]); + } + } + + /** + * Multiple seq files. File 0: all value 10 (time 0-49), File 1: all value 20 (time 50-99). The + * operator should output 2 change points across files. 
+ */ + @Test + public void testMultipleFiles() throws Exception { + int[] values1 = new int[50]; + for (int i = 0; i < 50; i++) { + values1[i] = 10; + } + prepareSeqFile(0, values1); + + int[] values2 = new int[50]; + for (int i = 0; i < 50; i++) { + values2[i] = 20; + } + prepareSeqFile(50, values2); + + ChangePointOperator operator = createOperator(true); + List result = collectResults(operator); + operator.close(); + + assertEquals(2, result.size()); + assertEquals(0, result.get(0)[0]); + assertEquals(10, result.get(0)[1]); + assertEquals(50, result.get(1)[0]); + assertEquals(20, result.get(1)[1]); + } + + /** + * Multiple files where the value does NOT change across file boundary. File 0: all value 10 (time + * 0-49), File 1: all value 10 (time 50-99). Should output only 1 change point. + */ + @Test + public void testMultipleFilesSameValue() throws Exception { + int[] values1 = new int[50]; + for (int i = 0; i < 50; i++) { + values1[i] = 10; + } + prepareSeqFile(0, values1); + + int[] values2 = new int[50]; + for (int i = 0; i < 50; i++) { + values2[i] = 10; + } + prepareSeqFile(50, values2); + + ChangePointOperator operator = createOperator(true); + List result = collectResults(operator); + operator.close(); + + assertEquals(1, result.size()); + assertEquals(0, result.get(0)[0]); + assertEquals(10, result.get(0)[1]); + } + + /** Verifies isFinished() returns true after all data is consumed. 
*/ + @Test + public void testIsFinished() throws Exception { + prepareSeqFile(0, new int[] {1, 1, 2, 2, 3}); + + ChangePointOperator operator = createOperator(false); + assertFalse(operator.isFinished()); + + while (operator.hasNext()) { + operator.next(); + } + assertTrue(operator.isFinished()); + operator.close(); + } + + // ==================== Helper methods ==================== + + private ChangePointOperator createOperator(boolean canUseStatistics) throws Exception { + List measurementSchemas = new ArrayList<>(); + measurementSchemas.add(new MeasurementSchema(MEASUREMENT, TSDataType.INT32)); + + List measurementColumnNames = new ArrayList<>(); + measurementColumnNames.add(MEASUREMENT); + + if (!canUseStatistics) { + measurementSchemas.add(new MeasurementSchema("__dummy__", TSDataType.INT32)); + measurementColumnNames.add("__dummy__"); + } + + Set allSensors = new HashSet<>(measurementColumnNames); + allSensors.add(""); + + IDeviceID deviceId = IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID); + DeviceEntry deviceEntry = new AlignedDeviceEntry(deviceId, new Binary[0]); + List deviceEntries = new ArrayList<>(); + deviceEntries.add(deviceEntry); + + List columnSchemas = new ArrayList<>(); + columnSchemas.add( + new ColumnSchema( + "time", TypeFactory.getType(TSDataType.TIMESTAMP), false, TsTableColumnCategory.TIME)); + columnSchemas.add( + new ColumnSchema( + MEASUREMENT, + TypeFactory.getType(TSDataType.INT32), + false, + TsTableColumnCategory.FIELD)); + int[] columnsIndexArray = new int[] {0, 0}; + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0); + PlanNodeId planNodeId = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId, ChangePointOperator.class.getSimpleName()); + + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + + ChangePointOperator.ChangePointOperatorParameter parameter = + new ChangePointOperator.ChangePointOperatorParameter( + driverContext.getOperatorContexts().get(0), + planNodeId, + columnSchemas, + columnsIndexArray, + deviceEntries, + Ordering.ASC, + scanOptionsBuilder.build(), + measurementColumnNames, + allSensors, + measurementSchemas, + 0, + idColumnIndex -> null); + + ChangePointOperator operator = new ChangePointOperator(parameter); + + operator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + return operator; + } + + /** Collects all (timestamp, int_value) pairs from the operator output. */ + private List collectResults(ChangePointOperator operator) throws Exception { + List results = new ArrayList<>(); + while (operator.hasNext()) { + TsBlock tsBlock = operator.next(); + if (tsBlock == null) { + continue; + } + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long time = tsBlock.getColumn(0).getLong(i); + int value = tsBlock.getColumn(1).getInt(i); + results.add(new long[] {time, value}); + } + } + return results; + } + + /** + * Creates a sequential TsFile with the given INT32 values starting at timeOffset. Timestamps are + * timeOffset, timeOffset+1, ..., timeOffset+values.length-1. The file is flushed every {@link + * #FLUSH_INTERVAL} rows to create multiple pages. 
+ */ + private void prepareSeqFile(long timeOffset, int[] values) throws Exception { + int fileIndex = seqResources.size(); + File file = new File(TestConstant.getTestTsFilePath(SG_NAME, 0, 0, fileIndex)); + TsFileResource resource = new TsFileResource(file); + resource.setStatusForTest(TsFileResourceStatus.NORMAL); + resource.setMinPlanIndex(fileIndex); + resource.setMaxPlanIndex(fileIndex); + resource.setVersion(fileIndex); + + IMeasurementSchema schema = + new MeasurementSchema( + MEASUREMENT, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED); + + if (!file.getParentFile().exists()) { + Assert.assertTrue(file.getParentFile().mkdirs()); + } + TsFileWriter writer = new TsFileWriter(file); + Map template = new HashMap<>(); + template.put(schema.getMeasurementName(), schema); + writer.registerSchemaTemplate("template0", template, true); + writer.registerDevice(DEVICE_ID, "template0"); + + for (int i = 0; i < values.length; i++) { + long timestamp = timeOffset + i; + TSRecord record = new TSRecord(DEVICE_ID, timestamp); + record.addTuple(new IntDataPoint(MEASUREMENT, values[i])); + writer.writeRecord(record); + + resource.updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID), timestamp); + resource.updateEndTime(IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID), timestamp); + + if ((i + 1) % FLUSH_INTERVAL == 0) { + writer.flush(); + } + } + writer.close(); + + seqResources.add(resource); + } + + private void tearDownResources() throws IOException { + for (TsFileResource r : seqResources) { + r.remove(); + } + for (TsFileResource r : unSeqResources) { + r.remove(); + } + seqResources.clear(); + unSeqResources.clear(); + FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + BloomFilterCache.getInstance().clear(); + } +} diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java new file mode 100644 index 0000000000000..29fe5d1801e41 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern; + +import org.junit.Test; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.changePoint; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.changePointTableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window; + +public class ChangePointOptimizationTest { + + @Test + public void testChangePointOptimization() { + PlanTester planTester = new PlanTester(); + + String sql = + "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 <> next OR next IS NULL"; + + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + // Logical plan still has Filter -> Window (optimization happens at distributed level) + assertPlan(logicalQueryPlan, output((filter(window(group(tableScan)))))); + + // Distributed plan: 
ChangePointNode + TableScan merged into ChangePointTableScanNode + // Fragment 0: Output -> Collect -> Exchange* + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); + // Fragment 1+: ChangePointTableScanNode (pushdown merged) + assertPlan(planTester.getFragmentPlan(1), changePointTableScan("testdb.table1")); + assertPlan(planTester.getFragmentPlan(2), changePointTableScan("testdb.table1")); + } + + @Test + public void testChangePointNotMatchedWithLag() { + PlanTester planTester = new PlanTester(); + + // LAG instead of LEAD should NOT be optimized + String sql = + "SELECT * FROM (SELECT *, LAG(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS prev FROM table1) WHERE s1 <> prev OR prev IS NULL"; + + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + // Should remain Filter -> Window in the logical plan + assertPlan(logicalQueryPlan, output((filter(window(group(tableScan)))))); + + // Distributed plan: should still have Filter -> Window per partition (no ChangePoint) + assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan))); + } + + @Test + public void testChangePointNotMatchedWithDifferentPredicate() { + PlanTester planTester = new PlanTester(); + + // Different predicate (s1 = next instead of s1 != next) should NOT be optimized + String sql = + "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 = next"; + + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + assertPlan(logicalQueryPlan, output(filter(window(group(tableScan))))); + + // Distributed plan: should still have Filter -> Window (no ChangePoint) + assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan))); + } + + @Test + public void testChangePointNotMatchedWithMultipleWindowFunctions() { + PlanTester planTester = new 
PlanTester(); + + // Multiple window functions should NOT be optimized + String sql = + "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS rn FROM table1) WHERE s1 <> next OR next IS NULL"; + + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + // Should not be transformed to ChangePoint because there are multiple window functions + assertPlan(logicalQueryPlan, output(filter(window(group(tableScan))))); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java index 03f79fd2ec29a..3fc9b9dc0baf6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; @@ -481,6 +483,20 @@ public static PlanMatchPattern rowNumber(PlanMatchPattern source) { return 
node(RowNumberNode.class, source); } + public static PlanMatchPattern changePoint(PlanMatchPattern source) { + return node(ChangePointNode.class, source); + } + + public static PlanMatchPattern changePointTableScan(String expectedTableName) { + return node(ChangePointTableScanNode.class) + .with( + new DeviceTableScanMatcher( + expectedTableName, + Optional.empty(), + Collections.emptyList(), + Collections.emptySet())); + } + public static PlanMatchPattern markDistinct( String markerSymbol, List distinctSymbols, PlanMatchPattern source) { return node(MarkDistinctNode.class, source)