From 14d8b1fe856deee85efc63eb1e5ddc81ff992e3a Mon Sep 17 00:00:00 2001
From: congqixia
Date: Tue, 26 Dec 2023 16:40:50 +0800
Subject: [PATCH] fix: [Cherry-pick] Add scope limit for querynode
 DeleteRequest (#29476)

Cherry-pick from master pr: #29474
See also #27515
When the Delegator processes delete data, it forwards the delete with only
the segment id specified. When two segments have the same segment id but one
is growing and the other is sealed, the delete is applied to both segments,
which causes the delete data to go out of order when a concurrent load
segment occurs.

Signed-off-by: Congqi Xia
---
 internal/proto/query_coord.proto                 |  1 +
 internal/querynodev2/delegator/delegator_data.go |  7 ++++---
 internal/querynodev2/services.go                 | 15 ++++++++++++++-
 internal/querynodev2/services_test.go            |  9 ++++++++-
 4 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index b5615fe48e..8886111fb1 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -634,4 +634,5 @@ message DeleteRequest {
     int64 segment_id = 5;
     schema.IDs primary_keys = 6;
     repeated uint64 timestamps = 7;
+    DataScope scope = 8;
 }
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index bc45a0ebff..7442d449e9 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -202,7 +202,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 				// delete will be processed after loaded again
 				return nil
 			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...)
 			return nil
 		})
 	}
@@ -217,7 +217,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 				// panic here, local worker shall not have error
 				panic(err)
 			}
-			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing)...)
+			offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...)
 			return nil
 		})
 	}
@@ -237,7 +237,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 }
 
 // applyDelete handles delete record and apply them to corresponding workers.
-func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry) []int64 {
+func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
 	var offlineSegments []int64
 	log := sd.getLogger(ctx)
 	for _, segmentEntry := range entries {
@@ -261,6 +261,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
 			SegmentId:    segmentEntry.SegmentID,
 			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
 			Timestamps:   delRecord.Timestamps,
+			Scope:        scope,
 		})
 		if errors.Is(err, merr.ErrNodeNotFound) {
 			log.Warn("try to delete data on non-exist node")
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index ea43e57072..79a597de61 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1420,6 +1420,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		zap.Int64("collectionID", req.GetCollectionId()),
 		zap.String("channel", req.GetVchannelName()),
 		zap.Int64("segmentID", req.GetSegmentId()),
+		zap.String("scope", req.GetScope().String()),
 	)
 
 	// check node healthy
@@ -1439,7 +1440,19 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		zap.Uint64s("tss", req.GetTimestamps()),
 	)
 
-	segments := node.manager.Segment.GetBy(segments.WithID(req.GetSegmentId()))
+	filters := []segments.SegmentFilter{
+		segments.WithID(req.GetSegmentId()),
+	}
+
+	// do not add filter for Unknown & All scope, for backward cap
+	switch req.GetScope() {
+	case querypb.DataScope_Historical:
+		filters = append(filters, segments.WithType(segments.SegmentTypeSealed))
+	case querypb.DataScope_Streaming:
+		filters = append(filters, segments.WithType(segments.SegmentTypeGrowing))
+	}
+
+	segments := node.manager.Segment.GetBy(filters...)
 	if len(segments) == 0 {
 		err := merr.WrapErrSegmentNotFound(req.GetSegmentId())
 		log.Warn("segment not found for delete")
diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go
index 891e57f4fc..420d9666e0 100644
--- a/internal/querynodev2/services_test.go
+++ b/internal/querynodev2/services_test.go
@@ -1950,6 +1950,7 @@ func (suite *ServiceSuite) TestDelete_Int64() {
 		SegmentId:    suite.validSegmentIDs[0],
 		VchannelName: suite.vchannel,
 		Timestamps:   []uint64{0},
+		Scope:        querypb.DataScope_Historical,
 	}
 
 	// type int
@@ -2023,9 +2024,15 @@ func (suite *ServiceSuite) TestDelete_Failed() {
 		},
 	}
 
+	// segment not found
+	req.Scope = querypb.DataScope_Streaming
+	status, err := suite.node.Delete(ctx, req)
+	suite.NoError(err)
+	suite.False(merr.Ok(status))
+
 	// target not match
 	req.Base.TargetID = -1
-	status, err := suite.node.Delete(ctx, req)
+	status, err = suite.node.Delete(ctx, req)
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())
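
Reviewer note: the core of the change is that a DeleteRequest now carries a DataScope, and the querynode maps that scope onto a segment-type filter before looking up the target segment, so a delete forwarded for a growing segment can no longer hit a sealed segment that happens to share the same segment id. The standalone Go sketch below illustrates that filtering idea only; its types and helpers (DataScope, SegmentFilter, WithID, WithType, getBy, filtersForDelete) are simplified stand-ins written for this note, not the real querypb or segments package APIs.

package main

import "fmt"

// Illustration-only stand-ins for querypb.DataScope and the querynode segment types.
type DataScope int

const (
	DataScopeAll DataScope = iota
	DataScopeStreaming  // growing segments
	DataScopeHistorical // sealed segments
)

type SegmentType int

const (
	SegmentTypeGrowing SegmentType = iota
	SegmentTypeSealed
)

type Segment struct {
	ID   int64
	Type SegmentType
}

// SegmentFilter is a stand-in for the predicate-style filters used by the segment manager.
type SegmentFilter func(Segment) bool

func WithID(id int64) SegmentFilter {
	return func(s Segment) bool { return s.ID == id }
}

func WithType(t SegmentType) SegmentFilter {
	return func(s Segment) bool { return s.Type == t }
}

// filtersForDelete mirrors the patch's logic: always filter by segment id, and
// narrow by segment type only for the Historical/Streaming scopes; other scopes
// stay unfiltered for backward compatibility.
func filtersForDelete(segmentID int64, scope DataScope) []SegmentFilter {
	filters := []SegmentFilter{WithID(segmentID)}
	switch scope {
	case DataScopeHistorical:
		filters = append(filters, WithType(SegmentTypeSealed))
	case DataScopeStreaming:
		filters = append(filters, WithType(SegmentTypeGrowing))
	}
	return filters
}

// getBy returns the segments matching every filter, like a simplified Segment.GetBy.
func getBy(pool []Segment, filters ...SegmentFilter) []Segment {
	var out []Segment
	for _, s := range pool {
		ok := true
		for _, f := range filters {
			ok = ok && f(s)
		}
		if ok {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	// Two segments sharing id 100: one growing, one sealed.
	pool := []Segment{
		{ID: 100, Type: SegmentTypeGrowing},
		{ID: 100, Type: SegmentTypeSealed},
	}
	// A Streaming-scoped delete now matches only the growing segment.
	matched := getBy(pool, filtersForDelete(100, DataScopeStreaming)...)
	fmt.Printf("matched %d segment(s)\n", len(matched)) // prints: matched 1 segment(s)
}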