diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 71d9d9b25a..c7045f5025 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -323,19 +323,22 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm } deletedPks, deletedTss := sd.segmentManager.GetL0DeleteRecords() - for _, segment := range loaded { - err = segment.Delete(deletedPks, deletedTss) - if err != nil { - log.Warn("failed to forward L0 deletions to growing segment", - zap.Int64("segmentID", segment.ID()), - zap.Error(err), - ) + if len(deletedPks) > 0 { + log.Info("forwarding L0 delete records...", zap.Int("deleteNum", len(deletedPks))) + for _, segment := range loaded { + err = segment.Delete(deletedPks, deletedTss) + if err != nil { + log.Warn("failed to forward L0 deletions to growing segment", + zap.Int64("segmentID", segment.ID()), + zap.Error(err), + ) - // clear loaded growing segments - for _, segment := range loaded { - segment.Release() + // clear loaded growing segments + for _, segment := range loaded { + segment.Release() + } + return err } - return err } } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 7d6eff94b4..fcbe4faf49 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -501,6 +501,62 @@ func (s *DelegatorDataSuite) TestLoadSegments() { }, sealed[0].Segments) }) + s.Run("load_segments_with_l0_delete_failed", func() { + defer func() { + s.workerManager.ExpectedCalls = nil + s.loader.ExpectedCalls = nil + }() + + mockMgr := segments.NewMockSegmentManager(s.T()) + delegator, err := NewShardDelegator( + context.Background(), + s.collectionID, + s.replicaID, + s.vchannelName, + s.version, + s.workerManager, + &segments.Manager{ + Collection: s.manager.Collection, + Segment: mockMgr, + }, + s.tsafeManager, + s.loader, + &msgstream.MockMqFactory{ + NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { + return s.mq, nil + }, + }, 10000) + s.NoError(err) + + growing0 := segments.NewMockSegment(s.T()) + growing1 := segments.NewMockSegment(s.T()) + growing1.EXPECT().ID().Return(2) + growing0.EXPECT().Release() + growing1.EXPECT().Release() + + mockErr := merr.WrapErrServiceInternal("mock") + + growing0.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil) + growing1.EXPECT().Delete(mock.Anything, mock.Anything).Return(mockErr) + + s.loader.EXPECT().Load( + mock.Anything, + mock.Anything, + segments.SegmentTypeGrowing, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]segments.Segment{growing0, growing1}, nil) + + mockMgr.EXPECT().GetL0DeleteRecords().Return( + []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, + []uint64{100}, + ) + + err = delegator.LoadGrowing(context.Background(), []*querypb.SegmentLoadInfo{{}, {}}, 100) + s.ErrorIs(err, mockErr) + }) + s.Run("load_segments_with_streaming_delete_failed", func() { defer func() { s.workerManager.ExpectedCalls = nil diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 5c743ed329..e24eca6587 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -545,6 +545,10 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ const unsigned long* timestamps); */ + if len(primaryKeys) == 0 { + return nil + } + s.ptrLock.RLock() defer s.ptrLock.RUnlock() diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 91a470a6d3..77771a6f63 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -254,8 +254,19 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta()) collection := node.manager.Collection.Get(req.GetCollectionID()) collection.SetMetricType(req.GetLoadMeta().GetMetricType()) - delegator, err := delegator.NewShardDelegator(ctx, req.GetCollectionID(), req.GetReplicaID(), channel.GetChannelName(), req.GetVersion(), - node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp()) + delegator, err := delegator.NewShardDelegator( + ctx, + req.GetCollectionID(), + req.GetReplicaID(), + channel.GetChannelName(), + req.GetVersion(), + node.clusterManager, + node.manager, + node.tSafeManager, + node.loader, + node.factory, + channel.GetSeekPosition().GetTimestamp(), + ) if err != nil { log.Warn("failed to create shard delegator", zap.Error(err)) return merr.Status(err), nil