diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index fadfdf9af7..1d3ad32ed3 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -61,7 +61,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { expectedSegID := seg1.ID s.Require().Equal(1, len(latestL0Segments)) - levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + s.True(needRefresh) s.Require().Equal(1, len(levelZeroView)) cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) s.True(ok) @@ -107,7 +108,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { latestL0Segments := GetViewsByInfo(levelZeroSegments...) s.Require().NotEmpty(latestL0Segments) - levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + s.Require().True(needRefresh) s.Require().Equal(1, len(levelZeroView)) cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) s.True(ok) diff --git a/internal/datacoord/compaction_view_manager.go b/internal/datacoord/compaction_view_manager.go index 3b5d70d8de..373044f77e 100644 --- a/internal/datacoord/compaction_view_manager.go +++ b/internal/datacoord/compaction_view_manager.go @@ -169,48 +169,54 @@ func (m *CompactionViewManager) Check(ctx context.Context) (events map[Compactio } func (m *CompactionViewManager) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView { - var refreshedL0Views []CompactionView + var allRefreshedL0Veiws []CompactionView for collID, segments := range latestCollSegs { levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 }) latestL0Segments := GetViewsByInfo(levelZeroSegments...) - changedL0Views := m.getChangedLevelZeroViews(collID, latestL0Segments) - if len(changedL0Views) == 0 { - continue + needRefresh, collRefreshedViews := m.getChangedLevelZeroViews(collID, latestL0Segments) + if needRefresh { + log.Info("Refresh compaction level zero views", + zap.Int64("collectionID", collID), + zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string { + return view.String() + }))) + + m.view.collections[collID] = latestL0Segments } - log.Info("Refresh compaction level zero views", - zap.Int64("collectionID", collID), - zap.Strings("views", lo.Map(changedL0Views, func(view CompactionView, _ int) string { - return view.String() - }))) - - m.view.collections[collID] = latestL0Segments - refreshedL0Views = append(refreshedL0Views, changedL0Views...) + if len(collRefreshedViews) > 0 { + allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...) + } } - return refreshedL0Views + return allRefreshedL0Veiws } -func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) []CompactionView { - latestViews := m.groupL0ViewsByPartChan(collID, LevelZeroViews) +func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) { cachedViews := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { return v.Level == datapb.SegmentLevel_L0 }) - var signals []CompactionView + if len(LevelZeroViews) == 0 && len(cachedViews) != 0 { + needRefresh = true + return + } + + latestViews := m.groupL0ViewsByPartChan(collID, LevelZeroViews) for _, latestView := range latestViews { views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool { return v.label.Equal(latestView.GetGroupLabel()) }) if !latestView.Equal(views) { - signals = append(signals, latestView) + refreshed = append(refreshed, latestView) + needRefresh = true } } - return signals + return } func (m *CompactionViewManager) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView { diff --git a/internal/datacoord/compaction_view_manager_test.go b/internal/datacoord/compaction_view_manager_test.go index f3fac3d1fc..c9709947ac 100644 --- a/internal/datacoord/compaction_view_manager_test.go +++ b/internal/datacoord/compaction_view_manager_test.go @@ -233,7 +233,7 @@ func (s *CompactionViewManagerSuite) TestNotifyTrigger() { func (s *CompactionViewManagerSuite) TestCheck() { // nothing in the view before the test ctx := context.Background() - s.Empty(s.m.view.collections) + s.Require().Empty(s.m.view.collections) events := s.m.Check(ctx) s.m.viewGuard.Lock() @@ -265,6 +265,48 @@ func (s *CompactionViewManagerSuite) TestCheck() { emptyEvents = s.m.Check(ctx) s.Empty(emptyEvents) s.Empty(s.m.view.collections) + + s.Run("check collection for zero l0 segments", func() { + s.SetupTest() + ctx := context.Background() + s.Require().Empty(s.m.view.collections) + events := s.m.Check(ctx) + + s.m.viewGuard.Lock() + views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) + s.m.viewGuard.Unlock() + s.Require().Equal(4, len(views)) + for _, view := range views { + s.EqualValues(s.testLabel, view.label) + s.Equal(datapb.SegmentLevel_L0, view.Level) + s.Equal(commonpb.SegmentState_Flushed, view.State) + log.Info("String", zap.String("segment", view.String())) + log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString())) + } + + s.NotEmpty(events) + s.Equal(1, len(events)) + refreshed, ok := events[TriggerTypeLevelZeroViewChange] + s.Require().True(ok) + s.Equal(1, len(refreshed)) + + // All l0 segments are dropped in the collection + // and there're still some L1 segments + s.m.meta.Lock() + s.m.meta.segments.segments = map[int64]*SegmentInfo{ + 2000: genTestSegmentInfo(s.testLabel, 2000, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), + 2001: genTestSegmentInfo(s.testLabel, 2001, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), + 2003: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), + 3000: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed), + } + s.m.meta.Unlock() + events = s.m.Check(ctx) + s.Empty(events) + s.m.viewGuard.Lock() + views = s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) + s.m.viewGuard.Unlock() + s.Equal(0, len(views)) + }) } func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {