diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index f9609869b45e6..d66a4f14a7961 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -213,7 +213,8 @@ private void checkConflict( PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT); if (enableSendTsFileLimit == null) { - sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"); + sinkParameters.addAttribute( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString()); LOGGER.info( "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending."); } else if (!enableSendTsFileLimit) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 404990e124dba..d28edddb2f9b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2203,6 +2203,8 @@ public SettableFuture createPipe( // and history are true), the pipe is split into history-only and realtime–only modes. final PipeParameters sourcePipeParameters = new PipeParameters(createPipeStatement.getSourceAttributes()); + final PipeParameters sinkPipeParameters = + new PipeParameters(createPipeStatement.getSinkAttributes()); if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled() && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) { try (final ConfigNodeClient configNodeClient = @@ -2260,7 +2262,14 @@ public SettableFuture createPipe( PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getSinkAttributes()); + .setConnectorAttributes( + sinkPipeParameters + .addOrReplaceEquivalentAttributesWithClone( + new PipeParameters( + Collections.singletonMap( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, + Boolean.TRUE.toString()))) + .getAttribute()); final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq); // If creation fails, immediately return with exception