From 6f4dde83030298e7f0d7732e3bce62445ee3763a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:33:01 +0800 Subject: [PATCH 1/2] may-comp --- .../executor/ClusterConfigTaskExecutor.java | 23 +++++++++++++------ .../config/sys/pipe/CreatePipeTask.java | 2 +- .../queryengine/plan/parser/ASTVisitor.java | 8 +++---- .../metadata/pipe/CreatePipeStatement.java | 20 ++++++++-------- .../statement/sys/pipe/PipeStatementTest.java | 8 +++---- 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index f282a3d004d96..1b28784005586 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -51,6 +51,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; @@ -1817,9 +1818,9 @@ public SettableFuture createPipe( PipeDataNodeAgent.plugin() .validate( pipeName, - createPipeStatement.getExtractorAttributes(), + createPipeStatement.getSourceAttributes(), createPipeStatement.getProcessorAttributes(), - createPipeStatement.getConnectorAttributes()); + createPipeStatement.getSinkAttributes()); } catch (final Exception e) { LOGGER.info("Failed to validate create pipe statement, because {}", e.getMessage(), e); future.setException( @@ -1830,7 +1831,9 @@ public SettableFuture createPipe( // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime // and history are true), the pipe is split into history-only and realtime–only modes. final PipeParameters sourcePipeParameters = - new PipeParameters(createPipeStatement.getExtractorAttributes()); + new PipeParameters(createPipeStatement.getSourceAttributes()); + final PipeParameters sinkPipeParameters = + new PipeParameters(createPipeStatement.getSinkAttributes()); if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled() && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) { try (final ConfigNodeClient configNodeClient = @@ -1854,7 +1857,7 @@ public SettableFuture createPipe( Boolean.toString(false)))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes(createPipeStatement.getSinkAttributes()); final TSStatus realtimeTsStatus = configNodeClient.createPipe(realtimeReq); // If creation fails, immediately return with exception @@ -1888,7 +1891,13 @@ public SettableFuture createPipe( PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes( + sinkPipeParameters + .addOrReplaceEquivalentAttributesWithClone( + new PipeParameters( + Collections.singletonMap( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"))) + .getAttribute()); final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq); // If creation fails, immediately return with exception @@ -1912,9 +1921,9 @@ public SettableFuture createPipe( new TCreatePipeReq() .setPipeName(pipeName) .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) - .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) + .setExtractorAttributes(createPipeStatement.getSourceAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes(createPipeStatement.getSinkAttributes()); TSStatus tsStatus = configNodeClient.createPipe(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java index 56a1c69fcd11a..229e950356b3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java @@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask { public CreatePipeTask(CreatePipeStatement createPipeStatement) { // support now() function - applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes()); + applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes()); this.createPipeStatement = createPipeStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 0f60d2f9ba690..77af818e124a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -3822,11 +3822,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) { ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); if (ctx.extractorAttributesClause() != null) { - createPipeStatement.setExtractorAttributes( + createPipeStatement.setSourceAttributes( parseExtractorAttributesClause( ctx.extractorAttributesClause().extractorAttributeClause())); } else { - createPipeStatement.setExtractorAttributes(new HashMap<>()); + createPipeStatement.setSourceAttributes(new HashMap<>()); } if (ctx.processorAttributesClause() != null) { createPipeStatement.setProcessorAttributes( @@ -3836,11 +3836,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) { createPipeStatement.setProcessorAttributes(new HashMap<>()); } if (ctx.connectorAttributesClause() != null) { - createPipeStatement.setConnectorAttributes( + createPipeStatement.setSinkAttributes( parseConnectorAttributesClause( ctx.connectorAttributesClause().connectorAttributeClause())); } else { - createPipeStatement.setConnectorAttributes( + createPipeStatement.setSinkAttributes( parseConnectorAttributesClause( ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java index a7b7471ffd057..634277093c544 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java @@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { private String pipeName; private boolean ifNotExistsCondition; - private Map extractorAttributes; + private Map sourceAttributes; private Map processorAttributes; - private Map connectorAttributes; + private Map sinkAttributes; public CreatePipeStatement(StatementType createPipeStatement) { this.statementType = createPipeStatement; @@ -54,16 +54,16 @@ public boolean hasIfNotExistsCondition() { return ifNotExistsCondition; } - public Map getExtractorAttributes() { - return extractorAttributes; + public Map getSourceAttributes() { + return sourceAttributes; } public Map getProcessorAttributes() { return processorAttributes; } - public Map getConnectorAttributes() { - return connectorAttributes; + public Map getSinkAttributes() { + return sinkAttributes; } public void setPipeName(String pipeName) { @@ -74,16 +74,16 @@ public void setIfNotExists(boolean ifNotExistsCondition) { this.ifNotExistsCondition = ifNotExistsCondition; } - public void setExtractorAttributes(Map extractorAttributes) { - this.extractorAttributes = extractorAttributes; + public void setSourceAttributes(Map sourceAttributes) { + this.sourceAttributes = sourceAttributes; } public void setProcessorAttributes(Map processorAttributes) { this.processorAttributes = processorAttributes; } - public void setConnectorAttributes(Map connectorAttributes) { - this.connectorAttributes = connectorAttributes; + public void setSinkAttributes(Map sinkAttributes) { + this.sinkAttributes = sinkAttributes; } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java index ab885ddb557d9..04fccc195600e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java @@ -42,14 +42,14 @@ public void testCreatePipeStatement() { CreatePipeStatement statement = new CreatePipeStatement(StatementType.CREATE_PIPE); statement.setPipeName("test"); - statement.setExtractorAttributes(extractorAttributes); + statement.setSourceAttributes(extractorAttributes); statement.setProcessorAttributes(processorAttributes); - statement.setConnectorAttributes(connectorAttributes); + statement.setSinkAttributes(connectorAttributes); Assert.assertEquals("test", statement.getPipeName()); - Assert.assertEquals(extractorAttributes, statement.getExtractorAttributes()); + Assert.assertEquals(extractorAttributes, statement.getSourceAttributes()); Assert.assertEquals(processorAttributes, statement.getProcessorAttributes()); - Assert.assertEquals(connectorAttributes, statement.getConnectorAttributes()); + Assert.assertEquals(connectorAttributes, statement.getSinkAttributes()); Assert.assertEquals(QueryType.WRITE, statement.getQueryType()); } From addd812d6c24ac62af68da371c761621c0392344 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:38:01 +0800 Subject: [PATCH 2/2] Avoid someone merge with name 'may-comp' --- .../db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java | 3 ++- .../execution/config/executor/ClusterConfigTaskExecutor.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index e767731c1acb5..8d227d7c27210 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -210,7 +210,8 @@ private void checkConflict( PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT); if (enableSendTsFileLimit == null) { - sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"); + sinkParameters.addAttribute( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString()); LOGGER.info( "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending."); } else if (!enableSendTsFileLimit) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 1b28784005586..24c64c240308b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1896,7 +1896,8 @@ public SettableFuture createPipe( .addOrReplaceEquivalentAttributesWithClone( new PipeParameters( Collections.singletonMap( - PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"))) + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, + Boolean.TRUE.toString()))) .getAttribute()); final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);