Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ private void checkConflict(
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);

if (enableSendTsFileLimit == null) {
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
sinkParameters.addAttribute(
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString());
LOGGER.info(
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
} else if (!enableSendTsFileLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2203,6 +2203,8 @@
// and history are true), the pipe is split into history-only and realtime–only modes.
final PipeParameters sourcePipeParameters =
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
try (final ConfigNodeClient configNodeClient =
Expand Down Expand Up @@ -2260,7 +2262,14 @@
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
.setConnectorAttributes(
sinkPipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
Collections.singletonMap(
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
Boolean.TRUE.toString())))
.getAttribute());

final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
// If creation fails, immediately return with exception
Expand Down Expand Up @@ -2300,7 +2309,7 @@
}

@Override
public SettableFuture<ConfigTaskResult> alterPipe(final AlterPipeStatement alterPipeStatement) {

Check warning on line 2312 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 178 to 64, Complexity from 21 to 14, Nesting Level from 4 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZy9Y7gARJSzbAuXKF6_&open=AZy9Y7gARJSzbAuXKF6_&pullRequest=17264
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();

// Validate pipe name
Expand Down