diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 2872a3a2d5..bfe5b68ac6 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -549,6 +549,8 @@ func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkora if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID { segmentPks, segmentTss := segment.DeleteRecords() batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + bfHitRowInL0 := int64(0) + start := time.Now() for idx := 0; idx < len(segmentPks); idx += batchSize { endIdx := idx + batchSize if endIdx > len(segmentPks) { @@ -559,12 +561,22 @@ func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkora hits := candidate.BatchPkExist(lc) for i, hit := range hits { if hit { + bfHitRowInL0 += 1 if err := fn(segmentPks[idx+i], segmentTss[idx+i]); err != nil { return err } } } } + + log.Info("forward delete to worker...", + zap.Int64("L0SegmentID", segment.ID()), + zap.Int64("segmentID", candidate.ID()), + zap.String("channel", segment.LoadInfo().GetInsertChannel()), + zap.Int("totalDeleteRowsInL0", len(segmentPks)), + zap.Int64("bfHitRowsInL0", bfHitRowInL0), + zap.Int64("bfCost", time.Since(start).Milliseconds()), + ) } } return nil @@ -658,6 +670,8 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, // list buffered delete deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp()) tsHitDeleteRows := int64(0) + bfHitDeleteRows := int64(0) + start := time.Now() for _, entry := range deleteRecords { for _, record := range entry.Data { tsHitDeleteRows += int64(len(record.DeleteData.Pks)) @@ -676,6 +690,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, hits := candidate.BatchPkExist(lc) for i, hit := range hits { if hit { + bfHitDeleteRows += 1 err := bufferedForwarder.Buffer(pks[idx+i], record.DeleteData.Tss[idx+i]) if err != nil { return err @@ -685,6 +700,14 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, } } } + log.Info("forward delete to worker...", + zap.String("channel", info.InsertChannel), + zap.Int64("segmentID", info.GetSegmentID()), + zap.Time("startPosition", tsoutil.PhysicalTime(info.GetStartPosition().GetTimestamp())), + zap.Int64("tsHitDeleteRowNum", tsHitDeleteRows), + zap.Int64("bfHitDeleteRowNum", bfHitDeleteRows), + zap.Int64("bfCost", time.Since(start).Milliseconds()), + ) err := bufferedForwarder.Flush() if err != nil { return err @@ -992,16 +1015,20 @@ func (sd *shardDelegator) SyncTargetVersion( sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs) start := time.Now() sizeBeforeClean, _ := sd.deleteBuffer.Size() + l0NumBeforeClean := len(sd.deleteBuffer.ListL0()) sd.deleteBuffer.UnRegister(deleteSeekPos.GetTimestamp()) sizeAfterClean, _ := sd.deleteBuffer.Size() + l0NumAfterClean := len(sd.deleteBuffer.ListL0()) - if sizeAfterClean < sizeBeforeClean { + if sizeAfterClean < sizeBeforeClean || l0NumAfterClean < l0NumBeforeClean { log.Info("clean delete buffer", zap.String("channel", sd.vchannelName), zap.Time("deleteSeekPos", tsoutil.PhysicalTime(deleteSeekPos.GetTimestamp())), zap.Time("channelCP", tsoutil.PhysicalTime(checkpoint.GetTimestamp())), zap.Int64("sizeBeforeClean", sizeBeforeClean), zap.Int64("sizeAfterClean", sizeAfterClean), + zap.Int("l0NumBeforeClean", l0NumBeforeClean), + zap.Int("l0NumAfterClean", l0NumAfterClean), zap.Duration("cost", time.Since(start)), ) } diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go index ed8bc3dedd..a7f1ebb3be 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go @@ -22,8 +22,11 @@ import ( "sync" "github.com/cockroachdb/errors" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) var errBufferFull = errors.New("buffer full") @@ -82,6 +85,10 @@ func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) { for _, seg := range segmentList { if seg != nil { c.l0Segments = append(c.l0Segments, seg) + log.Info("register l0 from delete buffer", + zap.Int64("segmentID", seg.ID()), + zap.Time("startPosition", tsoutil.PhysicalTime(seg.StartPosition().GetTimestamp())), + ) } } } @@ -100,6 +107,11 @@ func (c *doubleCacheBuffer[T]) UnRegister(ts uint64) { for _, s := range c.l0Segments { if s.StartPosition().GetTimestamp() < ts { s.Release(context.TODO()) + log.Info("unregister l0 from delete buffer", + zap.Int64("segmentID", s.ID()), + zap.Time("startPosition", tsoutil.PhysicalTime(s.StartPosition().GetTimestamp())), + zap.Time("cleanTs", tsoutil.PhysicalTime(ts)), + ) continue } newSegments = append(newSegments, s) diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go index eb3ccd70b7..c5b483ff6e 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go @@ -158,7 +158,11 @@ func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() { }) seg3 := segments.NewMockSegment(s.T()) + seg3.On("ID").Return(int64(3)) seg3.On("Release", mock.Anything).Return() + seg3.On("StartPosition").Return(&msgpb.MsgPosition{ + Timestamp: 30, + }) // Test RegisterL0 with multiple segments buffer.RegisterL0(seg1, seg2) @@ -177,7 +181,10 @@ func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() { s.Equal(0, len(emptyBuffer.ListL0())) // Test UnRegister - buffer.UnRegister(15) + buffer.UnRegister(seg1.StartPosition().GetTimestamp()) + segments = buffer.ListL0() + s.Equal(2, len(segments)) + buffer.UnRegister(seg1.StartPosition().GetTimestamp() + 1) segments = buffer.ListL0() s.Equal(1, len(segments)) s.Equal(int64(2), segments[0].ID()) diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go index 30aa251f79..2ddea74a22 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go @@ -21,9 +21,12 @@ import ( "sync" "github.com/cockroachdb/errors" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] { @@ -65,6 +68,10 @@ func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) { for _, seg := range segmentList { if seg != nil { b.l0Segments = append(b.l0Segments, seg) + log.Info("register l0 from delete buffer", + zap.Int64("segmentID", seg.ID()), + zap.Time("startPosition", tsoutil.PhysicalTime(seg.StartPosition().GetTimestamp())), + ) } } @@ -83,10 +90,15 @@ func (b *listDeleteBuffer[T]) UnRegister(ts uint64) { var newSegments []segments.Segment for _, s := range b.l0Segments { - if s.StartPosition().GetTimestamp() > ts { + if s.StartPosition().GetTimestamp() >= ts { newSegments = append(newSegments, s) } else { s.Release(context.TODO()) + log.Info("unregister l0 from delete buffer", + zap.Int64("segmentID", s.ID()), + zap.Time("startPosition", tsoutil.PhysicalTime(s.StartPosition().GetTimestamp())), + zap.Time("cleanTs", tsoutil.PhysicalTime(ts)), + ) } } b.l0Segments = newSegments diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go index a542f24e26..40c4ab7553 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go @@ -148,7 +148,11 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() { }) seg3 := segments.NewMockSegment(s.T()) + seg3.On("ID").Return(int64(3)) seg3.On("Release", mock.Anything).Return() + seg3.On("StartPosition").Return(&msgpb.MsgPosition{ + Timestamp: 30, + }) // Test RegisterL0 with multiple segments buffer.RegisterL0(seg1, seg2) @@ -167,7 +171,10 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() { s.Equal(0, len(emptyBuffer.ListL0())) // Test UnRegister - buffer.UnRegister(15) + buffer.UnRegister(seg1.StartPosition().GetTimestamp()) + segments = buffer.ListL0() + s.Equal(2, len(segments)) + buffer.UnRegister(seg1.StartPosition().GetTimestamp() + 1) segments = buffer.ListL0() s.Equal(1, len(segments)) s.Equal(int64(2), segments[0].ID()) diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index fbd150c5a9..f6c3dd59de 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -418,6 +418,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() { s.Require().NoError(err) s.delegator.deleteBuffer.RegisterL0(l0Segment) + seg.EXPECT().ID().Return(1) seg.EXPECT().Partition().Return(100) seg.EXPECT().BatchPkExist(mock.Anything).Return(lo.RepeatBy(n, func(i int) bool { return true })) seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 7b5fa3761d..149f703b7c 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1304,7 +1304,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi }) }) case querypb.SyncType_UpdateVersion: - log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()), zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs())) + log.Info("sync action", + zap.Int64("TargetVersion", action.GetTargetVersion()), + zap.Time("checkPoint", tsoutil.PhysicalTime(action.GetCheckpoint().GetTimestamp())), + zap.Time("deleteCP", tsoutil.PhysicalTime(action.GetDeleteCP().GetTimestamp())), + zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs())) droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) { if action.GetCheckpoint() == nil { return id, typeutil.MaxTimestamp