Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private void checkConflict(
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);

if (enableSendTsFileLimit == null) {
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
sinkParameters.addAttribute(
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString());
LOGGER.info(
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
} else if (!enableSendTsFileLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
Expand Down Expand Up @@ -1817,9 +1818,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
PipeDataNodeAgent.plugin()
.validate(
pipeName,
createPipeStatement.getExtractorAttributes(),
createPipeStatement.getSourceAttributes(),
createPipeStatement.getProcessorAttributes(),
createPipeStatement.getConnectorAttributes());
createPipeStatement.getSinkAttributes());
} catch (final Exception e) {
LOGGER.info("Failed to validate create pipe statement, because {}", e.getMessage(), e);
future.setException(
Expand All @@ -1830,7 +1831,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
// Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime
// and history are true), the pipe is split into history-only and realtime–only modes.
final PipeParameters sourcePipeParameters =
new PipeParameters(createPipeStatement.getExtractorAttributes());
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
try (final ConfigNodeClient configNodeClient =
Expand All @@ -1854,7 +1857,7 @@ public SettableFuture<ConfigTaskResult> createPipe(
Boolean.toString(false))))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
.setConnectorAttributes(createPipeStatement.getSinkAttributes());

final TSStatus realtimeTsStatus = configNodeClient.createPipe(realtimeReq);
// If creation fails, immediately return with exception
Expand Down Expand Up @@ -1888,7 +1891,14 @@ public SettableFuture<ConfigTaskResult> createPipe(
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
.setConnectorAttributes(
sinkPipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
Collections.singletonMap(
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
Boolean.TRUE.toString())))
.getAttribute());

final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
// If creation fails, immediately return with exception
Expand All @@ -1912,9 +1922,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
new TCreatePipeReq()
.setPipeName(pipeName)
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask {

public CreatePipeTask(CreatePipeStatement createPipeStatement) {
// support now() function
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
this.createPipeStatement = createPipeStatement;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3822,11 +3822,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) {
ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);

if (ctx.extractorAttributesClause() != null) {
createPipeStatement.setExtractorAttributes(
createPipeStatement.setSourceAttributes(
parseExtractorAttributesClause(
ctx.extractorAttributesClause().extractorAttributeClause()));
} else {
createPipeStatement.setExtractorAttributes(new HashMap<>());
createPipeStatement.setSourceAttributes(new HashMap<>());
}
if (ctx.processorAttributesClause() != null) {
createPipeStatement.setProcessorAttributes(
Expand All @@ -3836,11 +3836,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) {
createPipeStatement.setProcessorAttributes(new HashMap<>());
}
if (ctx.connectorAttributesClause() != null) {
createPipeStatement.setConnectorAttributes(
createPipeStatement.setSinkAttributes(
parseConnectorAttributesClause(
ctx.connectorAttributesClause().connectorAttributeClause()));
} else {
createPipeStatement.setConnectorAttributes(
createPipeStatement.setSinkAttributes(
parseConnectorAttributesClause(
ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements IConfigStatement {

private String pipeName;
private boolean ifNotExistsCondition;
private Map<String, String> extractorAttributes;
private Map<String, String> sourceAttributes;
private Map<String, String> processorAttributes;
private Map<String, String> connectorAttributes;
private Map<String, String> sinkAttributes;

public CreatePipeStatement(StatementType createPipeStatement) {
this.statementType = createPipeStatement;
Expand All @@ -54,16 +54,16 @@ public boolean hasIfNotExistsCondition() {
return ifNotExistsCondition;
}

public Map<String, String> getExtractorAttributes() {
return extractorAttributes;
public Map<String, String> getSourceAttributes() {
return sourceAttributes;
}

public Map<String, String> getProcessorAttributes() {
return processorAttributes;
}

public Map<String, String> getConnectorAttributes() {
return connectorAttributes;
public Map<String, String> getSinkAttributes() {
return sinkAttributes;
}

public void setPipeName(String pipeName) {
Expand All @@ -74,16 +74,16 @@ public void setIfNotExists(boolean ifNotExistsCondition) {
this.ifNotExistsCondition = ifNotExistsCondition;
}

public void setExtractorAttributes(Map<String, String> extractorAttributes) {
this.extractorAttributes = extractorAttributes;
public void setSourceAttributes(Map<String, String> sourceAttributes) {
this.sourceAttributes = sourceAttributes;
}

public void setProcessorAttributes(Map<String, String> processorAttributes) {
this.processorAttributes = processorAttributes;
}

public void setConnectorAttributes(Map<String, String> connectorAttributes) {
this.connectorAttributes = connectorAttributes;
public void setSinkAttributes(Map<String, String> sinkAttributes) {
this.sinkAttributes = sinkAttributes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ public void testCreatePipeStatement() {
CreatePipeStatement statement = new CreatePipeStatement(StatementType.CREATE_PIPE);

statement.setPipeName("test");
statement.setExtractorAttributes(extractorAttributes);
statement.setSourceAttributes(extractorAttributes);
statement.setProcessorAttributes(processorAttributes);
statement.setConnectorAttributes(connectorAttributes);
statement.setSinkAttributes(connectorAttributes);

Assert.assertEquals("test", statement.getPipeName());
Assert.assertEquals(extractorAttributes, statement.getExtractorAttributes());
Assert.assertEquals(extractorAttributes, statement.getSourceAttributes());
Assert.assertEquals(processorAttributes, statement.getProcessorAttributes());
Assert.assertEquals(connectorAttributes, statement.getConnectorAttributes());
Assert.assertEquals(connectorAttributes, statement.getSinkAttributes());

Assert.assertEquals(QueryType.WRITE, statement.getQueryType());
}
Expand Down
Loading