From fc0d007bd18170c35c0abe8d552b58c8616b1b9d Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 31 Jan 2024 19:03:04 +0800 Subject: [PATCH] enhance: Add `MemoryHighSyncPolicy` back to write buffer manager (#29997) See also #27675 This PR adds back MemoryHighSyncPolicy implementation. Also change MinSegmentSize & CheckInterval to configurable param item. --------- Signed-off-by: Congqi Xia --- internal/datanode/data_node.go | 6 + internal/datanode/services.go | 2 +- internal/datanode/writebuffer/manager.go | 98 ++++++++++- internal/datanode/writebuffer/manager_test.go | 52 +++++- .../datanode/writebuffer/mock_mananger.go | 152 +++++++++++++----- .../datanode/writebuffer/mock_write_buffer.go | 133 ++++++++++++--- .../datanode/writebuffer/segment_buffer.go | 9 ++ internal/datanode/writebuffer/sync_policy.go | 38 +++++ .../datanode/writebuffer/sync_policy_test.go | 44 +++++ internal/datanode/writebuffer/write_buffer.go | 50 +++++- .../datanode/writebuffer/write_buffer_test.go | 69 +++++++- pkg/util/paramtable/component_param.go | 10 ++ 12 files changed, 579 insertions(+), 84 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 2f00f8c0a4..38cab0a023 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -372,6 +372,8 @@ func (node *DataNode) Start() error { return } + node.writeBufferManager.Start() + node.stopWaiter.Add(1) go node.BackGroundGC(node.clearSignal) @@ -426,6 +428,10 @@ func (node *DataNode) Stop() error { return true }) + if node.writeBufferManager != nil { + node.writeBufferManager.Stop() + } + if node.allocator != nil { log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) node.allocator.Close() diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5499aa594c..e6f49be9c3 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -122,7 +122,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req 
*datapb.FlushSegmen log.Info("receiving FlushSegments request") - err := node.writeBufferManager.FlushSegments(ctx, req.GetChannelName(), segmentIDs) + err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), segmentIDs) if err != nil { log.Warn("failed to flush segments", zap.Error(err)) return merr.Status(err), nil diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go index cbc3f8ada2..60dc7d9343 100644 --- a/internal/datanode/writebuffer/manager.go +++ b/internal/datanode/writebuffer/manager.go @@ -3,6 +3,7 @@ package writebuffer import ( "context" "sync" + "time" "go.uber.org/zap" @@ -11,16 +12,20 @@ import ( "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // BufferManager is the interface for WriteBuffer management. type BufferManager interface { // Register adds a WriteBuffer with provided schema & options. Register(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, opts ...WriteBufferOption) error - // FlushSegments notifies writeBuffer corresponding to provided channel to flush segments. - FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error - // FlushChannel + // SealSegments notifies writeBuffer corresponding to provided channel to seal segments. + // which will cause segment start flush procedure. + SealSegments(ctx context.Context, channel string, segmentIDs []int64) error + // FlushChannel set the flushTs of the provided write buffer. FlushChannel(ctx context.Context, channel string, flushTs uint64) error // RemoveChannel removes a write buffer from manager. 
RemoveChannel(channel string) @@ -32,6 +37,11 @@ type BufferManager interface { GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) // NotifyCheckpointUpdated notify write buffer checkpoint updated to reset flushTs. NotifyCheckpointUpdated(channel string, ts uint64) + + // Start makes the background check start to work. + Start() + // Stop the background checker and wait for worker goroutine quit. + Stop() } // NewManager returns initialized manager as `Manager` @@ -39,6 +49,8 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager { return &bufferManager{ syncMgr: syncMgr, buffers: make(map[string]WriteBuffer), + + ch: lifetime.NewSafeChan(), } } @@ -46,6 +58,80 @@ type bufferManager struct { syncMgr syncmgr.SyncManager buffers map[string]WriteBuffer mut sync.RWMutex + + wg sync.WaitGroup + ch lifetime.SafeChan +} + +func (m *bufferManager) Start() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.check() + }() +} + +func (m *bufferManager) check() { + ticker := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) + defer ticker.Stop() + for { + select { + case <-ticker.C: + m.memoryCheck() + ticker.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) + case <-m.ch.CloseCh(): + log.Info("buffer manager memory check stopped") + return + } + } +} + +// memoryCheck performs check based on current memory usage & configuration. 
+func (m *bufferManager) memoryCheck() { + if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() { + return + } + + m.mut.Lock() + defer m.mut.Unlock() + + var total int64 + var candidate WriteBuffer + var candiSize int64 + var candiChan string + for chanName, buf := range m.buffers { + size := buf.MemorySize() + total += size + if size > candiSize { + candiSize = size + candidate = buf + candiChan = chanName + } + } + + toMB := func(mem float64) float64 { + return mem / 1024 / 1024 + } + + totalMemory := hardware.GetMemoryCount() + memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryWatermark.GetAsFloat() + if float64(total) < memoryWatermark { + log.RatedDebug(20, "skip force sync because memory level is not high enough", + zap.Float64("current_total_memory_usage", toMB(float64(total))), + zap.Float64("current_memory_watermark", toMB(memoryWatermark))) + return + } + + if candidate != nil { + candidate.EvictBuffer(GetOldestBufferPolicy(paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt())) + log.Info("notify writebuffer to sync", + zap.String("channel", candiChan), zap.Float64("bufferSize(MB)", toMB(float64(candiSize)))) + } +} + +func (m *bufferManager) Stop() { + m.ch.Close() + m.wg.Wait() } // Register a new WriteBuffer for channel. @@ -65,8 +151,8 @@ func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, return nil } -// FlushSegments call sync segment and change segments state to Flushed. -func (m *bufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { +// SealSegments call sync segment and change segments state to Flushed. 
+func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { m.mut.RLock() buf, ok := m.buffers[channel] m.mut.RUnlock() @@ -78,7 +164,7 @@ func (m *bufferManager) FlushSegments(ctx context.Context, channel string, segme return merr.WrapErrChannelNotFound(channel) } - return buf.FlushSegments(ctx, segmentIDs) + return buf.SealSegments(ctx, segmentIDs) } func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 89f05f25e5..4338411aa3 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -87,7 +88,7 @@ func (s *ManagerSuite) TestFlushSegments() { s.Run("channel_not_found", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := manager.FlushSegments(ctx, s.channelName, []int64{1, 2, 3}) + err := manager.SealSegments(ctx, s.channelName, []int64{1, 2, 3}) s.Error(err, "FlushSegments shall return error when channel not found") }) @@ -101,9 +102,9 @@ func (s *ManagerSuite) TestFlushSegments() { s.manager.buffers[s.channelName] = wb s.manager.mut.Unlock() - wb.EXPECT().FlushSegments(mock.Anything, mock.Anything).Return(nil) + wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil) - err := manager.FlushSegments(ctx, s.channelName, []int64{1}) + err := manager.SealSegments(ctx, s.channelName, []int64{1}) s.NoError(err) }) } @@ -192,6 +193,51 @@ func (s *ManagerSuite) TestRemoveChannel() { }) } +func (s 
*ManagerSuite) TestMemoryCheck() { + manager := s.manager + param := paramtable.Get() + + param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50") + param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "false") + param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.7") + + defer func() { + param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key) + param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key) + param.Reset(param.DataNodeCfg.MemoryWatermark.Key) + }() + + wb := NewMockWriteBuffer(s.T()) + + memoryLimit := hardware.GetMemoryCount() + signal := make(chan struct{}, 1) + wb.EXPECT().MemorySize().Return(int64(float64(memoryLimit) * 0.6)) + wb.EXPECT().EvictBuffer(mock.Anything).Run(func(polices ...SyncPolicy) { + select { + case signal <- struct{}{}: + default: + } + }).Return() + manager.mut.Lock() + manager.buffers[s.channelName] = wb + manager.mut.Unlock() + + manager.Start() + defer manager.Stop() + + <-time.After(time.Millisecond * 100) + wb.AssertNotCalled(s.T(), "MemorySize") + + param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "true") + + <-time.After(time.Millisecond * 100) + wb.AssertNotCalled(s.T(), "SetMemoryHighFlag") + param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.5") + + <-signal + wb.AssertExpectations(s.T()) +} + func TestManager(t *testing.T) { suite.Run(t, new(ManagerSuite)) } diff --git a/internal/datanode/writebuffer/mock_mananger.go b/internal/datanode/writebuffer/mock_mananger.go index ac7a501f98..084b4bec77 100644 --- a/internal/datanode/writebuffer/mock_mananger.go +++ b/internal/datanode/writebuffer/mock_mananger.go @@ -149,50 +149,6 @@ func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Con return _c } -// FlushSegments provides a mock function with given fields: ctx, channel, segmentIDs -func (_m *MockBufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { - ret := _m.Called(ctx, channel, segmentIDs) - - var r0 error - if rf, ok := 
ret.Get(0).(func(context.Context, string, []int64) error); ok { - r0 = rf(ctx, channel, segmentIDs) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockBufferManager_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments' -type MockBufferManager_FlushSegments_Call struct { - *mock.Call -} - -// FlushSegments is a helper method to define mock.On call -// - ctx context.Context -// - channel string -// - segmentIDs []int64 -func (_e *MockBufferManager_Expecter) FlushSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockBufferManager_FlushSegments_Call { - return &MockBufferManager_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, channel, segmentIDs)} -} - -func (_c *MockBufferManager_FlushSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockBufferManager_FlushSegments_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].([]int64)) - }) - return _c -} - -func (_c *MockBufferManager_FlushSegments_Call) Return(_a0 error) *MockBufferManager_FlushSegments_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockBufferManager_FlushSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockBufferManager_FlushSegments_Call { - _c.Call.Return(run) - return _c -} - // GetCheckpoint provides a mock function with given fields: channel func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { ret := _m.Called(channel) @@ -380,6 +336,114 @@ func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) * return _c } +// SealSegments provides a mock function with given fields: ctx, channel, segmentIDs +func (_m *MockBufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { + ret := _m.Called(ctx, channel, segmentIDs) + + var r0 error + if rf, ok := 
ret.Get(0).(func(context.Context, string, []int64) error); ok { + r0 = rf(ctx, channel, segmentIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockBufferManager_SealSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealSegments' +type MockBufferManager_SealSegments_Call struct { + *mock.Call +} + +// SealSegments is a helper method to define mock.On call +// - ctx context.Context +// - channel string +// - segmentIDs []int64 +func (_e *MockBufferManager_Expecter) SealSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockBufferManager_SealSegments_Call { + return &MockBufferManager_SealSegments_Call{Call: _e.mock.On("SealSegments", ctx, channel, segmentIDs)} +} + +func (_c *MockBufferManager_SealSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockBufferManager_SealSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].([]int64)) + }) + return _c +} + +func (_c *MockBufferManager_SealSegments_Call) Return(_a0 error) *MockBufferManager_SealSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBufferManager_SealSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockBufferManager_SealSegments_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockBufferManager) Start() { + _m.Called() +} + +// MockBufferManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockBufferManager_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockBufferManager_Expecter) Start() *MockBufferManager_Start_Call { + return &MockBufferManager_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockBufferManager_Start_Call) Run(run func()) *MockBufferManager_Start_Call { + 
_c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockBufferManager_Start_Call) Return() *MockBufferManager_Start_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBufferManager_Start_Call) RunAndReturn(run func()) *MockBufferManager_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockBufferManager) Stop() { + _m.Called() +} + +// MockBufferManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockBufferManager_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockBufferManager_Expecter) Stop() *MockBufferManager_Stop_Call { + return &MockBufferManager_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockBufferManager_Stop_Call) Run(run func()) *MockBufferManager_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockBufferManager_Stop_Call) Return() *MockBufferManager_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBufferManager_Stop_Call) RunAndReturn(run func()) *MockBufferManager_Stop_Call { + _c.Call.Return(run) + return _c +} + // NewMockBufferManager creates a new instance of MockBufferManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewMockBufferManager(t interface { diff --git a/internal/datanode/writebuffer/mock_write_buffer.go b/internal/datanode/writebuffer/mock_write_buffer.go index 88e1201876..2273a01396 100644 --- a/internal/datanode/writebuffer/mock_write_buffer.go +++ b/internal/datanode/writebuffer/mock_write_buffer.go @@ -102,45 +102,48 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(bool)) *MockWriteBuf return _c } -// FlushSegments provides a mock function with given fields: ctx, segmentIDs -func (_m *MockWriteBuffer) FlushSegments(ctx context.Context, segmentIDs []int64) error { - ret := _m.Called(ctx, segmentIDs) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { - r0 = rf(ctx, segmentIDs) - } else { - r0 = ret.Error(0) +// EvictBuffer provides a mock function with given fields: policies +func (_m *MockWriteBuffer) EvictBuffer(policies ...SyncPolicy) { + _va := make([]interface{}, len(policies)) + for _i := range policies { + _va[_i] = policies[_i] } - - return r0 + var _ca []interface{} + _ca = append(_ca, _va...) + _m.Called(_ca...) 
} -// MockWriteBuffer_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments' -type MockWriteBuffer_FlushSegments_Call struct { +// MockWriteBuffer_EvictBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EvictBuffer' +type MockWriteBuffer_EvictBuffer_Call struct { *mock.Call } -// FlushSegments is a helper method to define mock.On call -// - ctx context.Context -// - segmentIDs []int64 -func (_e *MockWriteBuffer_Expecter) FlushSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_FlushSegments_Call { - return &MockWriteBuffer_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, segmentIDs)} +// EvictBuffer is a helper method to define mock.On call +// - policies ...SyncPolicy +func (_e *MockWriteBuffer_Expecter) EvictBuffer(policies ...interface{}) *MockWriteBuffer_EvictBuffer_Call { + return &MockWriteBuffer_EvictBuffer_Call{Call: _e.mock.On("EvictBuffer", + append([]interface{}{}, policies...)...)} } -func (_c *MockWriteBuffer_FlushSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_FlushSegments_Call { +func (_c *MockWriteBuffer_EvictBuffer_Call) Run(run func(policies ...SyncPolicy)) *MockWriteBuffer_EvictBuffer_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]int64)) + variadicArgs := make([]SyncPolicy, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(SyncPolicy) + } + } + run(variadicArgs...) 
}) return _c } -func (_c *MockWriteBuffer_FlushSegments_Call) Return(_a0 error) *MockWriteBuffer_FlushSegments_Call { - _c.Call.Return(_a0) +func (_c *MockWriteBuffer_EvictBuffer_Call) Return() *MockWriteBuffer_EvictBuffer_Call { + _c.Call.Return() return _c } -func (_c *MockWriteBuffer_FlushSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_FlushSegments_Call { +func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy)) *MockWriteBuffer_EvictBuffer_Call { _c.Call.Return(run) return _c } @@ -271,6 +274,90 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M return _c } +// MemorySize provides a mock function with given fields: +func (_m *MockWriteBuffer) MemorySize() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockWriteBuffer_MemorySize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MemorySize' +type MockWriteBuffer_MemorySize_Call struct { + *mock.Call +} + +// MemorySize is a helper method to define mock.On call +func (_e *MockWriteBuffer_Expecter) MemorySize() *MockWriteBuffer_MemorySize_Call { + return &MockWriteBuffer_MemorySize_Call{Call: _e.mock.On("MemorySize")} +} + +func (_c *MockWriteBuffer_MemorySize_Call) Run(run func()) *MockWriteBuffer_MemorySize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWriteBuffer_MemorySize_Call) Return(_a0 int64) *MockWriteBuffer_MemorySize_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_MemorySize_Call) RunAndReturn(run func() int64) *MockWriteBuffer_MemorySize_Call { + _c.Call.Return(run) + return _c +} + +// SealSegments provides a mock function with given fields: ctx, segmentIDs +func (_m *MockWriteBuffer) SealSegments(ctx context.Context, segmentIDs []int64) error { + ret := _m.Called(ctx, 
segmentIDs) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, segmentIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWriteBuffer_SealSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealSegments' +type MockWriteBuffer_SealSegments_Call struct { + *mock.Call +} + +// SealSegments is a helper method to define mock.On call +// - ctx context.Context +// - segmentIDs []int64 +func (_e *MockWriteBuffer_Expecter) SealSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_SealSegments_Call { + return &MockWriteBuffer_SealSegments_Call{Call: _e.mock.On("SealSegments", ctx, segmentIDs)} +} + +func (_c *MockWriteBuffer_SealSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_SealSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int64)) + }) + return _c +} + +func (_c *MockWriteBuffer_SealSegments_Call) Return(_a0 error) *MockWriteBuffer_SealSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_SealSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_SealSegments_Call { + _c.Call.Return(run) + return _c +} + // SetFlushTimestamp provides a mock function with given fields: flushTs func (_m *MockWriteBuffer) SetFlushTimestamp(flushTs uint64) { _m.Called(flushTs) diff --git a/internal/datanode/writebuffer/segment_buffer.go b/internal/datanode/writebuffer/segment_buffer.go index 9d80a6b39a..e6b625d45d 100644 --- a/internal/datanode/writebuffer/segment_buffer.go +++ b/internal/datanode/writebuffer/segment_buffer.go @@ -3,9 +3,12 @@ package writebuffer import ( "math" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" 
"github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -37,6 +40,7 @@ func (buf *segmentBuffer) Yield() (insert *storage.InsertData, delete *storage.D } func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp { + log.Info("segmentID", zap.Int64("segmentID", buf.segmentID)) insertTs := buf.insertBuffer.MinTimestamp() deltaTs := buf.deltaBuffer.MinTimestamp() @@ -65,6 +69,11 @@ func (buf *segmentBuffer) GetTimeRange() *TimeRange { return result } +// MemorySize returns total memory size of insert buffer & delta buffer. +func (buf *segmentBuffer) MemorySize() int64 { + return buf.insertBuffer.size + buf.deltaBuffer.size +} + // TimeRange is a range of timestamp contains the min-timestamp and max-timestamp type TimeRange struct { timestampMin typeutil.Timestamp diff --git a/internal/datanode/writebuffer/sync_policy.go b/internal/datanode/writebuffer/sync_policy.go index 36670c37fe..32d3fff980 100644 --- a/internal/datanode/writebuffer/sync_policy.go +++ b/internal/datanode/writebuffer/sync_policy.go @@ -1,6 +1,7 @@ package writebuffer import ( + "container/heap" "math/rand" "time" @@ -97,3 +98,40 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S return nil }, "flush ts") } + +func GetOldestBufferPolicy(num int) SyncPolicy { + return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { + h := &SegStartPosHeap{} + heap.Init(h) + + for _, buf := range buffers { + heap.Push(h, buf) + if h.Len() > num { + heap.Pop(h) + } + } + + return lo.Map(*h, func(buf *segmentBuffer, _ int) int64 { return buf.segmentID }) + }, "oldest buffers") +} + +// SegStartPosHeap implements a max-heap over segment buffer min timestamps, used to keep the oldest buffers.
+type SegStartPosHeap []*segmentBuffer + +func (h SegStartPosHeap) Len() int { return len(h) } +func (h SegStartPosHeap) Less(i, j int) bool { + return h[i].MinTimestamp() > h[j].MinTimestamp() +} +func (h SegStartPosHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *SegStartPosHeap) Push(x any) { + *h = append(*h, x.(*segmentBuffer)) +} + +func (h *SegStartPosHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/internal/datanode/writebuffer/sync_policy_test.go b/internal/datanode/writebuffer/sync_policy_test.go index 2eb6d4de28..e155145816 100644 --- a/internal/datanode/writebuffer/sync_policy_test.go +++ b/internal/datanode/writebuffer/sync_policy_test.go @@ -94,6 +94,50 @@ func (s *SyncPolicySuite) TestCompactedSegmentsPolicy() { s.ElementsMatch(ids, result) } +func (s *SyncPolicySuite) TestOlderBufferPolicy() { + policy := GetOldestBufferPolicy(2) + + type testCase struct { + tag string + buffers []*segmentBuffer + expect []int64 + } + + cases := []*testCase{ + {tag: "empty_buffers", buffers: nil, expect: []int64{}}, + {tag: "3_candidates", buffers: []*segmentBuffer{ + { + segmentID: 100, + insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 1}}}, + deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}}, + }, + { + segmentID: 200, + insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 2}}}, + deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}}, + }, + { + segmentID: 300, + insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 3}}}, + deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}}, + }, + }, expect: []int64{100, 200}}, + {tag: "1_candidates", buffers: []*segmentBuffer{ + { + segmentID: 100, + insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 1}}}, + deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}}, + }, + }, expect: []int64{100}}, + 
} + + for _, tc := range cases { + s.Run(tc.tag, func() { + s.ElementsMatch(tc.expect, policy.SelectSegments(tc.buffers, 0)) + }) + } +} + func TestSyncPolicy(t *testing.T) { suite.Run(t, new(SyncPolicySuite)) } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index b88b767a2e..fadf2007e2 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -44,12 +44,16 @@ type WriteBuffer interface { SetFlushTimestamp(flushTs uint64) // GetFlushTimestamp get current flush timestamp GetFlushTimestamp() uint64 - // FlushSegments is the method to perform `Sync` operation with provided options. - FlushSegments(ctx context.Context, segmentIDs []int64) error + // SealSegments is the method to perform `Sync` operation with provided options. + SealSegments(ctx context.Context, segmentIDs []int64) error // GetCheckpoint returns current channel checkpoint. // If there are any non-empty segment buffer, returns the earliest buffer start position. // Otherwise, returns latest buffered checkpoint. GetCheckpoint() *msgpb.MsgPosition + // MemorySize returns the size in bytes currently used by this write buffer. + MemorySize() int64 + // EvictBuffer evicts buffer to sync manager which match provided sync policies. + EvictBuffer(policies ...SyncPolicy) // Close is the method to close and sink current buffer data. 
Close(drop bool) } @@ -147,7 +151,7 @@ func (wb *writeBufferBase) HasSegment(segmentID int64) bool { return ok } -func (wb *writeBufferBase) FlushSegments(ctx context.Context, segmentIDs []int64) error { +func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) error { wb.mut.RLock() defer wb.mut.RUnlock() @@ -162,6 +166,40 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 { return wb.flushTimestamp.Load() } +func (wb *writeBufferBase) MemorySize() int64 { + wb.mut.RLock() + defer wb.mut.RUnlock() + + var size int64 + for _, segBuf := range wb.buffers { + size += segBuf.MemorySize() + } + return size +} + +func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) { + wb.mut.Lock() + defer wb.mut.Unlock() + + log := log.Ctx(context.Background()).With( + zap.Int64("collectionID", wb.collectionID), + zap.String("channel", wb.channelName), + ) + // need valid checkpoint before triggering syncing + if wb.checkpoint == nil { + log.Warn("evict buffer before buffering data") + return + } + + ts := wb.checkpoint.GetTimestamp() + + segmentIDs := wb.getSegmentsToSync(ts, policies...) + if len(segmentIDs) > 0 { + log.Info("evict buffer find segments to sync", zap.Int64s("segmentIDs", segmentIDs)) + wb.syncSegments(context.Background(), segmentIDs) + } +} + func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { log := log.Ctx(context.Background()). With(zap.String("channel", wb.channelName)). @@ -225,7 +263,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { } func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) { - segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp()) + segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp(), wb.syncPolicies...) 
if len(segmentsToSync) > 0 { log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync)) wb.syncSegments(context.Background(), segmentsToSync) @@ -282,10 +320,10 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) // getSegmentsToSync applies all policies to get segments list to sync. // **NOTE** shall be invoked within mutex protection -func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 { +func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp, policies ...SyncPolicy) []int64 { buffers := lo.Values(wb.buffers) segments := typeutil.NewSet[int64]() - for _, policy := range wb.syncPolicies { + for _, policy := range policies { result := policy.SelectSegments(buffers, ts) if len(result) > 0 { log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason())) diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 3367db2d53..2659ea4cfd 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -115,7 +115,7 @@ func (s *WriteBufferSuite) TestFlushSegments() { wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) - err = wb.FlushSegments(context.Background(), []int64{segmentID}) + err = wb.SealSegments(context.Background(), []int64{segmentID}) s.NoError(err) } @@ -299,6 +299,73 @@ func (s *WriteBufferSuite) TestSyncSegmentsError() { }) } +func (s *WriteBufferSuite) TestEvictBuffer() { + wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{ + pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }, + }) + s.Require().NoError(err) + + serializer := syncmgr.NewMockSerializer(s.T()) + + wb.serializer = 
serializer + + s.Run("no_checkpoint", func() { + wb.mut.Lock() + wb.buffers[100] = &segmentBuffer{} + wb.mut.Unlock() + defer func() { + wb.mut.Lock() + defer wb.mut.Unlock() + wb.buffers = make(map[int64]*segmentBuffer) + }() + + wb.EvictBuffer(GetOldestBufferPolicy(1)) + + serializer.AssertNotCalled(s.T(), "EncodeBuffer") + }) + + s.Run("trigger_sync", func() { + buf1, err := newSegmentBuffer(2, s.collSchema) + s.Require().NoError(err) + buf1.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 440, + } + buf1.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 400, + } + buf2, err := newSegmentBuffer(3, s.collSchema) + s.Require().NoError(err) + buf2.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 550, + } + buf2.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 600, + } + + wb.mut.Lock() + wb.buffers[2] = buf1 + wb.buffers[3] = buf2 + wb.checkpoint = &msgpb.MsgPosition{Timestamp: 100} + wb.mut.Unlock() + + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: 2, + }, nil) + s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true) + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() + serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil) + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil) + defer func() { + s.wb.mut.Lock() + defer s.wb.mut.Unlock() + s.wb.buffers = make(map[int64]*segmentBuffer) + }() + wb.EvictBuffer(GetOldestBufferPolicy(1)) + }) +} + func TestWriteBufferBase(t *testing.T) { suite.Run(t, new(WriteBufferSuite)) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2279452a81..66e390bf34 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2827,6 +2827,7 @@ type dataNodeConfig struct { // memory management MemoryForceSyncEnable ParamItem `refreshable:"true"` MemoryForceSyncSegmentNum ParamItem `refreshable:"true"` + 
MemoryCheckInterval ParamItem `refreshable:"true"` MemoryWatermark ParamItem `refreshable:"true"` DataNodeTimeTickByRPC ParamItem `refreshable:"false"` @@ -2939,6 +2940,15 @@ func (p *dataNodeConfig) init(base *BaseTable) { } p.MemoryForceSyncSegmentNum.Init(base.mgr) + p.MemoryCheckInterval = ParamItem{ + Key: "datanode.memory.checkInterval", + Version: "2.4.0", + DefaultValue: "3000", // milliseconds + Doc: "the interval to check datanode memory usage, in milliseconds", + Export: true, + } + p.MemoryCheckInterval.Init(base.mgr) + if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode { p.MemoryWatermark = ParamItem{ Key: "datanode.memory.watermarkStandalone",