diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index b5615fe48e..8886111fb1 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -634,4 +634,5 @@ message DeleteRequest { int64 segment_id = 5; schema.IDs primary_keys = 6; repeated uint64 timestamps = 7; + DataScope scope = 8; } diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index bc45a0ebff..7442d449e9 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -202,7 +202,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { // delete will be processed after loaded again return nil } - offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments)...) + offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...) return nil }) } @@ -217,7 +217,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { // panic here, local worker shall not have error panic(err) } - offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing)...) + offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...) return nil }) } @@ -237,7 +237,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { } // applyDelete handles delete record and apply them to corresponding workers. -func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry) []int64 { +func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 { var offlineSegments []int64 log := sd.getLogger(ctx) for _, segmentEntry := range entries { @@ -261,6 +261,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker SegmentId: segmentEntry.SegmentID, PrimaryKeys: storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys), Timestamps: delRecord.Timestamps, + Scope: scope, }) if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("try to delete data on non-exist node") diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index ea43e57072..79a597de61 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1420,6 +1420,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( zap.Int64("collectionID", req.GetCollectionId()), zap.String("channel", req.GetVchannelName()), zap.Int64("segmentID", req.GetSegmentId()), + zap.String("scope", req.GetScope().String()), ) // check node healthy @@ -1439,7 +1440,19 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( zap.Uint64s("tss", req.GetTimestamps()), ) - segments := node.manager.Segment.GetBy(segments.WithID(req.GetSegmentId())) + filters := []segments.SegmentFilter{ + segments.WithID(req.GetSegmentId()), + } + + // do not add filter for Unknown & All scope, for backward cap + switch req.GetScope() { + case querypb.DataScope_Historical: + filters = append(filters, segments.WithType(segments.SegmentTypeSealed)) + case querypb.DataScope_Streaming: + filters = append(filters, segments.WithType(segments.SegmentTypeGrowing)) + } + + segments := node.manager.Segment.GetBy(filters...) if len(segments) == 0 { err := merr.WrapErrSegmentNotFound(req.GetSegmentId()) log.Warn("segment not found for delete") diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 891e57f4fc..420d9666e0 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1950,6 +1950,7 @@ func (suite *ServiceSuite) TestDelete_Int64() { SegmentId: suite.validSegmentIDs[0], VchannelName: suite.vchannel, Timestamps: []uint64{0}, + Scope: querypb.DataScope_Historical, } // type int @@ -2023,9 +2024,15 @@ func (suite *ServiceSuite) TestDelete_Failed() { }, } + // segment not found + req.Scope = querypb.DataScope_Streaming + status, err := suite.node.Delete(ctx, req) + suite.NoError(err) + suite.False(merr.Ok(status)) + // target not match req.Base.TargetID = -1 - status, err := suite.node.Delete(ctx, req) + status, err = suite.node.Delete(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())