Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ keyWords
| QUERY
| QUERYID
| QUOTA
| RECEIVERS
| RANGE
| READONLY
| READ
Expand Down Expand Up @@ -298,4 +299,4 @@ keyWords
| OPTION
| INF
| CURRENT_TIMESTAMP
;
;
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ddlStatement
// ExternalService
| createService | startService | stopService | dropService | showService
// Pipe Task
| createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
| createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes | showReceivers
// Pipe Plugin
| createPipePlugin | dropPipePlugin | showPipePlugins
// Subscription
Expand Down Expand Up @@ -701,6 +701,10 @@ showPipes
: SHOW ((PIPE pipeName=identifier) | PIPES (WHERE (CONNECTOR | SINK) USED BY pipeName=identifier)?)
;

showReceivers
: SHOW RECEIVERS
;

// Pipe Plugin =========================================================================================
createPipePlugin
: CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS className=STRING_LITERAL uriClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ QUOTA
: Q U O T A
;

RECEIVERS
: R E C E I V E R S
;

RANGE
: R A N G E
;
Expand Down Expand Up @@ -1411,4 +1415,4 @@ fragment V: [vV];
fragment W: [wW];
fragment X: [xX];
fragment Y: [yY];
fragment Z: [zZ];
fragment Z: [zZ];
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
Expand Down Expand Up @@ -194,20 +195,22 @@ public TPipeTransferResp receive(final TPipeTransferReq req) {
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime);
return resp;
return recordConfigNodeHandshakeIfSuccess(resp, req);
case HANDSHAKE_CONFIGNODE_V2:
resp =
handleTransferHandshakeV2(
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
if (Objects.nonNull(userEntity)) {
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
}
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime);
return resp;
return recordConfigNodeHandshakeIfSuccess(resp, req);
case TRANSFER_CONFIG_PLAN:
resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigPlanTimer(System.nanoTime() - startTime);
return resp;
return recordConfigNodeTransferIfSuccess(resp);
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
resp =
handleTransferFilePiece(
Expand All @@ -216,14 +219,14 @@ public TPipeTransferResp receive(final TPipeTransferReq req) {
false);
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime);
return resp;
return recordConfigNodeTransferIfSuccess(resp);
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
resp =
handleTransferFileSealV2(
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime);
return resp;
return recordConfigNodeTransferIfSuccess(resp);
case TRANSFER_COMPRESSED:
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
default:
Expand Down Expand Up @@ -262,6 +265,32 @@ private boolean needHandshake(final PipeRequestType type) {
&& type != PipeRequestType.HANDSHAKE_CONFIGNODE_V2;
}

private TPipeTransferResp recordConfigNodeHandshakeIfSuccess(
final TPipeTransferResp resp, final TPipeTransferReq req) {
if (isSuccess(resp)) {
recordPipeReceiverHandshake(
PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE,
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
getProtocol(req));
}
return resp;
}

private TPipeTransferResp recordConfigNodeTransferIfSuccess(final TPipeTransferResp resp) {
if (isSuccess(resp)) {
recordPipeReceiverTransfer();
} else {
recordPipeReceiverRequest();
}
return resp;
}

private static String getProtocol(final TPipeTransferReq req) {
return req instanceof AirGapPseudoTPipeTransferRequest
? PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP
: PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT;
}

private TPipeTransferResp handleTransferConfigPlan(final PipeTransferConfigPlanReq req)
throws IOException {
return new TPipeTransferResp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
Boolean.toString(shouldMarkAsPipeRequest));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, Boolean.toString(skipIfNoPrivileges));
appendPipeInfoToHandshakeParams(params);

return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
Expand Down
Loading
Loading