From 18dc6b61cee298f611b57b9505787f94841d6809 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 17 Nov 2023 21:46:20 +0800
Subject: [PATCH] enhance: fix LevelZero segment sync logic (#28482)

See also #27675
- Fix LevelZero segments not being flushed
- Add a level option for syncTask
- Invoke `AddSegment` when a new LevelZero segment is allocated

Signed-off-by: Congqi Xia
---
 internal/datanode/data_sync_service.go        |  2 +-
 internal/datanode/metacache/actions.go        | 13 +++++
 internal/datanode/metacache/meta_cache.go     | 22 --------
 .../datanode/metacache/meta_cache_test.go     | 15 -----
 .../datanode/metacache/mock_meta_cache.go     | 52 -----------------
 internal/datanode/metacache/segment.go        | 12 ++++
 internal/datanode/services_test.go            | 12 +++-
 internal/datanode/syncmgr/meta_writer.go      |  9 ++-
 internal/datanode/syncmgr/meta_writer_test.go |  1 +
 internal/datanode/syncmgr/options.go          |  5 ++
 .../datanode/syncmgr/sync_manager_test.go     |  5 +-
 internal/datanode/syncmgr/task.go             |  1 +
 .../datanode/writebuffer/bf_write_buffer.go   |  8 +--
 .../writebuffer/bf_write_buffer_test.go       | 56 ++++++++++---------
 .../datanode/writebuffer/l0_write_buffer.go   | 35 +++++++++---
 .../writebuffer/l0_write_buffer_test.go       |  8 ++-
 internal/datanode/writebuffer/manager.go      |  7 +--
 internal/datanode/writebuffer/manager_test.go | 13 ++++-
 .../datanode/writebuffer/mock_mananger.go     | 29 +++++-----
 internal/datanode/writebuffer/sync_policy.go  |  5 +-
 internal/datanode/writebuffer/write_buffer.go | 27 ++++-----
 .../datanode/writebuffer/write_buffer_test.go | 14 +++--
 22 files changed, 175 insertions(+), 176 deletions(-)

diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 90dcecf445..e14933fedd 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -295,7 +295,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
 		resendTTCh         = make(chan resendTTMsg, 100)
 	)
 
-	node.writeBufferManager.Register(channelName, metacache.Schema(), metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker)), writebuffer.WithIDAllocator(node.allocator))
+	node.writeBufferManager.Register(channelName, metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker)), writebuffer.WithIDAllocator(node.allocator))
 	ctx, cancel := context.WithCancel(node.ctx)
 	ds := &dataSyncService{
 		ctx:          ctx,
diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go
index eb3a5baeea..5efc8d2203 100644
--- a/internal/datanode/metacache/actions.go
+++ b/internal/datanode/metacache/actions.go
@@ -19,6 +19,7 @@ package metacache
 import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -57,6 +58,12 @@ func WithImporting() SegmentFilter {
 	}
 }
 
+func WithLevel(level datapb.SegmentLevel) SegmentFilter {
+	return func(info *SegmentInfo) bool {
+		return info.level == level
+	}
+}
+
 type SegmentAction func(info *SegmentInfo)
 
 func UpdateState(state commonpb.SegmentState) SegmentAction {
@@ -115,6 +122,12 @@ func FinishSyncing(batchSize int64) SegmentAction {
 	}
 }
 
+func SetStartPosRecorded(flag bool) SegmentAction {
+	return func(info *SegmentInfo) {
+		info.startPosRecorded = flag
+	}
+}
+
 // MergeSegmentAction is the util function to merge multiple SegmentActions into one.
func MergeSegmentAction(actions ...SegmentAction) SegmentAction { return func(info *SegmentInfo) { diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index cdebfaba29..7963cab22f 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -35,8 +34,6 @@ type MetaCache interface { Collection() int64 // Schema returns collection schema. Schema() *schemapb.CollectionSchema - // NewSegment creates a new segment from WAL stream data. - NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) // AddSegment adds a segment from segment info. AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsFactory, actions ...SegmentAction) // UpdateSegments applies action to segment(s) satisfy the provided filters. @@ -97,25 +94,6 @@ func (c *metaCacheImpl) Schema() *schemapb.CollectionSchema { return c.schema } -// NewSegment creates a new segment from WAL stream data. -func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) { - c.mu.Lock() - defer c.mu.Unlock() - - if _, ok := c.segmentInfos[segmentID]; !ok { - info := &SegmentInfo{ - segmentID: segmentID, - partitionID: partitionID, - state: commonpb.SegmentState_Growing, - startPosRecorded: false, - } - for _, action := range actions { - action(info) - } - c.segmentInfos[segmentID] = info - } -} - // AddSegment adds a segment from segment info. 
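+// With NewSegment removed, AddSegment is also the registration path for
+// newly allocated LevelZero segments (see getL0SegmentID in l0_write_buffer.go).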
func (c *metaCacheImpl) AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsFactory, actions ...SegmentAction) { segment := NewSegmentInfo(segInfo, factory(segInfo)) diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go index 918ece4530..627d721cb3 100644 --- a/internal/datanode/metacache/meta_cache_test.go +++ b/internal/datanode/metacache/meta_cache_test.go @@ -100,21 +100,6 @@ func (s *MetaCacheSuite) TestMetaInfo() { s.Equal(s.collSchema, s.cache.Schema()) } -func (s *MetaCacheSuite) TestNewSegment() { - for i, seg := range s.newSegments { - s.cache.NewSegment(seg, s.partitionIDs[i], nil, UpdateNumOfRows(100)) - } - - for id, partitionID := range s.partitionIDs { - segs := s.cache.GetSegmentIDsBy(WithPartitionID(partitionID)) - targets := []int64{s.flushedSegments[id], s.growingSegments[id], s.newSegments[id]} - s.Equal(len(targets), len(segs)) - for _, seg := range segs { - s.True(lo.Contains(targets, seg)) - } - } -} - func (s *MetaCacheSuite) TestCompactSegments() { for i, seg := range s.newSegments { // compaction from flushed[i], unflushed[i] and invalidSeg to new[i] diff --git a/internal/datanode/metacache/mock_meta_cache.go b/internal/datanode/metacache/mock_meta_cache.go index 07f26dbc6b..076ec05f18 100644 --- a/internal/datanode/metacache/mock_meta_cache.go +++ b/internal/datanode/metacache/mock_meta_cache.go @@ -6,8 +6,6 @@ import ( datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" - msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" storage "github.com/milvus-io/milvus/internal/storage" @@ -350,56 +348,6 @@ func (_c *MockMetaCache_GetSegmentsBy_Call) RunAndReturn(run func(...SegmentFilt return _c } -// NewSegment provides a mock function with given fields: segmentID, partitionID, startPos, actions -func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) { - _va := make([]interface{}, len(actions)) - for _i := range actions { - _va[_i] = actions[_i] - } - var _ca []interface{} - _ca = append(_ca, segmentID, partitionID, startPos) - _ca = append(_ca, _va...) - _m.Called(_ca...) -} - -// MockMetaCache_NewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewSegment' -type MockMetaCache_NewSegment_Call struct { - *mock.Call -} - -// NewSegment is a helper method to define mock.On call -// - segmentID int64 -// - partitionID int64 -// - startPos *msgpb.MsgPosition -// - actions ...SegmentAction -func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}, startPos interface{}, actions ...interface{}) *MockMetaCache_NewSegment_Call { - return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment", - append([]interface{}{segmentID, partitionID, startPos}, actions...)...)} -} - -func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)) *MockMetaCache_NewSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]SegmentAction, len(args)-3) - for i, a := range args[3:] { - if a != nil { - variadicArgs[i] = a.(SegmentAction) - } - } - run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition), variadicArgs...) 
-	})
-	return _c
-}
-
-func (_c *MockMetaCache_NewSegment_Call) Return() *MockMetaCache_NewSegment_Call {
-	_c.Call.Return()
-	return _c
-}
-
-func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition, ...SegmentAction)) *MockMetaCache_NewSegment_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // PredictSegments provides a mock function with given fields: pk, filters
 func (_m *MockMetaCache) PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) {
 	_va := make([]interface{}, len(filters))
diff --git a/internal/datanode/metacache/segment.go b/internal/datanode/metacache/segment.go
index fa99a7c5dd..8b5bdd447a 100644
--- a/internal/datanode/metacache/segment.go
+++ b/internal/datanode/metacache/segment.go
@@ -36,6 +36,7 @@ type SegmentInfo struct {
 	bfs              *BloomFilterSet
 	compactTo        int64
 	importing        bool
+	level            datapb.SegmentLevel
 }
 
 func (s *SegmentInfo) SegmentID() int64 {
@@ -81,6 +82,10 @@ func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
 	return s.bfs
 }
 
+func (s *SegmentInfo) Level() datapb.SegmentLevel {
+	return s.level
+}
+
 func (s *SegmentInfo) Clone() *SegmentInfo {
 	return &SegmentInfo{
 		segmentID:        s.segmentID,
@@ -94,10 +99,16 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
 		syncingRows:      s.syncingRows,
 		bfs:              s.bfs,
 		compactTo:        s.compactTo,
+		level:            s.level,
+		importing:        s.importing,
 	}
 }
 
 func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
+	level := info.GetLevel()
+	if level == datapb.SegmentLevel_Legacy {
+		level = datapb.SegmentLevel_L1
+	}
 	return &SegmentInfo{
 		segmentID:        info.GetID(),
 		partitionID:      info.GetPartitionID(),
@@ -106,6 +117,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo
 		startPosition:    info.GetStartPosition(),
 		checkpoint:       info.GetDmlPosition(),
 		startPosRecorded: true,
+		level:            level,
 		bfs:              bfs,
 	}
 }
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index de7be344c2..9729f6fd80 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -240,9 +240,17 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
 	fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)
 	s.Require().True(ok)
 
-	s.node.writeBufferManager.Register(dmChannelName, schema, metacache.NewMockMetaCache(s.T()))
+	metaCache := metacache.NewMockMetaCache(s.T())
+	metaCache.EXPECT().Collection().Return(1).Maybe()
+	metaCache.EXPECT().Schema().Return(schema).Maybe()
+	s.node.writeBufferManager.Register(dmChannelName, metaCache)
 
-	fgservice.metacache.NewSegment(segmentID, 1, &msgpb.MsgPosition{})
+	fgservice.metacache.AddSegment(&datapb.SegmentInfo{
+		ID:            segmentID,
+		CollectionID:  1,
+		PartitionID:   2,
+		StartPosition: &msgpb.MsgPosition{},
+	}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
 
 	s.Run("service_not_ready", func() {
 		ctx, cancel := context.WithCancel(context.Background())
diff --git a/internal/datanode/syncmgr/meta_writer.go b/internal/datanode/syncmgr/meta_writer.go
index 1d75372e40..716a935f3a 100644
--- a/internal/datanode/syncmgr/meta_writer.go
+++ b/internal/datanode/syncmgr/meta_writer.go
@@ -69,6 +69,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
 	log.Info("SaveBinlogPath",
 		zap.Int64("SegmentID", pack.segmentID),
 		zap.Int64("CollectionID", pack.collectionID),
+		zap.Int64("PartitionID", pack.partitionID),
 		zap.Any("startPos", startPos),
 		zap.Any("checkPoints", checkPoints),
 		zap.Int("binlogNum",
lo.SumBy(insertFieldBinlogs, getBinlogNum)), @@ -85,6 +86,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { ), SegmentID: pack.segmentID, CollectionID: pack.collectionID, + PartitionID: pack.partitionID, Field2BinlogPaths: insertFieldBinlogs, Field2StatslogPaths: statsFieldBinlogs, Deltalogs: deltaFieldBinlogs, @@ -95,6 +97,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { Flushed: pack.isFlush, Dropped: pack.isDrop, Channel: pack.channelName, + SegLevel: pack.level, } err := retry.Do(context.Background(), func() error { err := b.broker.SaveBinlogPaths(context.Background(), req) @@ -124,8 +127,12 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { log.Warn("failed to SaveBinlogPaths", zap.Int64("segmentID", pack.segmentID), zap.Error(err)) + return err } - return err + + pack.metacache.UpdateSegments(metacache.SetStartPosRecorded(true), metacache.WithSegmentIDs(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) int64 { return pos.GetSegmentID() })...)) + + return nil } func (b *brokerMetaWriter) DropChannel(channelName string) error { diff --git a/internal/datanode/syncmgr/meta_writer_test.go b/internal/datanode/syncmgr/meta_writer_test.go index 2f9cc0ebd4..763c4f2283 100644 --- a/internal/datanode/syncmgr/meta_writer_test.go +++ b/internal/datanode/syncmgr/meta_writer_test.go @@ -40,6 +40,7 @@ func (s *MetaWriterSuite) TestNormalSave() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() task := NewSyncTask() task.WithMetaCache(s.metacache) err := s.writer.UpdateSync(task) diff --git a/internal/datanode/syncmgr/options.go b/internal/datanode/syncmgr/options.go index 073764b1c0..324cdb1700 100644 --- a/internal/datanode/syncmgr/options.go +++ b/internal/datanode/syncmgr/options.go @@ -115,3 +115,8 @@ func (t *SyncTask) WithBatchSize(batchSize int64) *SyncTask { t.batchSize = batchSize return t } + +func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask { + t.level = level + return t +} diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index 95d4626c6e..072cac7901 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -210,6 +210,7 @@ func (s *SyncManagerSuite) TestCompacted() { func (s *SyncManagerSuite) TestBlock() { sig := make(chan struct{}) + counter := atomic.NewInt32(0) s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil) bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) @@ -219,7 +220,9 @@ func (s *SyncManagerSuite) TestBlock() { return []*metacache.SegmentInfo{seg} }) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(_ metacache.SegmentAction, filters ...metacache.SegmentFilter) { - close(sig) + if counter.Inc() == 2 { + close(sig) + } }) manager, err := NewSyncManager(10, s.chunkManager, s.allocator) diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index ee5fe5c224..21617752df 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -41,6 +41,7 @@ type SyncTask struct { // batchSize is the row number of this sync task, // not the total num of rows of segemnt batchSize int64 + level datapb.SegmentLevel tsFrom typeutil.Timestamp 
tsTo typeutil.Timestamp diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index d2b0eb68b0..a44bb86eb2 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -3,7 +3,6 @@ package writebuffer import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/storage" @@ -18,9 +17,9 @@ type bfWriteBuffer struct { metacache metacache.MetaCache } -func NewBFWriteBuffer(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { +func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { return &bfWriteBuffer{ - writeBufferBase: newWriteBufferBase(channel, sch, metacache, syncMgr, option), + writeBufferBase: newWriteBufferBase(channel, metacache, syncMgr, option), syncMgr: syncMgr, }, nil } @@ -71,5 +70,6 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg // update buffer last checkpoint wb.checkpoint = endPos - return wb.triggerSync() + _ = wb.triggerSync() + return nil } diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index f8e896a34d..935466e831 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -25,6 +25,7 @@ import ( type BFWriteBufferSuite struct { suite.Suite + collID int64 channelName string collSchema *schemapb.CollectionSchema syncMgr *syncmgr.MockSyncManager @@ -34,6 +35,7 @@ type BFWriteBufferSuite struct { func (s *BFWriteBufferSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) + s.collID = 100 s.collSchema = &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ @@ -136,17 +138,19 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre func (s *BFWriteBufferSuite) SetupTest() { s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) + s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() + s.metacache.EXPECT().Collection().Return(s.collID).Maybe() s.broker = broker.NewMockBroker(s.T()) } func (s *BFWriteBufferSuite) TestBufferData() { - wb, err := NewBFWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{}) + wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{}) s.NoError(err) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) - s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() pks, msg := s.composeInsertMsg(1000, 10, 128) @@ -159,30 +163,32 @@ func (s *BFWriteBufferSuite) TestBufferData() { func (s *BFWriteBufferSuite) TestAutoSync() { 
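+	// the original assertions move into an s.Run sub-test below, leaving
+	// room for more auto-sync cases beside "normal_auto_sync"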
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1") - wb, err := NewBFWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{ - syncPolicies: []SyncPolicy{ - SyncFullBuffer, - GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), - GetFlushingSegmentsPolicy(s.metacache), - }, + s.Run("normal_auto_sync", func() { + wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ + syncPolicies: []SyncPolicy{ + SyncFullBuffer, + GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), + GetFlushingSegmentsPolicy(s.metacache), + }, + }) + s.NoError(err) + + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) + s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true) + s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) + s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil) + + pks, msg := s.composeInsertMsg(1000, 10, 128) + delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + + err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.NoError(err) }) - s.NoError(err) - - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) - s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) - s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true) - s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) - s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return() - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil) - - pks, msg := s.composeInsertMsg(1000, 10, 128) - delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) - - err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) - s.NoError(err) } func TestBFWriteBuffer(t *testing.T) { diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index d036dece17..14aeba27b6 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -5,11 +5,12 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" 
"github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -20,19 +21,21 @@ import ( type l0WriteBuffer struct { *writeBufferBase - l0Segments map[int64]int64 // partitionID => l0 segment ID + l0Segments map[int64]int64 // partitionID => l0 segment ID + l0partition map[int64]int64 // l0 segment id => partition id syncMgr syncmgr.SyncManager idAllocator allocator.Interface } -func NewL0WriteBuffer(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { +func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { if option.idAllocator == nil { return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer") } return &l0WriteBuffer{ l0Segments: make(map[int64]int64), - writeBufferBase: newWriteBufferBase(channel, sch, metacache, syncMgr, option), + l0partition: make(map[int64]int64), + writeBufferBase: newWriteBufferBase(channel, metacache, syncMgr, option), syncMgr: syncMgr, idAllocator: option.idAllocator, }, nil @@ -50,7 +53,7 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg } for _, msg := range deleteMsgs { - l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID()) + l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos) pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys()) err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos) if err != nil { @@ -62,10 +65,18 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg // update buffer last checkpoint wb.checkpoint = endPos - return wb.triggerSync() + segmentsSync := wb.triggerSync() + for _, segment := range segmentsSync { + partition, ok := wb.l0partition[segment] + if ok { + delete(wb.l0partition, segment) + delete(wb.l0Segments, partition) + } + } + return nil } -func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 { +func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPosition) int64 { segmentID, ok := wb.l0Segments[partitionID] if !ok { err := retry.Do(context.Background(), func() error { @@ -78,6 +89,16 @@ func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 { panic(err) } wb.l0Segments[partitionID] = segmentID + wb.l0partition[segmentID] = partitionID + wb.metaCache.AddSegment(&datapb.SegmentInfo{ + ID: segmentID, + PartitionID: partitionID, + CollectionID: wb.collectionID, + InsertChannel: wb.channelName, + StartPosition: startPos, + State: commonpb.SegmentState_Growing, + Level: datapb.SegmentLevel_L0, + }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false)) } return segmentID } diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go index 2f74396a50..3007e0325b 100644 --- a/internal/datanode/writebuffer/l0_write_buffer_test.go +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -25,6 +25,7 @@ import ( type L0WriteBufferSuite struct { suite.Suite channelName string + collID int64 collSchema *schemapb.CollectionSchema syncMgr *syncmgr.MockSyncManager metacache *metacache.MockMetaCache @@ -33,6 +34,7 @@ type L0WriteBufferSuite struct 
{ func (s *L0WriteBufferSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) + s.collID = 100 s.collSchema = &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ @@ -136,12 +138,14 @@ func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre func (s *L0WriteBufferSuite) SetupTest() { s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) + s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() + s.metacache.EXPECT().Collection().Return(s.collID).Maybe() s.allocator = allocator.NewMockGIDAllocator() s.allocator.AllocOneF = func() (int64, error) { return int64(tsoutil.ComposeTSByTime(time.Now(), 0)), nil } } func (s *L0WriteBufferSuite) TestBufferData() { - wb, err := NewL0WriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{ + wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ idAllocator: s.allocator, }) s.NoError(err) @@ -150,7 +154,7 @@ func (s *L0WriteBufferSuite) TestBufferData() { delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) - s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go index be7e095b4e..b7035ec4cc 100644 --- a/internal/datanode/writebuffer/manager.go +++ b/internal/datanode/writebuffer/manager.go @@ -7,7 +7,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/pkg/log" @@ -18,7 +17,7 @@ import ( // BufferManager is the interface for WriteBuffer management. type BufferManager interface { // Register adds a WriteBuffer with provided schema & options. - Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error + Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error // FlushSegments notifies writeBuffer corresponding to provided channel to flush segments. FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error // FlushChannel @@ -50,7 +49,7 @@ type bufferManager struct { } // Register a new WriteBuffer for channel. -func (m *bufferManager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error { +func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error { m.mut.Lock() defer m.mut.Unlock() @@ -58,7 +57,7 @@ func (m *bufferManager) Register(channel string, schema *schemapb.CollectionSche if ok { return merr.WrapErrChannelReduplicate(channel) } - buf, err := NewWriteBuffer(channel, schema, metacache, m.syncMgr, opts...) + buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...) 
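+	// the collection schema is no longer passed in; the write buffer now
+	// reads it (and the collection ID) from the provided metacache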
if err != nil { return err } diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 7a437012fb..48d01563c1 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/pkg/common" @@ -21,16 +22,19 @@ import ( type ManagerSuite struct { suite.Suite + collID int64 channelName string collSchema *schemapb.CollectionSchema syncMgr *syncmgr.MockSyncManager metacache *metacache.MockMetaCache + allocator *allocator.MockAllocator manager *bufferManager } func (s *ManagerSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) + s.collID = 100 s.collSchema = &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ @@ -52,6 +56,9 @@ func (s *ManagerSuite) SetupSuite() { func (s *ManagerSuite) SetupTest() { s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) + s.metacache.EXPECT().Collection().Return(s.collID).Maybe() + s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() + s.allocator = allocator.NewMockAllocator(s.T()) mgr := NewManager(s.syncMgr) var ok bool @@ -62,10 +69,10 @@ func (s *ManagerSuite) SetupTest() { func (s *ManagerSuite) TestRegister() { manager := s.manager - err := manager.Register(s.channelName, s.collSchema, s.metacache) + err := manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator)) s.NoError(err) - err = manager.Register(s.channelName, s.collSchema, s.metacache) + err = manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator)) s.Error(err) s.ErrorIs(err, merr.ErrChannelReduplicate) } @@ -169,7 +176,7 @@ func (s *ManagerSuite) TestRemoveChannel() { }) s.Run("remove_channel", func() { - err := manager.Register(s.channelName, s.collSchema, s.metacache) + err := manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator)) s.Require().NoError(err) s.NotPanics(func() { diff --git a/internal/datanode/writebuffer/mock_mananger.go b/internal/datanode/writebuffer/mock_mananger.go index a42efcd32a..cdd6e22f5f 100644 --- a/internal/datanode/writebuffer/mock_mananger.go +++ b/internal/datanode/writebuffer/mock_mananger.go @@ -11,8 +11,6 @@ import ( msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream" - - schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) // MockBufferManager is an autogenerated mock type for the BufferManager type @@ -290,20 +288,20 @@ func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) RunAndReturn(run func( return _c } -// Register provides a mock function with given fields: channel, schema, _a2, opts -func (_m *MockBufferManager) Register(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption) error { +// Register provides a mock function with given fields: channel, _a1, opts +func (_m *MockBufferManager) Register(channel string, _a1 metacache.MetaCache, opts ...WriteBufferOption) error { _va := make([]interface{}, len(opts)) for _i := range opts { _va[_i] = opts[_i] } var _ca []interface{} - _ca = append(_ca, 
channel, schema, _a2) + _ca = append(_ca, channel, _a1) _ca = append(_ca, _va...) ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error); ok { - r0 = rf(channel, schema, _a2, opts...) + if rf, ok := ret.Get(0).(func(string, metacache.MetaCache, ...WriteBufferOption) error); ok { + r0 = rf(channel, _a1, opts...) } else { r0 = ret.Error(0) } @@ -318,23 +316,22 @@ type MockBufferManager_Register_Call struct { // Register is a helper method to define mock.On call // - channel string -// - schema *schemapb.CollectionSchema -// - _a2 metacache.MetaCache +// - _a1 metacache.MetaCache // - opts ...WriteBufferOption -func (_e *MockBufferManager_Expecter) Register(channel interface{}, schema interface{}, _a2 interface{}, opts ...interface{}) *MockBufferManager_Register_Call { +func (_e *MockBufferManager_Expecter) Register(channel interface{}, _a1 interface{}, opts ...interface{}) *MockBufferManager_Register_Call { return &MockBufferManager_Register_Call{Call: _e.mock.On("Register", - append([]interface{}{channel, schema, _a2}, opts...)...)} + append([]interface{}{channel, _a1}, opts...)...)} } -func (_c *MockBufferManager_Register_Call) Run(run func(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption)) *MockBufferManager_Register_Call { +func (_c *MockBufferManager_Register_Call) Run(run func(channel string, _a1 metacache.MetaCache, opts ...WriteBufferOption)) *MockBufferManager_Register_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]WriteBufferOption, len(args)-3) - for i, a := range args[3:] { + variadicArgs := make([]WriteBufferOption, len(args)-2) + for i, a := range args[2:] { if a != nil { variadicArgs[i] = a.(WriteBufferOption) } } - run(args[0].(string), args[1].(*schemapb.CollectionSchema), args[2].(metacache.MetaCache), variadicArgs...) + run(args[0].(string), args[1].(metacache.MetaCache), variadicArgs...) 
}) return _c } @@ -344,7 +341,7 @@ func (_c *MockBufferManager_Register_Call) Return(_a0 error) *MockBufferManager_ return _c } -func (_c *MockBufferManager_Register_Call) RunAndReturn(run func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error) *MockBufferManager_Register_Call { +func (_c *MockBufferManager_Register_Call) RunAndReturn(run func(string, metacache.MetaCache, ...WriteBufferOption) error) *MockBufferManager_Register_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/writebuffer/sync_policy.go b/internal/datanode/writebuffer/sync_policy.go index 604f001dd5..e243dc84cf 100644 --- a/internal/datanode/writebuffer/sync_policy.go +++ b/internal/datanode/writebuffer/sync_policy.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -48,7 +49,9 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S if !ok { return buf.segmentID, false } - return buf.segmentID, seg.State() == commonpb.SegmentState_Flushed && buf.MinTimestamp() < flushTs + inRange := seg.State() == commonpb.SegmentState_Flushed || + seg.Level() == datapb.SegmentLevel_L0 + return buf.segmentID, inRange && buf.MinTimestamp() < flushTs }) // set segment flushing meta.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing), diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index d8e1b752f4..82d33c4616 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -48,7 +48,7 @@ type WriteBuffer interface { Close(drop bool) } -func NewWriteBuffer(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) { +func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) { option := defaultWBOption() option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache)) for _, opt := range opts { @@ -57,9 +57,9 @@ func NewWriteBuffer(channel string, schema *schemapb.CollectionSchema, metacache switch option.deletePolicy { case DeletePolicyBFPkOracle: - return NewBFWriteBuffer(channel, schema, metacache, syncMgr, option) + return NewBFWriteBuffer(channel, metacache, syncMgr, option) case DeletePolicyL0Delta: - return NewL0WriteBuffer(channel, schema, metacache, syncMgr, option) + return NewL0WriteBuffer(channel, metacache, syncMgr, option) default: return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy) } @@ -84,14 +84,15 @@ type writeBufferBase struct { flushTimestamp *atomic.Uint64 } -func newWriteBufferBase(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase { +func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase { flushTs := atomic.NewUint64(nonFlushTS) flushTsPolicy := GetFlushTsPolicy(flushTs, metacache) option.syncPolicies = append(option.syncPolicies, flushTsPolicy) return &writeBufferBase{ channelName: channel, - collSchema: sch, + collectionID: 
metacache.Collection(), + collSchema: metacache.Schema(), syncMgr: syncMgr, metaWriter: option.metaWriter, buffers: make(map[int64]*segmentBuffer), @@ -143,18 +144,14 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { return checkpoint } -func (wb *writeBufferBase) triggerSync() error { +func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) { segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp()) if len(segmentsToSync) > 0 { log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync)) - err := wb.syncSegments(context.Background(), segmentsToSync) - if err != nil { - log.Warn("segment segments failed", zap.Int64s("segmentIDs", segmentsToSync), zap.Error(err)) - return err - } + wb.syncSegments(context.Background(), segmentsToSync) } - return nil + return segmentsToSync } func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error { @@ -169,7 +166,7 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64 return nil } -func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) error { +func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) { for _, segmentID := range segmentIDs { syncTask := wb.getSyncTask(ctx, segmentID) if syncTask == nil { @@ -181,7 +178,6 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) // discard Future here, handle error in callback _ = wb.syncMgr.SyncData(ctx, syncTask) } - return nil } // getSegmentsToSync applies all policies to get segments list to sync. @@ -242,7 +238,7 @@ func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, start InsertChannel: wb.channelName, StartPosition: startPos, State: commonpb.SegmentState_Growing, - }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }) + }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false)) } segBuf := wb.getOrCreateBuffer(segmentID) @@ -292,6 +288,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) *sy WithChannelName(wb.channelName). WithSegmentID(segmentID). WithStartPosition(startPos). + WithLevel(segmentInfo.Level()). WithCheckpoint(wb.checkpoint). WithSchema(wb.collSchema). WithBatchSize(batchSize). 
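
The sync_policy.go and write_buffer.go hunks above carry the core of the fix: LevelZero segments never reach the Flushed state, so the old flush-timestamp predicate (state == Flushed) skipped them forever, while the patched predicate also admits L0-level segments, and each sync task now records its segment level via WithLevel. A minimal, self-contained sketch of that predicate change follows; segment, buffer, and flushTsPolicy are simplified stand-ins invented for illustration, not the actual metacache/writebuffer types.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the metacache types; the real SegmentInfo,
// SegmentLevel, and segmentBuffer live under internal/datanode.
type SegmentLevel int

const (
	LevelL0 SegmentLevel = iota
	LevelL1
)

type segment struct {
	id      int64
	level   SegmentLevel
	flushed bool
}

type buffer struct {
	segmentID int64
	minTs     uint64
}

// flushTsPolicy mirrors the predicate change in GetFlushTsPolicy: before the
// patch only Flushed segments qualified, so L0 delta segments (which stay in
// the Growing state) were never picked up; now L0 level alone qualifies.
func flushTsPolicy(segments map[int64]segment, buffers []buffer, flushTs uint64) []int64 {
	var toSync []int64
	for _, buf := range buffers {
		seg, ok := segments[buf.segmentID]
		if !ok {
			continue
		}
		qualified := seg.flushed || seg.level == LevelL0 // the core fix
		if qualified && buf.minTs < flushTs {
			toSync = append(toSync, seg.id)
		}
	}
	return toSync
}

func main() {
	segments := map[int64]segment{
		1: {id: 1, level: LevelL1, flushed: true},
		2: {id: 2, level: LevelL0, flushed: false}, // L0 segment, still growing
	}
	buffers := []buffer{{segmentID: 1, minTs: 50}, {segmentID: 2, minTs: 60}}
	// Both segments sync: 1 because it is flushed, 2 because it is L0.
	fmt.Println(flushTsPolicy(segments, buffers, 100)) // [1 2]
}
```
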
diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 9061c97dad..ec54cc588f 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -19,6 +19,7 @@ import ( type WriteBufferSuite struct { suite.Suite + collID int64 channelName string collSchema *schemapb.CollectionSchema wb *writeBufferBase @@ -28,6 +29,7 @@ type WriteBufferSuite struct { func (s *WriteBufferSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) + s.collID = 100 s.collSchema = &schemapb.CollectionSchema{ Name: "wb_base_collection", Fields: []*schemapb.FieldSchema{ @@ -43,7 +45,9 @@ func (s *WriteBufferSuite) SetupSuite() { func (s *WriteBufferSuite) SetupTest() { s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) - s.wb = newWriteBufferBase(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{ + s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() + s.metacache.EXPECT().Collection().Return(s.collID).Maybe() + s.wb = newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, @@ -51,18 +55,18 @@ func (s *WriteBufferSuite) SetupTest() { } func (s *WriteBufferSuite) TestWriteBufferType() { - wb, err := NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) _, ok := wb.(*bfWriteBuffer) s.True(ok) - wb, err = NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) + wb, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) s.NoError(err) _, ok = wb.(*l0WriteBuffer) s.True(ok) - _, err = NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy("")) + _, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy("")) s.Error(err) } @@ -81,7 +85,7 @@ func (s *WriteBufferSuite) TestFlushSegments() { s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything) - wb, err := NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) err = wb.FlushSegments(context.Background(), []int64{segmentID})
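
Beyond the test updates above, the other half of the fix is the LevelZero bookkeeping in l0_write_buffer.go: a lazily allocated L0 segment per partition (getL0SegmentID, which now also registers the segment via AddSegment), plus the loop in BufferData that forgets a segment once triggerSync has picked it up. The sketch below models that lifecycle under the same caveat: l0Buffer and its methods are hypothetical stand-ins, with a plain counter in place of the real ID allocator and no metacache registration.

```go
package main

import "fmt"

// l0Buffer models the bookkeeping the patch adds to l0WriteBuffer: one L0
// segment per partition, registered on first delete and forgotten once it
// has been handed to the sync manager, so the next delete batch gets a
// fresh L0 segment.
type l0Buffer struct {
	nextID      int64
	l0Segments  map[int64]int64 // partition ID -> current L0 segment ID
	l0Partition map[int64]int64 // L0 segment ID -> partition ID
}

func newL0Buffer() *l0Buffer {
	return &l0Buffer{
		nextID:      1000,
		l0Segments:  make(map[int64]int64),
		l0Partition: make(map[int64]int64),
	}
}

// getL0SegmentID lazily allocates a segment ID per partition; the real code
// additionally calls metacache.AddSegment with Level: SegmentLevel_L0 and
// SetStartPosRecorded(false) so the meta writer later reports the start
// position to the coordinator.
func (b *l0Buffer) getL0SegmentID(partitionID int64) int64 {
	segID, ok := b.l0Segments[partitionID]
	if !ok {
		segID = b.nextID
		b.nextID++
		b.l0Segments[partitionID] = segID
		b.l0Partition[segID] = partitionID
	}
	return segID
}

// afterSync mirrors the loop added to BufferData: every synced L0 segment is
// dropped from both maps so it is never written to again.
func (b *l0Buffer) afterSync(synced []int64) {
	for _, segID := range synced {
		if partition, ok := b.l0Partition[segID]; ok {
			delete(b.l0Partition, segID)
			delete(b.l0Segments, partition)
		}
	}
}

func main() {
	b := newL0Buffer()
	first := b.getL0SegmentID(7)  // allocates 1000
	b.afterSync([]int64{first})   // a sync cycle picked it up
	second := b.getL0SegmentID(7) // allocates a fresh segment
	fmt.Println(first, second)    // 1000 1001
}
```
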