fix: Add scope limit for querynode DeleteRequest (#29474)

See also #27515

When the delegator processes delete data, it forwards the delete with only the
segment id specified. If two segments share the same segment id but one is
growing and the other is sealed, the delete is applied to both segments, which
puts the delete data out of order when a concurrent load segment occurs. A
minimal sketch of this ambiguity follows the changed-files summary below.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-12-26 14:28:47 +08:00 committed by GitHub
parent 6cbf9c489d
commit 02bc0d0dd5
4 changed files with 27 additions and 5 deletions
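
For illustration, a minimal, self-contained sketch of the ambiguity described in the commit message, using simplified stand-in types rather than the actual segment manager API: when a growing and a sealed segment share an ID, a lookup keyed only by segment ID matches both copies, so a forwarded delete lands on both.

package main

import "fmt"

// SegmentType distinguishes in-memory growing segments from flushed sealed ones.
type SegmentType int

const (
	SegmentTypeGrowing SegmentType = iota
	SegmentTypeSealed
)

// segment is a stand-in with only the fields needed for this illustration.
type segment struct {
	ID   int64
	Type SegmentType
}

// getByID mimics a lookup that filters on segment ID only.
func getByID(all []segment, id int64) []segment {
	var out []segment
	for _, s := range all {
		if s.ID == id {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	// During a concurrent load, segment 100 can briefly exist both as the
	// original growing copy and as the freshly loaded sealed copy.
	all := []segment{
		{ID: 100, Type: SegmentTypeGrowing},
		{ID: 100, Type: SegmentTypeSealed},
	}
	// A delete that names only the segment ID matches both copies,
	// so it is applied twice and can interleave out of order.
	fmt.Println(len(getByID(all, 100))) // 2
}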


@@ -641,6 +641,7 @@ message DeleteRequest {
  int64 segment_id = 5;
  schema.IDs primary_keys = 6;
  repeated uint64 timestamps = 7;
+ DataScope scope = 8;
}

message ActivateCheckerRequest {
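
With the new proto field, a hedged sketch of how a caller might populate a scoped delete request. The import path for the generated Go package and the buildScopedDelete helper are assumptions for illustration; only the fields touched by this change are set.

package main

import (
	"github.com/milvus-io/milvus/internal/proto/querypb" // assumed import path for the generated code
)

// buildScopedDelete is a hypothetical helper showing the request shape;
// other DeleteRequest fields (base, collection, channel, primary keys)
// are omitted for brevity.
func buildScopedDelete(segmentID int64, timestamps []uint64, scope querypb.DataScope) *querypb.DeleteRequest {
	return &querypb.DeleteRequest{
		SegmentId:  segmentID,
		Timestamps: timestamps,
		// Scope tells the query node whether this delete targets the sealed
		// (Historical) or growing (Streaming) copy of the segment.
		Scope: scope,
	}
}

func main() {
	_ = buildScopedDelete(100, []uint64{42}, querypb.DataScope_Historical)
}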


@@ -214,7 +214,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
				// delete will be processed after loaded again
				return nil
			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...)
			return nil
		})
	}
@@ -229,7 +229,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
				// panic here, local worker shall not have error
				panic(err)
			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...)
			return nil
		})
	}
@@ -249,7 +249,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
}

// applyDelete handles delete record and apply them to corresponding workers.
-func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry) []int64 {
+func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
	var offlineSegments []int64
	log := sd.getLogger(ctx)
	for _, segmentEntry := range entries {
@@ -273,6 +273,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
			SegmentId:    segmentEntry.SegmentID,
			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
			Timestamps:   delRecord.Timestamps,
+			Scope:        scope,
		})
		if errors.Is(err, merr.ErrNodeNotFound) {
			log.Warn("try to delete data on non-exist node")


@@ -1424,6 +1424,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
		zap.Int64("collectionID", req.GetCollectionId()),
		zap.String("channel", req.GetVchannelName()),
		zap.Int64("segmentID", req.GetSegmentId()),
+		zap.String("scope", req.GetScope().String()),
	)

	// check node healthy
@@ -1443,7 +1444,19 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
		zap.Uint64s("tss", req.GetTimestamps()),
	)

-	segments := node.manager.Segment.GetBy(segments.WithID(req.GetSegmentId()))
+	filters := []segments.SegmentFilter{
+		segments.WithID(req.GetSegmentId()),
+	}
+
+	// do not add filter for Unknown & All scope, for backward cap
+	switch req.GetScope() {
+	case querypb.DataScope_Historical:
+		filters = append(filters, segments.WithType(segments.SegmentTypeSealed))
+	case querypb.DataScope_Streaming:
+		filters = append(filters, segments.WithType(segments.SegmentTypeGrowing))
+	}
+
+	segments := node.manager.Segment.GetBy(filters...)
	if len(segments) == 0 {
		err := merr.WrapErrSegmentNotFound(req.GetSegmentId())
		log.Warn("segment not found for delete")


@@ -1968,6 +1968,7 @@ func (suite *ServiceSuite) TestDelete_Int64() {
		SegmentId:    suite.validSegmentIDs[0],
		VchannelName: suite.vchannel,
		Timestamps:   []uint64{0},
+		Scope:        querypb.DataScope_Historical,
	}

	// type int
@@ -2041,9 +2042,15 @@ func (suite *ServiceSuite) TestDelete_Failed() {
		},
	}

+	// segment not found
+	req.Scope = querypb.DataScope_Streaming
+	status, err := suite.node.Delete(ctx, req)
+	suite.NoError(err)
+	suite.False(merr.Ok(status))
+
	// target not match
	req.Base.TargetID = -1
-	status, err := suite.node.Delete(ctx, req)
+	status, err = suite.node.Delete(ctx, req)
	suite.NoError(err)
	suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())