mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Accidently exit the check loop (#34481)
See also: #34460 Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
f7898a3ad6
commit
467113deab
@ -2,6 +2,7 @@ package datacoord
|
||||
|
||||
import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
@ -12,13 +13,14 @@ type l0CompactionPolicy struct {
|
||||
meta *meta
|
||||
view *FullViews
|
||||
|
||||
emptyLoopCount int
|
||||
emptyLoopCount *atomic.Int64
|
||||
}
|
||||
|
||||
func newL0CompactionPolicy(meta *meta, view *FullViews) *l0CompactionPolicy {
|
||||
return &l0CompactionPolicy{
|
||||
meta: meta,
|
||||
view: view,
|
||||
meta: meta,
|
||||
view: view,
|
||||
emptyLoopCount: atomic.NewInt64(0),
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,11 +33,16 @@ func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]Compact
|
||||
events := policy.generateEventForLevelZeroViewChange()
|
||||
if len(events) != 0 {
|
||||
// each time when triggers a compaction, the idleTicker would reset
|
||||
policy.emptyLoopCount = 0
|
||||
policy.emptyLoopCount.Store(0)
|
||||
return events, nil
|
||||
}
|
||||
if policy.emptyLoopCount >= 3 {
|
||||
policy.emptyLoopCount.Inc()
|
||||
|
||||
if policy.emptyLoopCount.Load() >= 3 {
|
||||
idleEvents := policy.generateEventForLevelZeroViewIDLE()
|
||||
if len(idleEvents) > 0 {
|
||||
policy.emptyLoopCount.Store(0)
|
||||
}
|
||||
return idleEvents, nil
|
||||
}
|
||||
return make(map[CompactionTriggerType][]CompactionView, 0), nil
|
||||
@ -129,13 +136,13 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
|
||||
}
|
||||
|
||||
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
|
||||
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
|
||||
events := make(map[CompactionTriggerType][]CompactionView, 0)
|
||||
for collID := range policy.view.collections {
|
||||
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
|
||||
return v.Level == datapb.SegmentLevel_L0
|
||||
})
|
||||
if len(cachedViews) > 0 {
|
||||
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
|
||||
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
|
||||
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
|
||||
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
|
||||
|
||||
@ -19,18 +19,19 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
func TestCompactionViewManagerSuite(t *testing.T) {
|
||||
suite.Run(t, new(CompactionViewManagerSuite))
|
||||
func TestL0CompactionPolicySuite(t *testing.T) {
|
||||
suite.Run(t, new(L0CompactionPolicySuite))
|
||||
}
|
||||
|
||||
type CompactionViewManagerSuite struct {
|
||||
type L0CompactionPolicySuite struct {
|
||||
suite.Suite
|
||||
|
||||
mockAlloc *NMockAllocator
|
||||
@ -39,11 +40,109 @@ type CompactionViewManagerSuite struct {
|
||||
handler Handler
|
||||
mockPlanContext *MockCompactionPlanContext
|
||||
|
||||
m *CompactionTriggerManager
|
||||
l0_policy *l0CompactionPolicy
|
||||
}
|
||||
|
||||
const MB = 1024 * 1024
|
||||
|
||||
func (s *L0CompactionPolicySuite) TestTrigger() {
|
||||
s.Require().Empty(s.l0_policy.view.collections)
|
||||
|
||||
events, err := s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
gotViews, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
|
||||
cView := gotViews[0]
|
||||
s.Equal(s.testLabel, cView.GetGroupLabel())
|
||||
s.Equal(4, len(cView.GetSegmentsView()))
|
||||
for _, view := range cView.GetSegmentsView() {
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
log.Info("cView", zap.String("string", cView.String()))
|
||||
|
||||
// Test for idle trigger
|
||||
for i := 0; i < 2; i++ {
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.Equal(0, len(events))
|
||||
}
|
||||
s.EqualValues(2, s.l0_policy.emptyLoopCount.Load())
|
||||
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.EqualValues(0, s.l0_policy.emptyLoopCount.Load())
|
||||
s.Equal(1, len(events))
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
cView = gotViews[0]
|
||||
s.Equal(s.testLabel, cView.GetGroupLabel())
|
||||
s.Equal(4, len(cView.GetSegmentsView()))
|
||||
for _, view := range cView.GetSegmentsView() {
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
log.Info("cView", zap.String("string", cView.String()))
|
||||
|
||||
segArgs := []struct {
|
||||
ID UniqueID
|
||||
PosT Timestamp
|
||||
|
||||
LogSize int64
|
||||
LogCount int
|
||||
}{
|
||||
{500, 10000, 4 * MB, 1},
|
||||
{501, 10000, 4 * MB, 1},
|
||||
{502, 10000, 4 * MB, 1},
|
||||
{503, 50000, 4 * MB, 1},
|
||||
}
|
||||
|
||||
segments := make(map[int64]*SegmentInfo)
|
||||
for _, arg := range segArgs {
|
||||
info := genTestSegmentInfo(s.testLabel, arg.ID, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed)
|
||||
info.Deltalogs = genTestDeltalogs(arg.LogCount, arg.LogSize)
|
||||
info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
|
||||
segments[arg.ID] = info
|
||||
}
|
||||
meta := &meta{segments: NewSegmentsInfo()}
|
||||
for id, segment := range segments {
|
||||
meta.segments.SetSegment(id, segment)
|
||||
}
|
||||
s.l0_policy.meta = meta
|
||||
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
s.Equal(1, len(gotViews))
|
||||
}
|
||||
|
||||
func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() {
|
||||
s.Require().Empty(s.l0_policy.view.collections)
|
||||
|
||||
events := s.l0_policy.generateEventForLevelZeroViewChange()
|
||||
s.NotEmpty(events)
|
||||
s.NotEmpty(s.l0_policy.view.collections)
|
||||
|
||||
gotViews, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
|
||||
storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID]
|
||||
s.True(ok)
|
||||
s.NotNil(storedViews)
|
||||
s.Equal(4, len(storedViews))
|
||||
|
||||
for _, view := range storedViews {
|
||||
s.Equal(s.testLabel, view.label)
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
}
|
||||
|
||||
func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
|
||||
segArgs := []struct {
|
||||
ID UniqueID
|
||||
@ -81,12 +180,7 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
|
||||
return segments
|
||||
}
|
||||
|
||||
func (s *CompactionViewManagerSuite) SetupTest() {
|
||||
s.mockAlloc = NewNMockAllocator(s.T())
|
||||
s.mockTriggerManager = NewMockTriggerManager(s.T())
|
||||
s.handler = NewNMockHandler(s.T())
|
||||
s.mockPlanContext = NewMockCompactionPlanContext(s.T())
|
||||
|
||||
func (s *L0CompactionPolicySuite) SetupTest() {
|
||||
s.testLabel = &CompactionGroupLabel{
|
||||
CollectionID: 1,
|
||||
PartitionID: 10,
|
||||
@ -99,231 +193,13 @@ func (s *CompactionViewManagerSuite) SetupTest() {
|
||||
meta.segments.SetSegment(id, segment)
|
||||
}
|
||||
|
||||
s.m = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, meta)
|
||||
views := &FullViews{
|
||||
collections: make(map[int64][]*SegmentView),
|
||||
}
|
||||
|
||||
s.l0_policy = newL0CompactionPolicy(meta, views)
|
||||
}
|
||||
|
||||
func (s *CompactionViewManagerSuite) TestCheckLoop() {
|
||||
s.Run("Test start and close", func() {
|
||||
s.m.Start()
|
||||
s.m.Close()
|
||||
})
|
||||
|
||||
s.Run("Test not enable auto compaction", func() {
|
||||
paramtable.Get().Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "false")
|
||||
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableAutoCompaction.Key)
|
||||
|
||||
s.m.Start()
|
||||
s.m.closeWg.Wait()
|
||||
})
|
||||
|
||||
s.Run("Test not enable levelZero segment", func() {
|
||||
paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "false")
|
||||
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key)
|
||||
|
||||
s.m.Start()
|
||||
s.m.closeWg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
//func (s *CompactionViewManagerSuite) TestCheckLoopIDLETicker() {
|
||||
// paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1")
|
||||
// defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key)
|
||||
// paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true")
|
||||
// defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key)
|
||||
//
|
||||
// events := s.m.Check(context.Background())
|
||||
// s.NotEmpty(events)
|
||||
// s.Require().NotEmpty(s.m.view.collections)
|
||||
//
|
||||
// notified := make(chan struct{})
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once()
|
||||
// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
|
||||
// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
|
||||
// s.Equal(TriggerTypeLevelZeroViewIDLE, tType)
|
||||
// v, ok := views[0].(*LevelZeroSegmentsView)
|
||||
// s.True(ok)
|
||||
// s.NotNil(v)
|
||||
// log.Info("All views", zap.String("l0 view", v.String()))
|
||||
//
|
||||
// notified <- struct{}{}
|
||||
// }).Once()
|
||||
//
|
||||
// s.m.Start()
|
||||
// <-notified
|
||||
// s.m.Close()
|
||||
//}
|
||||
//
|
||||
//func (s *CompactionViewManagerSuite) TestCheckLoopRefreshViews() {
|
||||
// paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1")
|
||||
// defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key)
|
||||
// paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true")
|
||||
// defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key)
|
||||
//
|
||||
// s.Require().Empty(s.m.view.collections)
|
||||
//
|
||||
// notified := make(chan struct{})
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once()
|
||||
// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
|
||||
// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
|
||||
// s.Equal(TriggerTypeLevelZeroViewChange, tType)
|
||||
// v, ok := views[0].(*LevelZeroSegmentsView)
|
||||
// s.True(ok)
|
||||
// s.NotNil(v)
|
||||
// log.Info("All views", zap.String("l0 view", v.String()))
|
||||
//
|
||||
// notified <- struct{}{}
|
||||
// }).Once()
|
||||
//
|
||||
// s.m.Start()
|
||||
// <-notified
|
||||
//
|
||||
// // clear view
|
||||
// s.m.viewGuard.Lock()
|
||||
// s.m.view.collections = make(map[int64][]*SegmentView)
|
||||
// s.m.viewGuard.Unlock()
|
||||
//
|
||||
// // clear meta
|
||||
// s.m.meta.Lock()
|
||||
// s.m.meta.segments.segments = make(map[int64]*SegmentInfo)
|
||||
// s.m.meta.Unlock()
|
||||
//
|
||||
// <-time.After(time.Second)
|
||||
// s.m.Close()
|
||||
//}
|
||||
//
|
||||
//func (s *CompactionViewManagerSuite) TestTriggerEventForIDLEView() {
|
||||
// s.Require().Empty(s.m.view.collections)
|
||||
// s.m.triggerEventForIDLEView()
|
||||
//
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once()
|
||||
// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
|
||||
// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
|
||||
// s.EqualValues(1, taskID)
|
||||
// s.Equal(TriggerTypeLevelZeroViewIDLE, tType)
|
||||
// s.Equal(1, len(views))
|
||||
// v, ok := views[0].(*LevelZeroSegmentsView)
|
||||
// s.True(ok)
|
||||
// s.NotNil(v)
|
||||
//
|
||||
// expectedSegs := []int64{100, 101, 102, 103}
|
||||
// gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID })
|
||||
// s.ElementsMatch(expectedSegs, gotSegs)
|
||||
//
|
||||
// s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp())
|
||||
// log.Info("All views", zap.String("l0 view", v.String()))
|
||||
// }).Once()
|
||||
//
|
||||
// events := s.m.Check(context.Background())
|
||||
// s.NotEmpty(events)
|
||||
// s.Require().NotEmpty(s.m.view.collections)
|
||||
// s.m.triggerEventForIDLEView()
|
||||
//}
|
||||
//
|
||||
//func (s *CompactionViewManagerSuite) TestNotifyTrigger() {
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once()
|
||||
// s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
|
||||
// Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
|
||||
// s.EqualValues(1, taskID)
|
||||
// s.Equal(TriggerTypeLevelZeroViewChange, tType)
|
||||
// s.Equal(1, len(views))
|
||||
// v, ok := views[0].(*LevelZeroSegmentsView)
|
||||
// s.True(ok)
|
||||
// s.NotNil(v)
|
||||
//
|
||||
// expectedSegs := []int64{100, 101, 102, 103}
|
||||
// gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID })
|
||||
// s.ElementsMatch(expectedSegs, gotSegs)
|
||||
//
|
||||
// s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp())
|
||||
// log.Info("All views", zap.String("l0 view", v.String()))
|
||||
// }).Once()
|
||||
//
|
||||
// ctx := context.Background()
|
||||
// s.Require().Empty(s.m.view.collections)
|
||||
// events := s.m.Check(ctx)
|
||||
//
|
||||
// s.m.notifyTrigger(ctx, events)
|
||||
//}
|
||||
//
|
||||
//func (s *CompactionViewManagerSuite) TestCheck() {
|
||||
// // nothing in the view before the test
|
||||
// ctx := context.Background()
|
||||
// s.Require().Empty(s.m.view.collections)
|
||||
// events := s.m.Check(ctx)
|
||||
//
|
||||
// s.m.viewGuard.Lock()
|
||||
// views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
|
||||
// s.m.viewGuard.Unlock()
|
||||
// s.Equal(4, len(views))
|
||||
// for _, view := range views {
|
||||
// s.EqualValues(s.testLabel, view.label)
|
||||
// s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
// s.Equal(commonpb.SegmentState_Flushed, view.State)
|
||||
// log.Info("String", zap.String("segment", view.String()))
|
||||
// log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
|
||||
// }
|
||||
//
|
||||
// s.NotEmpty(events)
|
||||
// s.Equal(1, len(events))
|
||||
// refreshed, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
// s.Require().True(ok)
|
||||
// s.Equal(1, len(refreshed))
|
||||
//
|
||||
// // same meta
|
||||
// emptyEvents := s.m.Check(ctx)
|
||||
// s.Empty(emptyEvents)
|
||||
//
|
||||
// // clear meta
|
||||
// s.m.meta.Lock()
|
||||
// s.m.meta.segments.segments = make(map[int64]*SegmentInfo)
|
||||
// s.m.meta.Unlock()
|
||||
// emptyEvents = s.m.Check(ctx)
|
||||
// s.Empty(emptyEvents)
|
||||
// s.Empty(s.m.view.collections)
|
||||
//
|
||||
// s.Run("check collection for zero l0 segments", func() {
|
||||
// s.SetupTest()
|
||||
// ctx := context.Background()
|
||||
// s.Require().Empty(s.m.view.collections)
|
||||
// events := s.m.Check(ctx)
|
||||
//
|
||||
// s.m.viewGuard.Lock()
|
||||
// views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
|
||||
// s.m.viewGuard.Unlock()
|
||||
// s.Require().Equal(4, len(views))
|
||||
// for _, view := range views {
|
||||
// s.EqualValues(s.testLabel, view.label)
|
||||
// s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
// s.Equal(commonpb.SegmentState_Flushed, view.State)
|
||||
// log.Info("String", zap.String("segment", view.String()))
|
||||
// log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
|
||||
// }
|
||||
//
|
||||
// s.NotEmpty(events)
|
||||
// s.Equal(1, len(events))
|
||||
// refreshed, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
// s.Require().True(ok)
|
||||
// s.Equal(1, len(refreshed))
|
||||
//
|
||||
// // All l0 segments are dropped in the collection
|
||||
// // and there're still some L1 segments
|
||||
// s.m.meta.Lock()
|
||||
// s.m.meta.segments.segments = map[int64]*SegmentInfo{
|
||||
// 2000: genTestSegmentInfo(s.testLabel, 2000, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
// 2001: genTestSegmentInfo(s.testLabel, 2001, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
// 2003: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L0, commonpb.SegmentState_Dropped),
|
||||
// 3000: genTestSegmentInfo(s.testLabel, 2003, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed),
|
||||
// }
|
||||
// s.m.meta.Unlock()
|
||||
// events = s.m.Check(ctx)
|
||||
// s.Empty(events)
|
||||
// s.m.viewGuard.Lock()
|
||||
// views = s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
|
||||
// s.m.viewGuard.Unlock()
|
||||
// s.Equal(0, len(views))
|
||||
// })
|
||||
//}
|
||||
|
||||
func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {
|
||||
return &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
||||
@ -106,6 +106,7 @@ func (m *CompactionTriggerManager) startLoop() {
|
||||
defer l0Ticker.Stop()
|
||||
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
|
||||
defer clusteringTicker.Stop()
|
||||
log.Info("Compaction trigger manager start")
|
||||
for {
|
||||
select {
|
||||
case <-m.closeSig:
|
||||
@ -117,11 +118,11 @@ func (m *CompactionTriggerManager) startLoop() {
|
||||
}
|
||||
if m.compactionHandler.isFull() {
|
||||
log.RatedInfo(10, "Skip trigger l0 compaction since compactionHandler is full")
|
||||
return
|
||||
continue
|
||||
}
|
||||
events, err := m.l0Policy.Trigger()
|
||||
if err != nil {
|
||||
log.Warn("Fail to trigger policy", zap.Error(err))
|
||||
log.Warn("Fail to trigger L0 policy", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
ctx := context.Background()
|
||||
@ -136,11 +137,11 @@ func (m *CompactionTriggerManager) startLoop() {
|
||||
}
|
||||
if m.compactionHandler.isFull() {
|
||||
log.RatedInfo(10, "Skip trigger l0 compaction since compactionHandler is full")
|
||||
return
|
||||
continue
|
||||
}
|
||||
events, err := m.clusteringPolicy.Trigger()
|
||||
if err != nil {
|
||||
log.Warn("Fail to trigger policy", zap.Error(err))
|
||||
log.Warn("Fail to trigger clustering policy", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user