From 02bc0d0dd54b49b70ec3182966f61dfff6a54d25 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Tue, 26 Dec 2023 14:28:47 +0800
Subject: [PATCH] fix: Add scope limit for querynode DeleteRequest (#29474)

See also #27515

When the Delegator processes delete data, it forwards the delete request
with only the segment ID specified. When two segments have the same
segment ID but one is growing and the other is sealed, the delete is
applied to both segments, which causes delete data to be applied out of
order when a concurrent segment load occurs.

Signed-off-by: Congqi Xia

---
 internal/proto/query_coord.proto                 |  1 +
 internal/querynodev2/delegator/delegator_data.go |  7 ++++---
 internal/querynodev2/services.go                 | 15 ++++++++++++++-
 internal/querynodev2/services_test.go            |  9 ++++++++-
 4 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index b0ee6b573e..dee6b7112a 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -641,6 +641,7 @@ message DeleteRequest {
   int64 segment_id = 5;
   schema.IDs primary_keys = 6;
   repeated uint64 timestamps = 7;
+  DataScope scope = 8;
 }
 
 message ActivateCheckerRequest {
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 637952e4ca..0a4313b129 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -214,7 +214,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 				// delete will be processed after loaded again
 				return nil
 			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...)
 			return nil
 		})
 	}
@@ -229,7 +229,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 				// panic here, local worker shall not have error
 				panic(err)
 			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...)
 			return nil
 		})
 	}
@@ -249,7 +249,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 }
 
 // applyDelete handles delete record and apply them to corresponding workers.
-func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry) []int64 {
+func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
 	var offlineSegments []int64
 	log := sd.getLogger(ctx)
 	for _, segmentEntry := range entries {
@@ -273,6 +273,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
 				SegmentId:    segmentEntry.SegmentID,
 				PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
 				Timestamps:   delRecord.Timestamps,
+				Scope:        scope,
 			})
 			if errors.Is(err, merr.ErrNodeNotFound) {
 				log.Warn("try to delete data on non-exist node")
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 5746296757..7aab0f563d 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1424,6 +1424,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		zap.Int64("collectionID", req.GetCollectionId()),
 		zap.String("channel", req.GetVchannelName()),
 		zap.Int64("segmentID", req.GetSegmentId()),
+		zap.String("scope", req.GetScope().String()),
 	)
 
 	// check node healthy
@@ -1443,7 +1444,19 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		zap.Uint64s("tss", req.GetTimestamps()),
 	)
 
-	segments := node.manager.Segment.GetBy(segments.WithID(req.GetSegmentId()))
+	filters := []segments.SegmentFilter{
+		segments.WithID(req.GetSegmentId()),
+	}
+
+	// do not add a type filter for Unknown & All scope, for backward compatibility
+	switch req.GetScope() {
+	case querypb.DataScope_Historical:
+		filters = append(filters, segments.WithType(segments.SegmentTypeSealed))
+	case querypb.DataScope_Streaming:
+		filters = append(filters, segments.WithType(segments.SegmentTypeGrowing))
+	}
+
+	segments := node.manager.Segment.GetBy(filters...)
 	if len(segments) == 0 {
 		err := merr.WrapErrSegmentNotFound(req.GetSegmentId())
 		log.Warn("segment not found for delete")
diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go
index 18cf93af0f..2084f8374f 100644
--- a/internal/querynodev2/services_test.go
+++ b/internal/querynodev2/services_test.go
@@ -1968,6 +1968,7 @@ func (suite *ServiceSuite) TestDelete_Int64() {
 		SegmentId:    suite.validSegmentIDs[0],
 		VchannelName: suite.vchannel,
 		Timestamps:   []uint64{0},
+		Scope:        querypb.DataScope_Historical,
 	}
 
 	// type int
@@ -2041,9 +2042,15 @@ func (suite *ServiceSuite) TestDelete_Failed() {
 		},
 	}
 
+	// segment not found
+	req.Scope = querypb.DataScope_Streaming
+	status, err := suite.node.Delete(ctx, req)
+	suite.NoError(err)
+	suite.False(merr.Ok(status))
+
 	// target not match
 	req.Base.TargetID = -1
-	status, err := suite.node.Delete(ctx, req)
+	status, err = suite.node.Delete(ctx, req)
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())
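
For context beyond the diff itself, the standalone Go sketch below illustrates the scope-to-segment-type mapping this patch introduces: a Streaming delete only reaches growing segments and a Historical delete only reaches sealed ones, while an unspecified scope keeps the old behavior of matching every segment with the given ID. All names here (segment, dataScope, filterByScope, and the constants) are simplified stand-ins invented for illustration; they are not the real segments or querypb APIs.

package main

import "fmt"

type segmentType int

const (
	segmentTypeGrowing segmentType = iota
	segmentTypeSealed
)

type dataScope int

const (
	scopeAll dataScope = iota // unspecified scope: no type filter (backward-compatible path)
	scopeStreaming
	scopeHistorical
)

type segment struct {
	id  int64
	typ segmentType
}

// filterByScope keeps only the segments a delete with the given scope may touch,
// mirroring the switch added to QueryNode.Delete in this patch.
func filterByScope(all []segment, id int64, scope dataScope) []segment {
	var out []segment
	for _, s := range all {
		if s.id != id {
			continue
		}
		switch scope {
		case scopeHistorical:
			if s.typ != segmentTypeSealed {
				continue
			}
		case scopeStreaming:
			if s.typ != segmentTypeGrowing {
				continue
			}
		}
		out = append(out, s)
	}
	return out
}

func main() {
	// Two segments share ID 100: one growing, one sealed, as in the
	// concurrent-load case described in the commit message.
	segs := []segment{
		{id: 100, typ: segmentTypeGrowing},
		{id: 100, typ: segmentTypeSealed},
	}
	fmt.Println(len(filterByScope(segs, 100, scopeStreaming)))  // 1: only the growing segment
	fmt.Println(len(filterByScope(segs, 100, scopeHistorical))) // 1: only the sealed segment
	fmt.Println(len(filterByScope(segs, 100, scopeAll)))        // 2: no type filter, old behavior
}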