diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 91b25ae231..1f15a40386 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -2,6 +2,7 @@ package datacoord import ( "github.com/samber/lo" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -12,13 +13,14 @@ type l0CompactionPolicy struct { meta *meta view *FullViews - emptyLoopCount int + emptyLoopCount *atomic.Int64 } func newL0CompactionPolicy(meta *meta, view *FullViews) *l0CompactionPolicy { return &l0CompactionPolicy{ - meta: meta, - view: view, + meta: meta, + view: view, + emptyLoopCount: atomic.NewInt64(0), } } @@ -31,11 +33,16 @@ func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]Compact events := policy.generateEventForLevelZeroViewChange() if len(events) != 0 { // each time when triggers a compaction, the idleTicker would reset - policy.emptyLoopCount = 0 + policy.emptyLoopCount.Store(0) return events, nil } - if policy.emptyLoopCount >= 3 { + policy.emptyLoopCount.Inc() + + if policy.emptyLoopCount.Load() >= 3 { idleEvents := policy.generateEventForLevelZeroViewIDLE() + if len(idleEvents) > 0 { + policy.emptyLoopCount.Store(0) + } return idleEvents, nil } return make(map[CompactionTriggerType][]CompactionView, 0), nil @@ -129,13 +136,13 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, } func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView { - log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") events := make(map[CompactionTriggerType][]CompactionView, 0) for collID := range policy.view.collections { cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { return v.Level == datapb.SegmentLevel_L0 }) if len(cachedViews) > 0 { + log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") grouped := policy.groupL0ViewsByPartChan(collID, cachedViews) events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped), func(l0View *LevelZeroSegmentsView, _ int) CompactionView { diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 37635e659d..29348added 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -19,18 +19,19 @@ import ( "testing" "github.com/stretchr/testify/suite" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/log" ) -func TestCompactionViewManagerSuite(t *testing.T) { - suite.Run(t, new(CompactionViewManagerSuite)) +func TestL0CompactionPolicySuite(t *testing.T) { + suite.Run(t, new(L0CompactionPolicySuite)) } -type CompactionViewManagerSuite struct { +type L0CompactionPolicySuite struct { suite.Suite mockAlloc *NMockAllocator @@ -39,11 +40,109 @@ type CompactionViewManagerSuite struct { handler Handler mockPlanContext *MockCompactionPlanContext - m *CompactionTriggerManager + l0_policy *l0CompactionPolicy } const MB = 1024 * 1024 +func (s *L0CompactionPolicySuite) TestTrigger() { + s.Require().Empty(s.l0_policy.view.collections) + + events, err := s.l0_policy.Trigger() + s.NoError(err) + gotViews, ok := events[TriggerTypeLevelZeroViewChange] + s.True(ok) + s.NotNil(gotViews) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + s.Equal(s.testLabel, cView.GetGroupLabel()) + s.Equal(4, len(cView.GetSegmentsView())) + for _, view := range cView.GetSegmentsView() { + s.Equal(datapb.SegmentLevel_L0, view.Level) + } + log.Info("cView", zap.String("string", cView.String())) + + // Test for idle trigger + for i := 0; i < 2; i++ { + events, err = s.l0_policy.Trigger() + s.NoError(err) + s.Equal(0, len(events)) + } + s.EqualValues(2, s.l0_policy.emptyLoopCount.Load()) + + events, err = s.l0_policy.Trigger() + s.NoError(err) + s.EqualValues(0, s.l0_policy.emptyLoopCount.Load()) + s.Equal(1, len(events)) + gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.NotNil(gotViews) + s.Equal(1, len(gotViews)) + cView = gotViews[0] + s.Equal(s.testLabel, cView.GetGroupLabel()) + s.Equal(4, len(cView.GetSegmentsView())) + for _, view := range cView.GetSegmentsView() { + s.Equal(datapb.SegmentLevel_L0, view.Level) + } + log.Info("cView", zap.String("string", cView.String())) + + segArgs := []struct { + ID UniqueID + PosT Timestamp + + LogSize int64 + LogCount int + }{ + {500, 10000, 4 * MB, 1}, + {501, 10000, 4 * MB, 1}, + {502, 10000, 4 * MB, 1}, + {503, 50000, 4 * MB, 1}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed) + info.Deltalogs = genTestDeltalogs(arg.LogCount, arg.LogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + segments[arg.ID] = info + } + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + + events, err = s.l0_policy.Trigger() + s.NoError(err) + gotViews, ok = events[TriggerTypeLevelZeroViewChange] + s.True(ok) + s.Equal(1, len(gotViews)) +} + +func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() { + s.Require().Empty(s.l0_policy.view.collections) + + events := s.l0_policy.generateEventForLevelZeroViewChange() + s.NotEmpty(events) + s.NotEmpty(s.l0_policy.view.collections) + + gotViews, ok := events[TriggerTypeLevelZeroViewChange] + s.True(ok) + s.NotNil(gotViews) + s.Equal(1, len(gotViews)) + + storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID] + s.True(ok) + s.NotNil(storedViews) + s.Equal(4, len(storedViews)) + + for _, view := range storedViews { + s.Equal(s.testLabel, view.label) + s.Equal(datapb.SegmentLevel_L0, view.Level) + } +} + func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { segArgs := []struct { ID UniqueID @@ -81,12 +180,7 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { return segments } -func (s *CompactionViewManagerSuite) SetupTest() { - s.mockAlloc = NewNMockAllocator(s.T()) - s.mockTriggerManager = NewMockTriggerManager(s.T()) - s.handler = NewNMockHandler(s.T()) - s.mockPlanContext = NewMockCompactionPlanContext(s.T()) - +func (s *L0CompactionPolicySuite) SetupTest() { s.testLabel = &CompactionGroupLabel{ CollectionID: 1, PartitionID: 10, @@ -99,231 +193,13 @@ func (s *CompactionViewManagerSuite) SetupTest() { meta.segments.SetSegment(id, segment) } - s.m = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, meta) + views := &FullViews{ + collections: make(map[int64][]*SegmentView), + } + + s.l0_policy = newL0CompactionPolicy(meta, views) } -func (s *CompactionViewManagerSuite) TestCheckLoop() { - s.Run("Test start and close", func() { - s.m.Start() - s.m.Close() - }) - - s.Run("Test not enable auto compaction", func() { - paramtable.Get().Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "false") - defer paramtable.Get().Reset(Params.DataCoordCfg.EnableAutoCompaction.Key) - - s.m.Start() - s.m.closeWg.Wait() - }) - - s.Run("Test not enable levelZero segment", func() { - paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "false") - defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) - - s.m.Start() - s.m.closeWg.Wait() - }) -} - -//func (s *CompactionViewManagerSuite) TestCheckLoopIDLETicker() { -// paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1") -// defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key) -// paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true") -// defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) -// -// events := s.m.Check(context.Background()) -// s.NotEmpty(events) -// s.Require().NotEmpty(s.m.view.collections) -// -// notified := make(chan struct{}) -// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() -// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). -// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { -// s.Equal(TriggerTypeLevelZeroViewIDLE, tType) -// v, ok := views[0].(*LevelZeroSegmentsView) -// s.True(ok) -// s.NotNil(v) -// log.Info("All views", zap.String("l0 view", v.String())) -// -// notified <- struct{}{} -// }).Once() -// -// s.m.Start() -// <-notified -// s.m.Close() -//} -// -//func (s *CompactionViewManagerSuite) TestCheckLoopRefreshViews() { -// paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1") -// defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key) -// paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true") -// defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) -// -// s.Require().Empty(s.m.view.collections) -// -// notified := make(chan struct{}) -// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() -// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). -// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { -// s.Equal(TriggerTypeLevelZeroViewChange, tType) -// v, ok := views[0].(*LevelZeroSegmentsView) -// s.True(ok) -// s.NotNil(v) -// log.Info("All views", zap.String("l0 view", v.String())) -// -// notified <- struct{}{} -// }).Once() -// -// s.m.Start() -// <-notified -// -// // clear view -// s.m.viewGuard.Lock() -// s.m.view.collections = make(map[int64][]*SegmentView) -// s.m.viewGuard.Unlock() -// -// // clear meta -// s.m.meta.Lock() -// s.m.meta.segments.segments = make(map[int64]*SegmentInfo) -// s.m.meta.Unlock() -// -// <-time.After(time.Second) -// s.m.Close() -//} -// -//func (s *CompactionViewManagerSuite) TestTriggerEventForIDLEView() { -// s.Require().Empty(s.m.view.collections) -// s.m.triggerEventForIDLEView() -// -// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() -// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). -// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { -// s.EqualValues(1, taskID) -// s.Equal(TriggerTypeLevelZeroViewIDLE, tType) -// s.Equal(1, len(views)) -// v, ok := views[0].(*LevelZeroSegmentsView) -// s.True(ok) -// s.NotNil(v) -// -// expectedSegs := []int64{100, 101, 102, 103} -// gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID }) -// s.ElementsMatch(expectedSegs, gotSegs) -// -// s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp()) -// log.Info("All views", zap.String("l0 view", v.String())) -// }).Once() -// -// events := s.m.Check(context.Background()) -// s.NotEmpty(events) -// s.Require().NotEmpty(s.m.view.collections) -// s.m.triggerEventForIDLEView() -//} -// -//func (s *CompactionViewManagerSuite) TestNotifyTrigger() { -// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() -// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). -// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { -// s.EqualValues(1, taskID) -// s.Equal(TriggerTypeLevelZeroViewChange, tType) -// s.Equal(1, len(views)) -// v, ok := views[0].(*LevelZeroSegmentsView) -// s.True(ok) -// s.NotNil(v) -// -// expectedSegs := []int64{100, 101, 102, 103} -// gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID }) -// s.ElementsMatch(expectedSegs, gotSegs) -// -// s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp()) -// log.Info("All views", zap.String("l0 view", v.String())) -// }).Once() -// -// ctx := context.Background() -// s.Require().Empty(s.m.view.collections) -// events := s.m.Check(ctx) -// -// s.m.notifyTrigger(ctx, events) -//} -// -//func (s *CompactionViewManagerSuite) TestCheck() { -// // nothing in the view before the test -// ctx := context.Background() -// s.Require().Empty(s.m.view.collections) -// events := s.m.Check(ctx) -// -// s.m.viewGuard.Lock() -// views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) -// s.m.viewGuard.Unlock() -// s.Equal(4, len(views)) -// for _, view := range views { -// s.EqualValues(s.testLabel, view.label) -// s.Equal(datapb.SegmentLevel_L0, view.Level) -// s.Equal(commonpb.SegmentState_Flushed, view.State) -// log.Info("String", zap.String("segment", view.String())) -// log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString())) -// } -// -// s.NotEmpty(events) -// s.Equal(1, len(events)) -// refreshed, ok := events[TriggerTypeLevelZeroViewChange] -// s.Require().True(ok) -// s.Equal(1, len(refreshed)) -// -// // same meta -// emptyEvents := s.m.Check(ctx) -// s.Empty(emptyEvents) -// -// // clear meta -// s.m.meta.Lock() -// s.m.meta.segments.segments = make(map[int64]*SegmentInfo) -// s.m.meta.Unlock() -// emptyEvents = s.m.Check(ctx) -// s.Empty(emptyEvents) -// s.Empty(s.m.view.collections) -// -// s.Run("check collection for zero l0 segments", func() { -// s.SetupTest() -// ctx := context.Background() -// s.Require().Empty(s.m.view.collections) -// events := s.m.Check(ctx) -// -// s.m.viewGuard.Lock() -// views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) -// s.m.viewGuard.Unlock() -// s.Require().Equal(4, len(views)) -// for _, view := range views { -// s.EqualValues(s.testLabel, view.label) -// s.Equal(datapb.SegmentLevel_L0, view.Level) -// s.Equal(commonpb.SegmentState_Flushed, view.State) -// log.Info("String", zap.String("segment", view.String())) -// log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString())) -// } -// -// s.NotEmpty(events) -// s.Equal(1, len(events)) -// refreshed, ok := events[TriggerTypeLevelZeroViewChange] -// s.Require().True(ok) -// s.Equal(1, len(refreshed)) -// -// // All l0 segments are dropped in the collection -// // and there're still some L1 segments -// s.m.meta.Lock() -// s.m.meta.segments.segments = map[int64]*SegmentInfo{ -// 2000: genTestSegmentInfo(s.testLabel, 2000, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), -// 2001: genTestSegmentInfo(s.testLabel, 2001, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), -// 2003: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped), -// 3000: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed), -// } -// s.m.meta.Unlock() -// events = s.m.Check(ctx) -// s.Empty(events) -// s.m.viewGuard.Lock() -// views = s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) -// s.m.viewGuard.Unlock() -// s.Equal(0, len(views)) -// }) -//} - func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo { return &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 358ab2b596..76a01509e9 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -106,6 +106,7 @@ func (m *CompactionTriggerManager) startLoop() { defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) defer clusteringTicker.Stop() + log.Info("Compaction trigger manager start") for { select { case <-m.closeSig: @@ -117,11 +118,11 @@ func (m *CompactionTriggerManager) startLoop() { } if m.compactionHandler.isFull() { log.RatedInfo(10, "Skip trigger l0 compaction since compactionHandler is full") - return + continue } events, err := m.l0Policy.Trigger() if err != nil { - log.Warn("Fail to trigger policy", zap.Error(err)) + log.Warn("Fail to trigger L0 policy", zap.Error(err)) continue } ctx := context.Background() @@ -136,11 +137,11 @@ func (m *CompactionTriggerManager) startLoop() { } if m.compactionHandler.isFull() { log.RatedInfo(10, "Skip trigger l0 compaction since compactionHandler is full") - return + continue } events, err := m.clusteringPolicy.Trigger() if err != nil { - log.Warn("Fail to trigger policy", zap.Error(err)) + log.Warn("Fail to trigger clustering policy", zap.Error(err)) continue } ctx := context.Background()