diff --git a/Makefile b/Makefile index 52d984e2be..ef2bc4eb0f 100644 --- a/Makefile +++ b/Makefile @@ -446,6 +446,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage $(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage + $(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 4f7748284d..590d554011 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -196,7 +196,7 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C } func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.SegmentInfo, schema *schemapb.CollectionSchema) ([]*storage.PkStatistics, error) { - space, err := storageCache.GetOrCreateSpace(segment.ID, writebuffer.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema())) + space, err := storageCache.GetOrCreateSpace(segment.ID, syncmgr.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema())) if err != nil { return nil, err } diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index 81bc141abd..214f9ed433 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -102,9 +103,9 @@ func UpdateBufferedRows(bufferedRows int64) SegmentAction { } } -func RollStats() SegmentAction { +func RollStats(newStats ...*storage.PrimaryKeyStats) SegmentAction { return func(info *SegmentInfo) { - info.bfs.Roll() + info.bfs.Roll(newStats...) 
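With this change, RollStats forwards freshly computed *storage.PrimaryKeyStats into the segment's bloom filter history instead of rolling the mutable current filter. A minimal usage sketch (assuming `cache` is a metacache.MetaCache and `stats` was produced by the statslog serialization step; variable names are illustrative):

    // Roll the per-batch PK stats into the segment history; a later flush
    // reads them back via segment.GetHistory() to build the merged statslog.
    cache.UpdateSegments(metacache.RollStats(stats), metacache.WithSegmentIDs(segmentID))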
} } diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index f074a8ece7..7e9bbcb7b0 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" ) @@ -64,13 +65,18 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { return bfs.current.UpdatePKRange(ids) } -func (bfs *BloomFilterSet) Roll() { +func (bfs *BloomFilterSet) Roll(newStats ...*storage.PrimaryKeyStats) { bfs.mut.Lock() defer bfs.mut.Unlock() - if bfs.current != nil { - bfs.history = append(bfs.history, bfs.current) - bfs.current = nil + if len(newStats) > 0 { + bfs.history = append(bfs.history, lo.Map(newStats, func(stats *storage.PrimaryKeyStats, _ int) *storage.PkStatistics { + return &storage.PkStatistics{ + PkFilter: stats.BF, + MaxPK: stats.MaxPk, + MinPK: stats.MinPk, + } + })...) } } diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go index d4dc6668b1..4d647b6910 100644 --- a/internal/datanode/metacache/bloom_filter_set_test.go +++ b/internal/datanode/metacache/bloom_filter_set_test.go @@ -78,7 +78,9 @@ func (s *BloomFilterSetSuite) TestRoll() { err := s.bfs.UpdatePKRange(s.GetFieldData(ids)) s.NoError(err) - s.bfs.Roll() + newEntry := &storage.PrimaryKeyStats{} + + s.bfs.Roll(newEntry) history = s.bfs.GetHistory() s.Equal(1, len(history), "history shall have one entry after roll with current data") diff --git a/internal/datanode/mock_fgmanager.go b/internal/datanode/mock_fgmanager.go index 1dea01e67f..73ed8f50a3 100644 --- a/internal/datanode/mock_fgmanager.go +++ b/internal/datanode/mock_fgmanager.go @@ -388,103 +388,6 @@ func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) RunAndReturn(run func(strin return _c } -// Start provides a mock function with given fields: -func (_m *MockFlowgraphManager) Start() { - _m.Called() -} - -// MockFlowgraphManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' -type MockFlowgraphManager_Start_Call struct { - *mock.Call -} - -// Start is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) Start() *MockFlowgraphManager_Start_Call { - return &MockFlowgraphManager_Start_Call{Call: _e.mock.On("Start")} -} - -func (_c *MockFlowgraphManager_Start_Call) Run(run func()) *MockFlowgraphManager_Start_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockFlowgraphManager_Start_Call) Return() *MockFlowgraphManager_Start_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_Start_Call) RunAndReturn(run func()) *MockFlowgraphManager_Start_Call { - _c.Call.Return(run) - return _c -} - -// Stop provides a mock function with given fields: -func (_m *MockFlowgraphManager) Stop() { - _m.Called() -} - -// MockFlowgraphManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' -type MockFlowgraphManager_Stop_Call struct { - *mock.Call -} - -// Stop is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) Stop() *MockFlowgraphManager_Stop_Call { - return &MockFlowgraphManager_Stop_Call{Call: _e.mock.On("Stop")} -} - -func (_c *MockFlowgraphManager_Stop_Call) Run(run func()) *MockFlowgraphManager_Stop_Call { - _c.Call.Run(func(args 
mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockFlowgraphManager_Stop_Call) Return() *MockFlowgraphManager_Stop_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_Stop_Call) RunAndReturn(run func()) *MockFlowgraphManager_Stop_Call { - _c.Call.Return(run) - return _c -} - -// controlMemWaterLevel provides a mock function with given fields: totalMemory -func (_m *MockFlowgraphManager) controlMemWaterLevel(totalMemory uint64) { - _m.Called(totalMemory) -} - -// MockFlowgraphManager_controlMemWaterLevel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'controlMemWaterLevel' -type MockFlowgraphManager_controlMemWaterLevel_Call struct { - *mock.Call -} - -// controlMemWaterLevel is a helper method to define mock.On call -// - totalMemory uint64 -func (_e *MockFlowgraphManager_Expecter) controlMemWaterLevel(totalMemory interface{}) *MockFlowgraphManager_controlMemWaterLevel_Call { - return &MockFlowgraphManager_controlMemWaterLevel_Call{Call: _e.mock.On("controlMemWaterLevel", totalMemory)} -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Run(run func(totalMemory uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(uint64)) - }) - return _c -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Return() *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) RunAndReturn(run func(uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Return(run) - return _c -} - // NewMockFlowgraphManager creates a new instance of MockFlowgraphManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
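The remaining generated methods keep mockery's expecter shape. A hedged sketch of driving such a mock in a test (assuming RemoveFlowgraph(channel string) returns nothing, as the surrounding generated code suggests; the channel name is illustrative):

    // The mock registers itself with t; expectations are asserted at cleanup.
    mgr := NewMockFlowgraphManager(t)
    mgr.EXPECT().RemoveFlowgraph("by-dev-rootcoord-dml_0").Return()
    mgr.RemoveFlowgraph("by-dev-rootcoord-dml_0")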
func NewMockFlowgraphManager(t interface { diff --git a/internal/datanode/syncmgr/meta_writer.go b/internal/datanode/syncmgr/meta_writer.go index 0f7c3f8617..4d506be2e5 100644 --- a/internal/datanode/syncmgr/meta_writer.go +++ b/internal/datanode/syncmgr/meta_writer.go @@ -48,12 +48,11 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog) } - // only current segment checkpoint info, - segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID)) - if len(segments) == 0 { + // only current segment checkpoint info + segment, ok := pack.metacache.GetSegmentByID(pack.segmentID) + if !ok { return merr.WrapErrSegmentNotFound(pack.segmentID) } - segment := segments[0] checkPoints = append(checkPoints, &datapb.CheckPoint{ SegmentID: pack.segmentID, NumOfRows: segment.FlushedRows() + pack.batchSize, @@ -140,11 +139,10 @@ func (b *brokerMetaWriter) UpdateSyncV2(pack *SyncTaskV2) error { checkPoints := []*datapb.CheckPoint{} // only current segment checkpoint info, - segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID)) - if len(segments) == 0 { + segment, ok := pack.metacache.GetSegmentByID(pack.segmentID) + if !ok { return merr.WrapErrSegmentNotFound(pack.segmentID) } - segment := segments[0] checkPoints = append(checkPoints, &datapb.CheckPoint{ SegmentID: pack.segmentID, NumOfRows: segment.FlushedRows() + pack.batchSize, diff --git a/internal/datanode/syncmgr/meta_writer_test.go b/internal/datanode/syncmgr/meta_writer_test.go index 8742e8c6b9..23d54d9be4 100644 --- a/internal/datanode/syncmgr/meta_writer_test.go +++ b/internal/datanode/syncmgr/meta_writer_test.go @@ -40,6 +40,7 @@ func (s *MetaWriterSuite) TestNormalSave() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() task := NewSyncTask() task.WithMetaCache(s.metacache) @@ -53,6 +54,7 @@ func (s *MetaWriterSuite) TestReturnError() { bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTask() task.WithMetaCache(s.metacache) @@ -66,6 +68,7 @@ func (s *MetaWriterSuite) TestNormalSaveV2() { bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTaskV2() task.WithMetaCache(s.metacache) @@ -79,6 +82,7 @@ func (s *MetaWriterSuite) TestReturnErrorV2() { bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTaskV2() task.WithMetaCache(s.metacache) diff --git a/internal/datanode/syncmgr/mock_meta_writer.go b/internal/datanode/syncmgr/mock_meta_writer.go new file mode 100644 index 0000000000..17cd5f25d9 
--- /dev/null +++ b/internal/datanode/syncmgr/mock_meta_writer.go @@ -0,0 +1,158 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package syncmgr + +import mock "github.com/stretchr/testify/mock" + +// MockMetaWriter is an autogenerated mock type for the MetaWriter type +type MockMetaWriter struct { + mock.Mock +} + +type MockMetaWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter { + return &MockMetaWriter_Expecter{mock: &_m.Mock} +} + +// DropChannel provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) DropChannel(_a0 string) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_DropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannel' +type MockMetaWriter_DropChannel_Call struct { + *mock.Call +} + +// DropChannel is a helper method to define mock.On call +// - _a0 string +func (_e *MockMetaWriter_Expecter) DropChannel(_a0 interface{}) *MockMetaWriter_DropChannel_Call { + return &MockMetaWriter_DropChannel_Call{Call: _e.mock.On("DropChannel", _a0)} +} + +func (_c *MockMetaWriter_DropChannel_Call) Run(run func(_a0 string)) *MockMetaWriter_DropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockMetaWriter_DropChannel_Call) Return(_a0 error) *MockMetaWriter_DropChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(string) error) *MockMetaWriter_DropChannel_Call { + _c.Call.Return(run) + return _c +} + +// UpdateSync provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) UpdateSync(_a0 *SyncTask) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*SyncTask) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_UpdateSync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSync' +type MockMetaWriter_UpdateSync_Call struct { + *mock.Call +} + +// UpdateSync is a helper method to define mock.On call +// - _a0 *SyncTask +func (_e *MockMetaWriter_Expecter) UpdateSync(_a0 interface{}) *MockMetaWriter_UpdateSync_Call { + return &MockMetaWriter_UpdateSync_Call{Call: _e.mock.On("UpdateSync", _a0)} +} + +func (_c *MockMetaWriter_UpdateSync_Call) Run(run func(_a0 *SyncTask)) *MockMetaWriter_UpdateSync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*SyncTask)) + }) + return _c +} + +func (_c *MockMetaWriter_UpdateSync_Call) Return(_a0 error) *MockMetaWriter_UpdateSync_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_UpdateSync_Call) RunAndReturn(run func(*SyncTask) error) *MockMetaWriter_UpdateSync_Call { + _c.Call.Return(run) + return _c +} + +// UpdateSyncV2 provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) UpdateSyncV2(_a0 *SyncTaskV2) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*SyncTaskV2) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_UpdateSyncV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSyncV2' +type MockMetaWriter_UpdateSyncV2_Call struct { + *mock.Call +} + +// UpdateSyncV2 is a helper method to define mock.On call +// - _a0 *SyncTaskV2 
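MockMetaWriter follows the same expecter pattern, which is what the new serializer tests rely on. A minimal sketch (mock is github.com/stretchr/testify/mock):

    // Stub successful meta updates for any sync task, v1 or v2.
    writer := NewMockMetaWriter(t)
    writer.EXPECT().UpdateSync(mock.Anything).Return(nil)
    writer.EXPECT().UpdateSyncV2(mock.Anything).Return(nil)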
+func (_e *MockMetaWriter_Expecter) UpdateSyncV2(_a0 interface{}) *MockMetaWriter_UpdateSyncV2_Call { + return &MockMetaWriter_UpdateSyncV2_Call{Call: _e.mock.On("UpdateSyncV2", _a0)} +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) Run(run func(_a0 *SyncTaskV2)) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*SyncTaskV2)) + }) + return _c +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) Return(_a0 error) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) RunAndReturn(run func(*SyncTaskV2) error) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Return(run) + return _c +} + +// NewMockMetaWriter creates a new instance of MockMetaWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockMetaWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMetaWriter { + mock := &MockMetaWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/syncmgr/options.go b/internal/datanode/syncmgr/options.go index 324cdb1700..39da2da647 100644 --- a/internal/datanode/syncmgr/options.go +++ b/internal/datanode/syncmgr/options.go @@ -1,6 +1,8 @@ package syncmgr import ( + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" @@ -18,6 +20,7 @@ func NewSyncTask() *SyncTask { statsBinlogs: make(map[int64]*datapb.FieldBinlog), deltaBinlog: &datapb.FieldBinlog{}, segmentData: make(map[string][]byte), + binlogBlobs: make(map[int64]*storage.Blob), } } @@ -31,16 +34,6 @@ func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask { return t } -func (t *SyncTask) WithInsertData(insertData *storage.InsertData) *SyncTask { - t.insertData = insertData - return t -} - -func (t *SyncTask) WithDeleteData(deleteData *storage.DeleteData) *SyncTask { - t.deleteData = deleteData - return t -} - func (t *SyncTask) WithStartPosition(start *msgpb.MsgPosition) *SyncTask { t.startPosition = start return t @@ -73,6 +66,9 @@ func (t *SyncTask) WithChannelName(chanName string) *SyncTask { func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask { t.schema = schema + t.pkField = lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { + return field.GetIsPrimaryKey() + }) return t } diff --git a/internal/datanode/syncmgr/serializer.go b/internal/datanode/syncmgr/serializer.go new file mode 100644 index 0000000000..60217277d6 --- /dev/null +++ b/internal/datanode/syncmgr/serializer.go @@ -0,0 +1,121 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// Serializer is the interface for storage/storageV2 implementations to encode +// a WriteBuffer into a sync task. +type Serializer interface { + EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) +} + +// SyncPack is the struct that contains buffered sync data. +type SyncPack struct { + metacache metacache.MetaCache + metawriter MetaWriter + // data + insertData *storage.InsertData + deltaData *storage.DeleteData + // statistics + tsFrom typeutil.Timestamp + tsTo typeutil.Timestamp + startPosition *msgpb.MsgPosition + checkpoint *msgpb.MsgPosition + batchSize int64 // batchSize is the row number of this sync task, not the total number of rows in the segment + isFlush bool + isDrop bool + // metadata + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + level datapb.SegmentLevel +} + +func (p *SyncPack) WithInsertData(insertData *storage.InsertData) *SyncPack { + p.insertData = insertData + return p +} + +func (p *SyncPack) WithDeleteData(deltaData *storage.DeleteData) *SyncPack { + p.deltaData = deltaData + return p +} + +func (p *SyncPack) WithStartPosition(start *msgpb.MsgPosition) *SyncPack { + p.startPosition = start + return p +} + +func (p *SyncPack) WithCheckpoint(cp *msgpb.MsgPosition) *SyncPack { + p.checkpoint = cp + return p +} + +func (p *SyncPack) WithCollectionID(collID int64) *SyncPack { + p.collectionID = collID + return p +} + +func (p *SyncPack) WithPartitionID(partID int64) *SyncPack { + p.partitionID = partID + return p +} + +func (p *SyncPack) WithSegmentID(segID int64) *SyncPack { + p.segmentID = segID + return p +} + +func (p *SyncPack) WithChannelName(chanName string) *SyncPack { + p.channelName = chanName + return p +} + +func (p *SyncPack) WithTimeRange(from, to typeutil.Timestamp) *SyncPack { + p.tsFrom, p.tsTo = from, to + return p +} + +func (p *SyncPack) WithFlush() *SyncPack { + p.isFlush = true + return p +} + +func (p *SyncPack) WithDrop() *SyncPack { + p.isDrop = true + return p +} + +func (p *SyncPack) WithBatchSize(batchSize int64) *SyncPack { + p.batchSize = batchSize + return p +} + +func (p *SyncPack) WithLevel(level datapb.SegmentLevel) *SyncPack { + p.level = level + return p +} diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go new file mode 100644 index 0000000000..2213d572a1 --- /dev/null +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -0,0 +1,205 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
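A SyncPack carries everything a serializer needs for one sync of one segment, and the With* helpers chain. A construction sketch mirroring getBasicPack in the tests below (field values are illustrative):

    // One pack per segment per sync round; a serializer turns it into a Task.
    pack := (&SyncPack{}).
        WithCollectionID(collectionID).
        WithPartitionID(partitionID).
        WithSegmentID(segmentID).
        WithChannelName(channelName).
        WithCheckpoint(checkpoint).
        WithTimeRange(tsFrom, tsTo).
        WithInsertData(insertData).
        WithBatchSize(batchRows) // rows in this batch, not in the whole segment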
+// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + "strconv" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type storageV1Serializer struct { + collectionID int64 + schema *schemapb.CollectionSchema + pkField *schemapb.FieldSchema + + inCodec *storage.InsertCodec + delCodec *storage.DeleteCodec + + metacache metacache.MetaCache + metaWriter MetaWriter +} + +func NewStorageSerializer(metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) { + collectionID := metacache.Collection() + schema := metacache.Schema() + pkField := lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) + if pkField == nil { + return nil, merr.WrapErrServiceInternal("cannot find pk field") + } + meta := &etcdpb.CollectionMeta{ + Schema: schema, + ID: collectionID, + } + inCodec := storage.NewInsertCodecWithSchema(meta) + return &storageV1Serializer{ + collectionID: collectionID, + schema: schema, + pkField: pkField, + + inCodec: inCodec, + delCodec: storage.NewDeleteCodec(), + metacache: metacache, + metaWriter: metaWriter, + }, nil +} + +func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { + task := NewSyncTask() + + log := log.Ctx(ctx).With( + zap.Int64("segmentID", pack.segmentID), + zap.Int64("collectionID", pack.collectionID), + zap.String("channel", pack.channelName), + ) + + if pack.insertData != nil { + memSize := make(map[int64]int64) + for fieldID, fieldData := range pack.insertData.Data { + memSize[fieldID] = int64(fieldData.GetMemorySize()) + } + task.binlogMemsize = memSize + + binlogBlobs, err := s.serializeBinlog(ctx, pack) + if err != nil { + log.Warn("failed to serialize binlog", zap.Error(err)) + return nil, err + } + task.binlogBlobs = binlogBlobs + + singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack) + if err != nil { + log.Warn("failed to serialize statslog", zap.Error(err)) + return nil, err + } + + task.batchStatsBlob = batchStatsBlob + s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID)) + } + + if pack.isFlush { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } + + task.mergedStatsBlob = mergedStatsBlob + task.WithFlush() + } + + if pack.deltaData != nil { + deltaBlob, err := s.serializeDeltalog(pack) + if err != nil { + log.Warn("failed to serialize delta log", zap.Error(err)) + return nil, err + } + task.deltaBlob = deltaBlob + task.deltaRowCount = pack.deltaData.RowCount + } + if pack.isDrop { + task.WithDrop() + } + + s.setTaskMeta(task, pack) + return task, nil +} + +func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { + task.WithCollectionID(pack.collectionID). + WithPartitionID(pack.partitionID). + WithChannelName(pack.channelName). + WithSegmentID(pack.segmentID). + WithBatchSize(pack.batchSize). + WithSchema(s.metacache.Schema()). + WithStartPosition(pack.startPosition). + WithCheckpoint(pack.checkpoint). + WithLevel(pack.level).
+ WithTimeRange(pack.tsFrom, pack.tsTo). + WithMetaCache(s.metacache). + WithMetaWriter(s.metaWriter). + WithFailureCallback(func(err error) { + // TODO could change to unsub channel in the future + panic(err) + }) +} + +func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) { + blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData) + if err != nil { + return nil, err + } + + result := make(map[int64]*storage.Blob) + for _, blob := range blobs { + fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) + if err != nil { + log.Error("serialize buffer failed ... cannot parse string to fieldID ..", zap.Error(err)) + return nil, err + } + + result[fieldID] = blob + } + return result, nil +} + +func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) { + pkFieldData := pack.insertData.Data[s.pkField.GetFieldID()] + rowNum := int64(pkFieldData.RowNum()) + + stats, err := storage.NewPrimaryKeyStats(s.pkField.GetFieldID(), int64(s.pkField.GetDataType()), rowNum) + if err != nil { + return nil, nil, err + } + stats.UpdateByMsgs(pkFieldData) + + blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize) + if err != nil { + return nil, nil, err + } + return stats, blob, nil +} + +func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.Blob, error) { + segment, ok := s.metacache.GetSegmentByID(pack.segmentID) + if !ok { + return nil, merr.WrapErrSegmentNotFound(pack.segmentID) + } + + return s.inCodec.SerializePkStatsList(lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { + return &storage.PrimaryKeyStats{ + FieldID: s.pkField.GetFieldID(), + MaxPk: pks.MaxPK, + MinPk: pks.MinPK, + BF: pks.PkFilter, + PkType: int64(s.pkField.GetDataType()), + } + }), segment.NumOfRows()) +} + +func (s *storageV1Serializer) serializeDeltalog(pack *SyncPack) (*storage.Blob, error) { + return s.delCodec.Serialize(pack.collectionID, pack.partitionID, pack.segmentID, pack.deltaData) +} diff --git a/internal/datanode/syncmgr/storage_serializer_test.go b/internal/datanode/syncmgr/storage_serializer_test.go new file mode 100644 index 0000000000..43894b47b1 --- /dev/null +++ b/internal/datanode/syncmgr/storage_serializer_test.go @@ -0,0 +1,324 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
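End to end, the storage v1 path matches what the test suite below exercises: construct the serializer once per channel, then encode each pack (a sketch; cache and writer are the channel's metacache.MetaCache and MetaWriter):

    serializer, err := NewStorageSerializer(cache, writer)
    if err != nil {
        return err // e.g. the schema has no primary key field
    }
    task, err := serializer.EncodeBuffer(ctx, pack)
    if err != nil {
        return err
    }
    // task now holds binlogBlobs/batchStatsBlob/deltaBlob plus metadata;
    // running it uploads the blobs and reports the checkpoint via MetaWriter.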
+ +package syncmgr + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type StorageV1SerializerSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + + schema *schemapb.CollectionSchema + + mockCache *metacache.MockMetaCache + mockMetaWriter *MockMetaWriter + + serializer *storageV1Serializer +} + +func (s *StorageV1SerializerSuite) SetupSuite() { + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID) + s.schema = &schemapb.CollectionSchema{ + Name: "serializer_v1_test_col", + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64}, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + s.mockCache = metacache.NewMockMetaCache(s.T()) + s.mockMetaWriter = NewMockMetaWriter(s.T()) +} + +func (s *StorageV1SerializerSuite) SetupTest() { + s.mockCache.EXPECT().Collection().Return(s.collectionID) + s.mockCache.EXPECT().Schema().Return(s.schema) + + var err error + s.serializer, err = NewStorageSerializer(s.mockCache, s.mockMetaWriter) + s.Require().NoError(err) +} + +func (s *StorageV1SerializerSuite) getEmptyInsertBuffer() *storage.InsertData { + buf, err := storage.NewInsertData(s.schema) + s.Require().NoError(err) + + return buf +} + +func (s *StorageV1SerializerSuite) getInsertBuffer() *storage.InsertData { + buf := s.getEmptyInsertBuffer() + + // generate data + for i := 0; i < 10; i++ { + data := make(map[storage.FieldID]any) + data[common.RowIDField] = int64(i + 1) + data[common.TimeStampField] = int64(i + 1) + data[100] = int64(i + 1) + vector := lo.RepeatBy(128, func(_ int) float32 { + return rand.Float32() + }) + data[101] = vector + err := buf.Append(data) + s.Require().NoError(err) + } + return buf +} + +func (s *StorageV1SerializerSuite) getDeleteBuffer() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + ts := tsoutil.ComposeTSByTime(time.Now(), 0) + buf.Append(pk, ts) + } + return buf +} + +func (s *StorageV1SerializerSuite) getDeleteBufferZeroTs() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + buf.Append(pk, 0) + } + return buf +} + +func (s *StorageV1SerializerSuite) getBasicPack() *SyncPack { + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName). 
+ WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + + return pack +} + +func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet { + bfs := metacache.NewBloomFilterSet() + fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{ + FieldID: 101, + Name: "ID", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }) + s.Require().NoError(err) + + ids := []int64{1, 2, 3, 4, 5, 6, 7} + for _, id := range ids { + err = fd.AppendRow(id) + s.Require().NoError(err) + } + + bfs.UpdatePKRange(fd) + return bfs +} + +func (s *StorageV1SerializerSuite) TestSerializeInsert() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("without_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithDrop() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.True(taskV1.isDrop) + }) + + s.Run("with_empty_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_normal_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.Len(taskV1.binlogBlobs, 4) + s.NotNil(taskV1.batchStatsBlob) + }) + + s.Run("with_flush_segment_not_found", func() { + pack := s.getBasicPack() + pack.WithFlush() + + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_flush", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithFlush() + + bfs := s.getBfs() + segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(segInfo) + metacache.CompactTo(metacache.NullSegment)(segInfo) + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(segInfo) + }).Return().Once() + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) 
+ s.EqualValues(100, taskV1.tsTo) + s.Len(taskV1.binlogBlobs, 4) + s.NotNil(taskV1.batchStatsBlob) + s.NotNil(taskV1.mergedStatsBlob) + }) +} + +func (s *StorageV1SerializerSuite) TestSerializeDelete() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("serialize_failed", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_normal", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBuffer()) + pack.WithTimeRange(50, 100) + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.NotNil(taskV1.deltaBlob) + }) +} + +func (s *StorageV1SerializerSuite) TestBadSchema() { + mockCache := metacache.NewMockMetaCache(s.T()) + mockCache.EXPECT().Collection().Return(s.collectionID).Once() + mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once() + _, err := NewStorageSerializer(mockCache, s.mockMetaWriter) + s.Error(err) +} + +func TestStorageV1Serializer(t *testing.T) { + suite.Run(t, new(StorageV1SerializerSuite)) +} diff --git a/internal/datanode/syncmgr/storage_v2_serializer.go b/internal/datanode/syncmgr/storage_v2_serializer.go new file mode 100644 index 0000000000..7c11936fd7 --- /dev/null +++ b/internal/datanode/syncmgr/storage_v2_serializer.go @@ -0,0 +1,246 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
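The storage v2 serializer that follows reuses the v1 statslog logic by embedding storageV1Serializer, and swaps blob encoding for Arrow record readers written into a milvus-storage Space. Construction differs only by the extra StorageV2Cache (a sketch; argument names are illustrative):

    serializer, err := NewStorageV2Serializer(storageV2Cache, cache, writer)
    if err != nil {
        return err
    }
    task, err := serializer.EncodeBuffer(ctx, pack) // yields a *SyncTaskV2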
+ +package syncmgr + +import ( + "context" + "fmt" + + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus-storage/go/storage/schema" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/storage" + iTypeutil "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type storageV2Serializer struct { + *storageV1Serializer + + arrowSchema *arrow.Schema + storageV2Cache *metacache.StorageV2Cache + inCodec *storage.InsertCodec + metacache metacache.MetaCache +} + +func NewStorageV2Serializer( + storageV2Cache *metacache.StorageV2Cache, + metacache metacache.MetaCache, + metaWriter MetaWriter, +) (*storageV2Serializer, error) { + v1Serializer, err := NewStorageSerializer(metacache, metaWriter) + if err != nil { + return nil, err + } + + return &storageV2Serializer{ + storageV1Serializer: v1Serializer, + storageV2Cache: storageV2Cache, + arrowSchema: storageV2Cache.ArrowSchema(), + metacache: metacache, + }, nil +} + +func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { + task := NewSyncTaskV2() + + space, err := s.storageV2Cache.GetOrCreateSpace(pack.segmentID, SpaceCreatorFunc(pack.segmentID, s.schema, s.arrowSchema)) + if err != nil { + log.Warn("failed to get or create space", zap.Error(err)) + return nil, err + } + + task.space = space + if pack.insertData != nil { + insertReader, err := s.serializeInsertData(pack) + if err != nil { + log.Warn("failed to serialize insert data with storagev2", zap.Error(err)) + return nil, err + } + + task.reader = insertReader + + singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack) + if err != nil { + log.Warn("failed to serialize statslog", zap.Error(err)) + return nil, err + } + + task.batchStatsBlob = batchStatsBlob + s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID)) + } + + if pack.isFlush { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } + + task.mergedStatsBlob = mergedStatsBlob + task.WithFlush() + } + + if pack.deltaData != nil { + deltaReader, err := s.serializeDeltaData(pack) + if err != nil { + log.Warn("failed to serialize delta data", zap.Error(err)) + return nil, err + } + task.deleteReader = deltaReader + } + + if pack.isDrop { + task.WithDrop() + } + + s.setTaskMeta(task, pack) + return task, nil +} + +func (s *storageV2Serializer) setTaskMeta(task *SyncTaskV2, pack *SyncPack) { + task.WithCollectionID(pack.collectionID). + WithPartitionID(pack.partitionID). + WithChannelName(pack.channelName). + WithSegmentID(pack.segmentID). + WithBatchSize(pack.batchSize). + WithSchema(s.metacache.Schema()). + WithStartPosition(pack.startPosition). + WithCheckpoint(pack.checkpoint). + WithLevel(pack.level). + WithTimeRange(pack.tsFrom, pack.tsTo). + WithMetaCache(s.metacache). + WithMetaWriter(s.metaWriter).
+ WithFailureCallback(func(err error) { + // TODO could change to unsub channel in the future + panic(err) + }) +} + +func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordReader, error) { + builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema) + defer builder.Release() + + if err := buildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil { + return nil, err + } + + rec := builder.NewRecord() + defer rec.Release() + + itr, err := array.NewRecordReader(s.arrowSchema, []arrow.Record{rec}) + if err != nil { + return nil, err + } + itr.Retain() + + return itr, nil +} + +func (s *storageV2Serializer) serializeDeltaData(pack *SyncPack) (array.RecordReader, error) { + fields := make([]*schemapb.FieldSchema, 0, 2) + tsField := &schemapb.FieldSchema{ + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + } + fields = append(fields, s.pkField, tsField) + + deltaArrowSchema, err := iTypeutil.ConvertToArrowSchema(fields) + if err != nil { + return nil, err + } + + builder := array.NewRecordBuilder(memory.DefaultAllocator, deltaArrowSchema) + defer builder.Release() + + switch s.pkField.GetDataType() { + case schemapb.DataType_Int64: + pb := builder.Field(0).(*array.Int64Builder) + for _, pk := range pack.deltaData.Pks { + pb.Append(pk.GetValue().(int64)) + } + case schemapb.DataType_VarChar: + pb := builder.Field(0).(*array.StringBuilder) + for _, pk := range pack.deltaData.Pks { + pb.Append(pk.GetValue().(string)) + } + default: + return nil, merr.WrapErrParameterInvalidMsg("unexpected pk type %v", s.pkField.GetDataType()) + } + + for _, ts := range pack.deltaData.Tss { + builder.Field(1).(*array.Int64Builder).Append(int64(ts)) + } + + rec := builder.NewRecord() + defer rec.Release() + + reader, err := array.NewRecordReader(deltaArrowSchema, []arrow.Record{rec}) + if err != nil { + return nil, err + } + reader.Retain() + + return reader, nil +} + +func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) { + return func() (*milvus_storage.Space, error) { + url := fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s", + params.Params.CommonCfg.StorageScheme.GetValue(), + params.Params.MinioCfg.AccessKeyID.GetValue(), + params.Params.MinioCfg.SecretAccessKey.GetValue(), + params.Params.MinioCfg.BucketName.GetValue(), + segmentID, + params.Params.MinioCfg.Address.GetValue()) + + pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema) + if err != nil { + return nil, err + } + vecSchema, err := typeutil.GetVectorFieldSchema(collSchema) + if err != nil { + return nil, err + } + space, err := milvus_storage.Open( + url, + options.NewSpaceOptionBuilder(). + SetSchema(schema.NewSchema( + arrowSchema, + &schema.SchemaOptions{ + PrimaryColumn: pkSchema.Name, + VectorColumn: vecSchema.Name, + VersionColumn: common.TimeStampFieldName, + }, + )). + Build(), + ) + return space, err + } +} diff --git a/internal/datanode/syncmgr/storage_v2_serializer_test.go b/internal/datanode/syncmgr/storage_v2_serializer_test.go new file mode 100644 index 0000000000..9a4711a7d6 --- /dev/null +++ b/internal/datanode/syncmgr/storage_v2_serializer_test.go @@ -0,0 +1,364 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
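SpaceCreatorFunc packages the space-opening closure that StorageV2Cache expects; the loadStatsV2 hunk in data_sync_service.go above now resolves spaces the same way. A sketch:

    // Lazily open (or create) the segment's space; the creator closure only
    // runs when the cache has no space for segmentID yet.
    space, err := storageCache.GetOrCreateSpace(segmentID,
        SpaceCreatorFunc(segmentID, collSchema, storageCache.ArrowSchema()))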
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus-storage/go/storage/schema" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type StorageV2SerializerSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + + schema *schemapb.CollectionSchema + storageCache *metacache.StorageV2Cache + mockCache *metacache.MockMetaCache + mockMetaWriter *MockMetaWriter + + serializer *storageV2Serializer +} + +func (s *StorageV2SerializerSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml0_%d_v1", s.collectionID) + s.schema = &schemapb.CollectionSchema{ + Name: "sync_task_test_col", + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64, Name: common.RowIDFieldName}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64, Name: common.TimeStampFieldName}, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + s.mockCache = metacache.NewMockMetaCache(s.T()) + s.mockMetaWriter = NewMockMetaWriter(s.T()) +} + +func (s *StorageV2SerializerSuite) SetupTest() { + storageCache, err := metacache.NewStorageV2Cache(s.schema) + s.Require().NoError(err) + s.storageCache = storageCache + + s.mockCache.EXPECT().Collection().Return(s.collectionID) + s.mockCache.EXPECT().Schema().Return(s.schema) + + s.serializer, err = NewStorageV2Serializer(storageCache, s.mockCache, s.mockMetaWriter) + s.Require().NoError(err) +} + +func (s *StorageV2SerializerSuite) getSpace() *milvus_storage.Space { + tmpDir := s.T().TempDir() + space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder(). 
+ SetSchema(schema.NewSchema(s.storageCache.ArrowSchema(), &schema.SchemaOptions{ + PrimaryColumn: "pk", VectorColumn: "vector", VersionColumn: common.TimeStampFieldName, + })).Build()) + s.Require().NoError(err) + return space +} + +func (s *StorageV2SerializerSuite) getBasicPack() *SyncPack { + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName). + WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + + return pack +} + +func (s *StorageV2SerializerSuite) getEmptyInsertBuffer() *storage.InsertData { + buf, err := storage.NewInsertData(s.schema) + s.Require().NoError(err) + + return buf +} + +func (s *StorageV2SerializerSuite) getInsertBuffer() *storage.InsertData { + buf := s.getEmptyInsertBuffer() + + // generate data + for i := 0; i < 10; i++ { + data := make(map[storage.FieldID]any) + data[common.RowIDField] = int64(i + 1) + data[common.TimeStampField] = int64(i + 1) + data[100] = int64(i + 1) + vector := lo.RepeatBy(128, func(_ int) float32 { + return rand.Float32() + }) + data[101] = vector + err := buf.Append(data) + s.Require().NoError(err) + } + return buf +} + +func (s *StorageV2SerializerSuite) getDeleteBuffer() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + ts := tsoutil.ComposeTSByTime(time.Now(), 0) + buf.Append(pk, ts) + } + return buf +} + +func (s *StorageV2SerializerSuite) getDeleteBufferZeroTs() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + buf.Append(pk, 0) + } + return buf +} + +func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet { + bfs := metacache.NewBloomFilterSet() + fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{ + FieldID: 101, + Name: "ID", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }) + s.Require().NoError(err) + + ids := []int64{1, 2, 3, 4, 5, 6, 7} + for _, id := range ids { + err = fd.AppendRow(id) + s.Require().NoError(err) + } + + bfs.UpdatePKRange(fd) + return bfs +} + +func (s *StorageV2SerializerSuite) TestSerializeInsert() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.storageCache.SetSpace(s.segmentID, s.getSpace()) + + s.Run("no_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithDrop() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + taskV1, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.True(taskV1.isDrop) + }) + + s.Run("empty_insert_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_normal_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := 
task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.reader) + s.NotNil(taskV2.batchStatsBlob) + }) + + s.Run("with_flush_segment_not_found", func() { + pack := s.getBasicPack() + pack.WithFlush() + + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_flush", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithFlush() + + bfs := s.getBfs() + segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(segInfo) + metacache.CompactTo(metacache.NullSegment)(segInfo) + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(segInfo) + }).Return().Once() + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.mergedStatsBlob) + }) +} + +func (s *StorageV2SerializerSuite) TestSerializeDelete() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("serialize_failed", func() { + pkField := s.serializer.pkField + s.serializer.pkField = &schemapb.FieldSchema{} + defer func() { + s.serializer.pkField = pkField + }() + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_failed_bad_pk", func() { + pkField := s.serializer.pkField + s.serializer.pkField = &schemapb.FieldSchema{ + DataType: schemapb.DataType_Array, + } + defer func() { + s.serializer.pkField = pkField + }() + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_normal", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBuffer()) + pack.WithTimeRange(50, 100) + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.deleteReader) + }) +} + +func (s *StorageV2SerializerSuite) TestBadSchema() { + mockCache := metacache.NewMockMetaCache(s.T()) + mockCache.EXPECT().Collection().Return(s.collectionID).Once() + mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once() + _, err := 
NewStorageV2Serializer(s.storageCache, mockCache, s.mockMetaWriter) + s.Error(err) +} + +func TestStorageV2Serializer(t *testing.T) { + suite.Run(t, new(StorageV2SerializerSuite)) +} diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index 6d2d83a336..ac3d8db2c4 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -1,12 +1,26 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package syncmgr import ( "context" "fmt" "path" - "strconv" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -15,7 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -32,15 +45,13 @@ type SyncTask struct { chunkManager storage.ChunkManager allocator allocator.Interface - insertData *storage.InsertData - deleteData *storage.DeleteData - segment *metacache.SegmentInfo collectionID int64 partitionID int64 segmentID int64 channelName string schema *schemapb.CollectionSchema + pkField *schemapb.FieldSchema startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition // batchSize is the row number of this sync task, @@ -61,6 +72,13 @@ type SyncTask struct { statsBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog deltaBinlog *datapb.FieldBinlog + binlogBlobs map[int64]*storage.Blob // fieldID => blob + binlogMemsize map[int64]int64 // memory size + batchStatsBlob *storage.Blob + mergedStatsBlob *storage.Blob + deltaBlob *storage.Blob + deltaRowCount int64 + segmentData map[string][]byte writeRetryOpts []retry.Option @@ -90,14 +108,18 @@ func (t *SyncTask) handleError(err error, metricSegLevel string) { } } -func (t *SyncTask) Run() error { +func (t *SyncTask) Run() (err error) { t.tr = timerecord.NewTimeRecorder("syncTask") - var metricSegLevel string = t.level.String() + metricSegLevel := t.level.String() log := t.getLogger() - var err error - var has bool + defer func() { + if err != nil { + t.handleError(err, metricSegLevel) + } + }() + var has bool t.segment, has = t.metacache.GetSegmentByID(t.segmentID) if !has { log.Warn("failed to sync data, segment not found in metacache") @@ -118,30 +140,39 @@ func (t *SyncTask) Run() error { t.segmentID = t.segment.CompactTo() } - err = t.serializeInsertData() + err = t.processInsertBlobs() + if err != nil { + log.Warn("failed to process insert blobs", zap.Error(err)) + return err + } + + err = t.processStatsBlob() if err != nil { log.Warn("failed to serialize insert data", zap.Error(err)) 
t.handleError(err, metricSegLevel) + log.Warn("failed to process stats blobs", zap.Error(err)) return err } - err = t.serializeDeleteData() + err = t.processDeltaBlob() if err != nil { log.Warn("failed to serialize delete data", zap.Error(err)) t.handleError(err, metricSegLevel) + log.Warn("failed to process delta blobs", zap.Error(err)) return err } - var totalSize float64 = 0 - if t.deleteData != nil { - totalSize += float64(t.deleteData.Size()) - } + /* + var totalSize float64 = 0 + if t.deleteData != nil { + totalSize += float64(t.deleteData.Size()) + } - if t.insertData != nil { - totalSize += float64(t.insertData.GetMemorySize()) - } - metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize) - metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds())) + if t.insertData != nil { + totalSize += float64(t.insertData.GetMemorySize()) + } + metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize) + metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds()))*/ err = t.writeLogs() if err != nil { @@ -180,86 +211,18 @@ func (t *SyncTask) Run() error { return nil } -func (t *SyncTask) serializeInsertData() error { - err := t.serializeBinlog() - if err != nil { - return err - } - - err = t.serializePkStatsLog() - if err != nil { - return err - } - - return nil -} - -func (t *SyncTask) serializeDeleteData() error { - if t.deleteData == nil { +func (t *SyncTask) processInsertBlobs() error { + if len(t.binlogBlobs) == 0 { return nil } - delCodec := storage.NewDeleteCodec() - blob, err := delCodec.Serialize(t.collectionID, t.partitionID, t.segmentID, t.deleteData) + logidx, _, err := t.allocator.Alloc(uint32(len(t.binlogBlobs))) if err != nil { return err } - logID, err := t.allocator.AllocOne() - if err != nil { - log.Error("failed to alloc ID", zap.Error(err)) - return err - } - - value := blob.GetValue() - data := &datapb.Binlog{} - - blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID) - blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) - - t.segmentData[blobPath] = value - data.LogSize = int64(len(blob.Value)) - data.LogPath = blobPath - data.TimestampFrom = t.tsFrom - data.TimestampTo = t.tsTo - data.EntriesNum = t.deleteData.RowCount - t.appendDeltalog(data) - - return nil -} - -func (t *SyncTask) serializeBinlog() error { - if t.insertData == nil { - return nil - } - - // get memory size of buffer data - memSize := make(map[int64]int) - for fieldID, fieldData := range t.insertData.Data { - memSize[fieldID] = fieldData.GetMemorySize() - } - - inCodec := t.getInCodec() - - blobs, err := inCodec.Serialize(t.partitionID, t.segmentID, t.insertData) - if err != nil { - return err - } - - logidx, _, err := t.allocator.Alloc(uint32(len(blobs))) - if err != nil { - return err - } - - for _, blob := range blobs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("Flush failed ... 
cannot parse string to fieldID ..", zap.Error(err)) - return err - } - + for fieldID, blob := range t.binlogBlobs { k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logidx) - // [rootPath]/[insert_log]/key key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k) t.segmentData[key] = blob.GetValue() t.appendBinlog(fieldID, &datapb.Binlog{ @@ -267,65 +230,50 @@ func (t *SyncTask) serializeBinlog() error { TimestampFrom: t.tsFrom, TimestampTo: t.tsTo, LogPath: key, - LogSize: int64(memSize[fieldID]), + LogSize: t.binlogMemsize[fieldID], }) - - logidx += 1 + logidx++ } return nil } -func (t *SyncTask) convertInsertData2PkStats(pkFieldID int64, dataType schemapb.DataType) (*storage.PrimaryKeyStats, int64) { - pkFieldData := t.insertData.Data[pkFieldID] - - rowNum := int64(pkFieldData.RowNum()) - - stats, err := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum) - if err != nil { - return nil, 0 +func (t *SyncTask) processStatsBlob() error { + if t.batchStatsBlob != nil { + logidx, err := t.allocator.AllocOne() + if err != nil { + return err + } + t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), logidx, t.batchSize) } - stats.UpdateByMsgs(pkFieldData) - return stats, rowNum -} - -func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error { - blob, err := t.getInCodec().SerializePkStats(stats, rowNum) - if err != nil { - return err + if t.mergedStatsBlob != nil { + totalRowNum := t.segment.NumOfRows() + t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum) } - - logidx, err := t.allocator.AllocOne() - if err != nil { - return err - } - t.convertBlob2StatsBinlog(blob, fieldID, logidx, rowNum) - return nil } -func (t *SyncTask) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType) error { - segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - var statsList []*storage.PrimaryKeyStats - var totalRowNum int64 - for _, segment := range segments { - totalRowNum += segment.NumOfRows() - statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { - return &storage.PrimaryKeyStats{ - FieldID: fieldID, - MaxPk: pks.MaxPK, - MinPk: pks.MinPK, - BF: pks.PkFilter, - PkType: int64(pkType), - } - })...) 
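
processInsertBlobs above reserves one contiguous block of log IDs for the whole batch of field blobs, rather than paying one allocator round-trip per blob, and hands out sequential IDs as each binlog path is built. A runnable sketch of the idea; the allocator interface here is a simplified stand-in for the real allocator.Interface:

package main

import "fmt"

type allocator interface {
	// Alloc reserves count consecutive IDs and returns the first one.
	Alloc(count uint32) (int64, error)
}

type fakeAllocator struct{ next int64 }

func (a *fakeAllocator) Alloc(count uint32) (int64, error) {
	start := a.next
	a.next += int64(count)
	return start, nil
}

func main() {
	blobs := map[int64][]byte{100: []byte("pk"), 101: []byte("ts")}

	var alloc allocator = &fakeAllocator{next: 5000}
	logidx, err := alloc.Alloc(uint32(len(blobs))) // one call for the whole batch
	if err != nil {
		panic(err)
	}
	for fieldID := range blobs {
		// each field blob takes the next ID in the reserved range
		fmt.Printf("field %d -> log id %d\n", fieldID, logidx)
		logidx++
	}
}
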
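
processStatsBlob, also above, writes two statslog flavours: the per-batch blob under a freshly allocated log ID, and the flush-time merged blob under the reserved storage.CompoundStatsType ID so readers can recognize the compound file without a lookup. A toy illustration of that dispatch; the constant's value is a stand-in, only the "reserved sentinel vs allocated ID" split is the point:

package main

import "fmt"

const compoundStatsType int64 = 1 // stand-in for int64(storage.CompoundStatsType)

func statsLogID(merged bool, allocatedID int64) int64 {
	if merged {
		return compoundStatsType // fixed marker ID for the merged statslog
	}
	return allocatedID // allocator-issued ID for a per-batch statslog
}

func main() {
	fmt.Println(statsLogID(false, 42)) // 42
	fmt.Println(statsLogID(true, 42))  // 1
}
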
- } +func (t *SyncTask) processDeltaBlob() error { + if t.deltaBlob != nil { + logID, err := t.allocator.AllocOne() + if err != nil { + log.Error("failed to alloc ID", zap.Error(err)) + return err + } - blob, err := t.getInCodec().SerializePkStatsList(statsList, totalRowNum) - if err != nil { - return err - } - t.convertBlob2StatsBinlog(blob, fieldID, int64(storage.CompoundStatsType), totalRowNum) + value := t.deltaBlob.GetValue() + data := &datapb.Binlog{} + blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID) + blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) + + t.segmentData[blobPath] = value + data.LogSize = int64(len(t.deltaBlob.Value)) + data.LogPath = blobPath + data.TimestampFrom = t.tsFrom + data.TimestampTo = t.tsTo + data.EntriesNum = t.deltaRowCount + t.appendDeltalog(data) + } return nil } @@ -344,30 +292,6 @@ func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID in }) } -func (t *SyncTask) serializePkStatsLog() error { - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fieldID := pkField.GetFieldID() - if t.insertData != nil { - stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType()) - if stats != nil && rowNum > 0 { - err := t.serializeSinglePkStats(fieldID, stats, rowNum) - if err != nil { - return err - } - } - } - - // skip statslog for empty segment - // DO NOT use level check here since Level zero segment may contain insert data in the future - if t.isFlush && t.segment.NumOfRows() > 0 { - return t.serializeMergedPkStats(fieldID, pkField.GetDataType()) - } - return nil -} - func (t *SyncTask) appendBinlog(fieldID int64, binlog *datapb.Binlog) { fieldBinlog, ok := t.insertBinlogs[fieldID] if !ok { @@ -407,15 +331,6 @@ func (t *SyncTask) writeMeta() error { return t.metaWriter.UpdateSync(t) } -func (t *SyncTask) getInCodec() *storage.InsertCodec { - meta := &etcdpb.CollectionMeta{ - Schema: t.schema, - ID: t.collectionID, - } - - return storage.NewInsertCodecWithSchema(meta) -} - func (t *SyncTask) SegmentID() int64 { return t.segmentID } diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index 4e8891f19b..dc73ee708e 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -1,6 +1,23 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
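
processDeltaBlob above derives the delta log's object key by joining the collection, partition, segment, and log IDs under the chunk manager root and the delta-log prefix. A minimal reconstruction; joinIDPath mimics metautil.JoinIDPath, and the prefix constant is a stand-in for common.SegmentDeltaLogPath:

package main

import (
	"fmt"
	"path"
	"strconv"
)

// joinIDPath mimics metautil.JoinIDPath: int64 IDs joined with "/".
func joinIDPath(ids ...int64) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return path.Join(parts...)
}

func main() {
	const segmentDeltaLogPath = "delta_log" // stand-in for common.SegmentDeltaLogPath
	rootPath := "files"                     // chunk manager root

	key := joinIDPath(100, 101, 1001, 9001) // collection/partition/segment/logID
	fmt.Println(path.Join(rootPath, segmentDeltaLogPath, key))
	// files/delta_log/100/101/1001/9001
}
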
+ package syncmgr import ( + "fmt" "math/rand" "testing" "time" @@ -43,11 +60,10 @@ type SyncTaskSuite struct { func (s *SyncTaskSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) - s.collectionID = 100 - s.partitionID = 101 - s.segmentID = 1001 - s.channelName = "by-dev-rootcoord-dml_0_100v0" - + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID) s.schema = &schemapb.CollectionSchema{ Name: "sync_task_test_col", Fields: []*schemapb.FieldSchema{ @@ -171,7 +187,7 @@ func (s *SyncTaskSuite) TestRunNormal() { s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - s.Run("without_insert_delete", func() { + s.Run("without_data", func() { task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithTimeRange(50, 100) @@ -187,7 +203,6 @@ func (s *SyncTaskSuite) TestRunNormal() { s.Run("with_insert_delete_cp", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -195,47 +210,55 @@ func (s *SyncTaskSuite) TestRunNormal() { MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.binlogBlobs[100] = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } err := task.Run() s.NoError(err) }) - s.Run("with_insert_delete_flush", func() { + s.Run("with_statslog", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) - task.WithFlush() - task.WithDrop() + task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ ChannelName: s.channelName, MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.WithFlush() + task.batchStatsBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } + task.mergedStatsBlob = &storage.Blob{ + Key: "1", + Value: []byte("test_data"), + } err := task.Run() s.NoError(err) }) - s.Run("with_zero_numrow_insertdata", func() { + s.Run("with_delta_data", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getEmptyInsertBuffer()) - task.WithFlush() - task.WithDrop() + task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ ChannelName: s.channelName, MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.WithDrop() + task.deltaBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } err := task.Run() - s.Error(err) - - err = task.serializePkStatsLog() s.NoError(err) - stats, rowNum := task.convertInsertData2PkStats(100, schemapb.DataType_Int64) - s.Nil(stats) - s.Zero(rowNum) }) } @@ -249,7 +272,10 @@ func (s *SyncTaskSuite) TestRunL0Segment() { s.Run("pure_delete_l0_flush", func() { task := s.getSuiteSyncTask() - task.WithDeleteData(s.getDeleteBuffer()) + task.deltaBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -306,7 +332,6 @@ func (s *SyncTaskSuite) TestRunError() { flag := false handler := func(_ error) { flag = true } task := s.getSuiteSyncTask().WithFailureCallback(handler) - task.WithInsertData(s.getEmptyInsertBuffer()) err := task.Run() @@ -318,29 
+343,22 @@ func (s *SyncTaskSuite) TestRunError() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet()) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.Run("serialize_insert_fail", func() { - flag := false - handler := func(_ error) { flag = true } - task := s.getSuiteSyncTask().WithFailureCallback(handler) - task.WithInsertData(s.getEmptyInsertBuffer()) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + s.Run("metawrite_fail", func() { + s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked")) + + task := s.getSuiteSyncTask() + task.WithMetaWriter(BrokerMetaWriter(s.broker, retry.Attempts(1))) + task.WithTimeRange(50, 100) + task.WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + }) err := task.Run() - s.Error(err) - s.True(flag) - }) - - s.Run("serailize_delete_fail", func() { - flag := false - handler := func(_ error) { flag = true } - task := s.getSuiteSyncTask().WithFailureCallback(handler) - - task.WithDeleteData(s.getDeleteBufferZeroTs()) - - err := task.Run() - - s.Error(err) - s.True(flag) }) s.Run("chunk_manager_save_fail", func() { @@ -348,10 +366,13 @@ func (s *SyncTaskSuite) TestRunError() { handler := func(_ error) { flag = true } s.chunkManager.ExpectedCalls = nil s.chunkManager.EXPECT().RootPath().Return("files") - s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(errors.New("mocked")) + s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked"))) task := s.getSuiteSyncTask().WithFailureCallback(handler) + task.binlogBlobs[100] = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithWriteRetryOptions(retry.Attempts(1)) err := task.Run() diff --git a/internal/datanode/syncmgr/taskv2.go b/internal/datanode/syncmgr/taskv2.go index 547c738182..293ed92ff7 100644 --- a/internal/datanode/syncmgr/taskv2.go +++ b/internal/datanode/syncmgr/taskv2.go @@ -19,12 +19,9 @@ package syncmgr import ( "context" "math" - "strconv" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -36,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" - typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -75,14 +71,13 @@ func (t *SyncTaskV2) Run() error { log := t.getLogger() var err error - infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - if len(infos) == 0 { + segment, ok := t.metacache.GetSegmentByID(t.segmentID) + if !ok { log.Warn("failed to sync data, segment not found in metacache") t.handleError(err) return merr.WrapErrSegmentNotFound(t.segmentID) } - segment := infos[0] if segment.CompactTo() > 0 { log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo())) // update sync task segment id @@ -90,21 +85,6 @@ func (t *SyncTaskV2) Run() error { t.segmentID = segment.CompactTo() } - if err = 
t.serializeInsertData(); err != nil { - t.handleError(err) - return err - } - - if err = t.serializeStatsData(); err != nil { - t.handleError(err) - return err - } - - if err = t.serializeDeleteData(); err != nil { - t.handleError(err) - return err - } - if err = t.writeSpace(); err != nil { t.handleError(err) return err @@ -128,156 +108,6 @@ func (t *SyncTaskV2) Run() error { return nil } -func (t *SyncTaskV2) serializeInsertData() error { - if t.insertData == nil { - return nil - } - - b := array.NewRecordBuilder(memory.DefaultAllocator, t.arrowSchema) - defer b.Release() - - if err := buildRecord(b, t.insertData, t.schema.Fields); err != nil { - return err - } - - rec := b.NewRecord() - defer rec.Release() - - itr, err := array.NewRecordReader(t.arrowSchema, []arrow.Record{rec}) - if err != nil { - return err - } - itr.Retain() - t.reader = itr - return nil -} - -func (t *SyncTaskV2) serializeStatsData() error { - if t.insertData == nil { - return nil - } - - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fieldID := pkField.GetFieldID() - - stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType()) - - // not flush and not insert data - if !t.isFlush && stats == nil { - return nil - } - if t.isFlush { - return t.serializeMergedPkStats(fieldID, pkField.GetDataType(), stats, rowNum) - } - - return t.serializeSinglePkStats(fieldID, stats, rowNum) -} - -func (t *SyncTaskV2) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType, stats *storage.PrimaryKeyStats, rowNum int64) error { - segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - var statsList []*storage.PrimaryKeyStats - var oldRowNum int64 - for _, segment := range segments { - oldRowNum += segment.NumOfRows() - statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { - return &storage.PrimaryKeyStats{ - FieldID: fieldID, - MaxPk: pks.MaxPK, - MinPk: pks.MinPK, - BF: pks.PkFilter, - PkType: int64(pkType), - } - })...) 
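
The serializeInsertData body removed above (its job now belongs to the storage-v2 serializer) reduces to: build one arrow.Record from the buffered columns, then wrap it in a RecordReader for the space write. A self-contained version with a toy one-column schema, using the same arrow v12 calls as the removed code:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "pk", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	// accumulate column values into a record builder
	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)

	rec := b.NewRecord()
	defer rec.Release()

	// wrap the record in a reader; the sync task hands this to the space
	itr, err := array.NewRecordReader(schema, []arrow.Record{rec})
	if err != nil {
		panic(err)
	}
	defer itr.Release()

	for itr.Next() {
		fmt.Println(itr.Record().NumRows()) // 3
	}
}
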
- } - if stats != nil { - statsList = append(statsList, stats) - } - - blob, err := t.getInCodec().SerializePkStatsList(statsList, oldRowNum+rowNum) - if err != nil { - return err - } - blob.Key = strconv.Itoa(int(storage.CompoundStatsType)) - t.statsBlob = blob - return nil -} - -func (t *SyncTaskV2) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error { - blob, err := t.getInCodec().SerializePkStats(stats, rowNum) - if err != nil { - return err - } - - logidx, err := t.allocator.AllocOne() - if err != nil { - return err - } - - blob.Key = strconv.Itoa(int(logidx)) - t.statsBlob = blob - return nil -} - -func (t *SyncTaskV2) serializeDeleteData() error { - if t.deleteData == nil { - return nil - } - - fields := make([]*schemapb.FieldSchema, 0) - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fields = append(fields, pkField) - tsField := &schemapb.FieldSchema{ - FieldID: common.TimeStampField, - Name: common.TimeStampFieldName, - DataType: schemapb.DataType_Int64, - } - fields = append(fields, tsField) - - schema, err := typeutil2.ConvertToArrowSchema(fields) - if err != nil { - return err - } - - b := array.NewRecordBuilder(memory.DefaultAllocator, schema) - defer b.Release() - - switch pkField.DataType { - case schemapb.DataType_Int64: - pb := b.Field(0).(*array.Int64Builder) - for _, pk := range t.deleteData.Pks { - pb.Append(pk.GetValue().(int64)) - } - case schemapb.DataType_VarChar: - pb := b.Field(0).(*array.StringBuilder) - for _, pk := range t.deleteData.Pks { - pb.Append(pk.GetValue().(string)) - } - default: - return merr.WrapErrParameterInvalidMsg("unexpected pk type %v", pkField.DataType) - } - - for _, ts := range t.deleteData.Tss { - b.Field(1).(*array.Int64Builder).Append(int64(ts)) - } - - rec := b.NewRecord() - defer rec.Release() - - reader, err := array.NewRecordReader(schema, []arrow.Record{rec}) - if err != nil { - return err - } - - t.deleteReader = reader - return nil -} - func (t *SyncTaskV2) writeSpace() error { defer func() { if t.reader != nil { @@ -288,37 +118,6 @@ func (t *SyncTaskV2) writeSpace() error { } }() - // url := fmt.Sprintf("s3://%s:%s@%s/%d?endpoint_override=%s", - // params.Params.MinioCfg.AccessKeyID.GetValue(), - // params.Params.MinioCfg.SecretAccessKey.GetValue(), - // params.Params.MinioCfg.BucketName.GetValue(), - // t.segmentID, - // params.Params.MinioCfg.Address.GetValue()) - - // pkSchema, err := typeutil.GetPrimaryFieldSchema(t.schema) - // if err != nil { - // return err - // } - // vecSchema, err := typeutil.GetVectorFieldSchema(t.schema) - // if err != nil { - // return err - // } - // space, err := milvus_storage.Open( - // url, - // options.NewSpaceOptionBuilder(). - // SetSchema(schema.NewSchema( - // t.arrowSchema, - // &schema.SchemaOptions{ - // PrimaryColumn: pkSchema.Name, - // VectorColumn: vecSchema.Name, - // VersionColumn: common.TimeStampFieldName, - // }, - // )). 
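
Likewise, the removed serializeDeleteData encodes deletes as a two-column arrow record: the primary keys plus their delete timestamps as an int64 column. A compact reconstruction for an int64 PK; the column names are illustrative stand-ins for the real field names:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "pk", Type: arrow.PrimitiveTypes.Int64},
		{Name: "Timestamp", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	pks := []int64{10, 11}
	tss := []uint64{100, 101} // delete timestamps, cast to int64 for storage

	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	for i, pk := range pks {
		b.Field(0).(*array.Int64Builder).Append(pk)
		b.Field(1).(*array.Int64Builder).Append(int64(tss[i]))
	}

	rec := b.NewRecord()
	defer rec.Release()

	reader, err := array.NewRecordReader(schema, []arrow.Record{rec})
	if err != nil {
		panic(err)
	}
	defer reader.Release()
	fmt.Println(rec.NumRows()) // 2
}
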
- // Build(), - // ) - // if err != nil { - // return err - // } txn := t.space.NewTransaction() if t.reader != nil { txn.Write(t.reader, &options.DefaultWriteOptions) @@ -483,16 +282,6 @@ func (t *SyncTaskV2) WithAllocator(allocator allocator.Interface) *SyncTaskV2 { return t } -func (t *SyncTaskV2) WithInsertData(insertData *storage.InsertData) *SyncTaskV2 { - t.insertData = insertData - return t -} - -func (t *SyncTaskV2) WithDeleteData(deleteData *storage.DeleteData) *SyncTaskV2 { - t.deleteData = deleteData - return t -} - func (t *SyncTaskV2) WithStartPosition(start *msgpb.MsgPosition) *SyncTaskV2 { t.startPosition = start return t diff --git a/internal/datanode/syncmgr/taskv2_test.go b/internal/datanode/syncmgr/taskv2_test.go index 226abb5ea6..c58b45400d 100644 --- a/internal/datanode/syncmgr/taskv2_test.go +++ b/internal/datanode/syncmgr/taskv2_test.go @@ -17,6 +17,7 @@ package syncmgr import ( + "context" "fmt" "math/rand" "testing" @@ -28,7 +29,6 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -167,19 +166,33 @@ func (s *SyncTaskSuiteV2) getDeleteBufferZeroTs() *storage.DeleteData { } func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 { - log.Info("space", zap.Any("space", s.space)) - task := NewSyncTaskV2(). - WithArrowSchema(s.arrowSchema). - WithSpace(s.space). - WithCollectionID(s.collectionID). + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). WithPartitionID(s.partitionID). WithSegmentID(s.segmentID). WithChannelName(s.channelName). - WithSchema(s.schema). - WithAllocator(s.allocator). 
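
writeSpace above funnels every piece of a sync (insert reader, delete reader, stats blob) through a single milvus-storage transaction, so the segment data lands atomically. A sketch of the overall shape; Delete and Commit are assumptions about the transaction API, since only NewTransaction and Write are visible in this hunk:

package syncmgr

import (
	"github.com/apache/arrow/go/v12/arrow/array"
	milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
	"github.com/milvus-io/milvus-storage/go/storage/options"
)

// writeSpaceSketch bundles insert and delete readers into one transaction.
func writeSpaceSketch(space *milvus_storage.Space, insert, deletes array.RecordReader) error {
	txn := space.NewTransaction()
	if insert != nil {
		txn.Write(insert, &options.DefaultWriteOptions)
	}
	if deletes != nil {
		txn.Delete(deletes) // assumed API, mirrors the insert path
	}
	return txn.Commit() // assumed: nothing is visible until commit succeeds
}
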
- WithMetaCache(s.metacache) + WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithDeleteData(s.getDeleteBuffer()) - return task + storageCache, err := metacache.NewStorageV2Cache(s.schema) + s.Require().NoError(err) + + s.metacache.EXPECT().Collection().Return(s.collectionID) + s.metacache.EXPECT().Schema().Return(s.schema) + serializer, err := NewStorageV2Serializer(storageCache, s.metacache, nil) + s.Require().NoError(err) + task, err := serializer.EncodeBuffer(context.Background(), pack) + s.Require().NoError(err) + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + taskV2.WithMetaCache(s.metacache) + + return taskV2 } func (s *SyncTaskSuiteV2) TestRunNormal() { @@ -202,6 +215,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { bfs.UpdatePKRange(fd) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() @@ -221,7 +235,6 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { s.Run("with_insert_delete_cp", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -233,22 +246,6 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { err := task.Run() s.NoError(err) }) - - s.Run("with_insert_delete_flush", func() { - task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) - task.WithFlush() - task.WithDrop() - task.WithMetaWriter(BrokerMetaWriter(s.broker)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - - err := task.Run() - s.NoError(err) - }) } func (s *SyncTaskSuiteV2) TestBuildRecord() { diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index 6fe27d7898..44bc68f647 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -18,8 +18,12 @@ type bfWriteBuffer struct { } func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { + base, err := newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option) + if err != nil { + return nil, err + } return &bfWriteBuffer{ - writeBufferBase: newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option), + writeBufferBase: base, syncMgr: syncMgr, }, nil } diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 9fa81aa0e5..3dcd0b9063 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -63,6 +63,10 @@ func (s *BFWriteBufferSuite) SetupSuite() { }, }, } + + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageV2Cache = storageCache } func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { @@ -154,7 +158,9 @@ func (s *BFWriteBufferSuite) SetupTest() { } 
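
NewBFWriteBuffer above follows the usual Go pattern for a constructor whose embedded base can now fail (serializer creation happens inside newWriteBufferBase): build the base first, bail out on error, and only then assemble the outer struct. A generic runnable sketch of that shape, with illustrative types:

package main

import (
	"errors"
	"fmt"
)

type base struct{ channel string }

func newBase(channel string) (*base, error) {
	if channel == "" {
		return nil, errors.New("empty channel")
	}
	return &base{channel: channel}, nil
}

type bfBuffer struct {
	*base
	syncWorkers int
}

func newBFBuffer(channel string) (*bfBuffer, error) {
	b, err := newBase(channel) // may fail; propagate instead of embedding nil
	if err != nil {
		return nil, err
	}
	return &bfBuffer{base: b, syncWorkers: 1}, nil
}

func main() {
	if _, err := newBFBuffer(""); err != nil {
		fmt.Println("constructor failed:", err)
	}
}
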
func (s *BFWriteBufferSuite) TestBufferData() { - wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{}) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + wb, err := NewBFWriteBuffer(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{}) s.NoError(err) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) @@ -205,6 +211,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() { func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() { params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true") + defer params.Params.Reset(params.Params.CommonCfg.EnableStorageV2.Key) params.Params.CommonCfg.StorageScheme.SwapTempValue("file") tmpDir := s.T().TempDir() arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields) @@ -279,6 +286,14 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { }) } +func (s *BFWriteBufferSuite) TestCreateFailure() { + metacache := metacache.NewMockMetaCache(s.T()) + metacache.EXPECT().Collection().Return(s.collID) + metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}) + _, err := NewBFWriteBuffer(s.channelName, metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{}) + s.Error(err) +} + func TestBFWriteBuffer(t *testing.T) { suite.Run(t, new(BFWriteBufferSuite)) } diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 3cf4041e71..3d9f03fc1f 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -32,10 +32,14 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca if option.idAllocator == nil { return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer") } + base, err := newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option) + if err != nil { + return nil, err + } return &l0WriteBuffer{ l0Segments: make(map[int64]int64), l0partition: make(map[int64]int64), - writeBufferBase: newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option), + writeBufferBase: base, syncMgr: syncMgr, idAllocator: option.idAllocator, }, nil diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go index d654fb40be..deae26eebe 100644 --- a/internal/datanode/writebuffer/l0_write_buffer_test.go +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -25,12 +25,13 @@ import ( type L0WriteBufferSuite struct { suite.Suite - channelName string - collID int64 - collSchema *schemapb.CollectionSchema - syncMgr *syncmgr.MockSyncManager - metacache *metacache.MockMetaCache - allocator *allocator.MockGIDAllocator + channelName string + collID int64 + collSchema *schemapb.CollectionSchema + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + allocator *allocator.MockGIDAllocator + storageCache *metacache.StorageV2Cache } func (s *L0WriteBufferSuite) SetupSuite() { @@ -57,6 +58,10 @@ func (s *L0WriteBufferSuite) SetupSuite() { }, } s.channelName = "by-dev-rootcoord-dml_0v0" + + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageCache = storageCache } func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { @@ -146,7 +151,7 @@ func (s *L0WriteBufferSuite) SetupTest() { } func (s *L0WriteBufferSuite) TestBufferData() { - wb, 
err := NewL0WriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{ + wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{ idAllocator: s.allocator, }) s.NoError(err) @@ -165,6 +170,16 @@ func (s *L0WriteBufferSuite) TestBufferData() { s.NoError(err) } +func (s *L0WriteBufferSuite) TestCreateFailure() { + metacache := metacache.NewMockMetaCache(s.T()) + metacache.EXPECT().Collection().Return(s.collID) + metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}) + _, err := NewL0WriteBuffer(s.channelName, metacache, s.storageCache, s.syncMgr, &writeBufferOption{ + idAllocator: s.allocator, + }) + s.Error(err) +} + func TestL0WriteBuffer(t *testing.T) { suite.Run(t, new(L0WriteBufferSuite)) } diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 144878a660..89f05f25e5 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -38,6 +38,8 @@ func (s *ManagerSuite) SetupSuite() { s.collSchema = &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64, Name: common.RowIDFieldName}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64, Name: common.TimeStampFieldName}, { FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, }, @@ -69,10 +71,13 @@ func (s *ManagerSuite) SetupTest() { func (s *ManagerSuite) TestRegister() { manager := s.manager - err := manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.NoError(err) - err = manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.Error(err) s.ErrorIs(err, merr.ErrChannelReduplicate) } @@ -176,7 +181,9 @@ func (s *ManagerSuite) TestRemoveChannel() { }) s.Run("remove_channel", func() { - err := manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.Require().NoError(err) s.NotPanics(func() { diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index ddd6c2316e..3111e6df01 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v12/arrow" "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -13,16 +12,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - milvus_storage "github.com/milvus-io/milvus-storage/go/storage" - "github.com/milvus-io/milvus-storage/go/storage/options" - "github.com/milvus-io/milvus-storage/go/storage/schema" "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -65,9 +60,9 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach switch option.deletePolicy { case DeletePolicyBFPkOracle: - return NewBFWriteBuffer(channel, metacache, nil, syncMgr, option) + return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option) case DeletePolicyL0Delta: - return NewL0WriteBuffer(channel, metacache, nil, syncMgr, option) + return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option) default: return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy) } @@ -85,7 +80,9 @@ type writeBufferBase struct { metaCache metacache.MetaCache syncMgr syncmgr.SyncManager broker broker.Broker - buffers map[int64]*segmentBuffer // segmentID => segmentBuffer + serializer syncmgr.Serializer + + buffers map[int64]*segmentBuffer // segmentID => segmentBuffer syncPolicies []SyncPolicy checkpoint *msgpb.MsgPosition @@ -94,11 +91,29 @@ type writeBufferBase struct { storagev2Cache *metacache.StorageV2Cache } -func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase { +func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) { flushTs := atomic.NewUint64(nonFlushTS) flushTsPolicy := GetFlushTsPolicy(flushTs, metacache) option.syncPolicies = append(option.syncPolicies, flushTsPolicy) + var serializer syncmgr.Serializer + var err error + if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { + serializer, err = syncmgr.NewStorageV2Serializer( + storageV2Cache, + metacache, + option.metaWriter, + ) + } else { + serializer, err = syncmgr.NewStorageSerializer( + metacache, + option.metaWriter, + ) + } + if err != nil { + return nil, err + } + return &writeBufferBase{ channelName: channel, collectionID: metacache.Collection(), @@ -107,10 +122,11 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2 metaWriter: option.metaWriter, buffers: make(map[int64]*segmentBuffer), metaCache: metacache, + serializer: serializer, syncPolicies: option.syncPolicies, flushTimestamp: flushTs, storagev2Cache: storageV2Cache, - } + }, nil } func (wb *writeBufferBase) HasSegment(segmentID int64) bool { @@ -238,8 +254,9 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64 func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) { for _, segmentID := range segmentIDs { - syncTask := wb.getSyncTask(ctx, segmentID) - if syncTask == nil { + syncTask, err := wb.getSyncTask(ctx, segmentID) + if err != nil { + // TODO check err type // segment info not found log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID)) continue @@ -344,49 +361,14 @@ func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKe return nil } -func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) { - return func() (*milvus_storage.Space, error) { - url := 
fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s", - params.Params.CommonCfg.StorageScheme.GetValue(), - params.Params.MinioCfg.AccessKeyID.GetValue(), - params.Params.MinioCfg.SecretAccessKey.GetValue(), - params.Params.MinioCfg.BucketName.GetValue(), - segmentID, - params.Params.MinioCfg.Address.GetValue()) - - pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema) - if err != nil { - return nil, err - } - vecSchema, err := typeutil.GetVectorFieldSchema(collSchema) - if err != nil { - return nil, err - } - space, err := milvus_storage.Open( - url, - options.NewSpaceOptionBuilder(). - SetSchema(schema.NewSchema( - arrowSchema, - &schema.SchemaOptions{ - PrimaryColumn: pkSchema.Name, - VectorColumn: vecSchema.Name, - VersionColumn: common.TimeStampFieldName, - }, - )). - Build(), - ) - return space, err - } -} - -func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syncmgr.Task { +func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) { log := log.Ctx(ctx).With( zap.Int64("segmentID", segmentID), ) segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID) // wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID)) if !ok { log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID)) - return nil + return nil, merr.WrapErrSegmentNotFound(segmentID) } var batchSize int64 var totalMemSize float64 @@ -397,7 +379,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax } - actions := []metacache.SegmentAction{metacache.RollStats()} + actions := []metacache.SegmentAction{} if insert != nil { batchSize = int64(insert.GetRowNum()) totalMemSize += float64(insert.GetMemorySize()) @@ -408,69 +390,26 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn actions = append(actions, metacache.StartSyncing(batchSize)) wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID)) - var syncTask syncmgr.Task - if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { - arrowSchema := wb.storagev2Cache.ArrowSchema() - space, err := wb.storagev2Cache.GetOrCreateSpace(segmentID, SpaceCreatorFunc(segmentID, wb.collSchema, arrowSchema)) - if err != nil { - log.Warn("failed to get or create space", zap.Error(err)) - return nil - } + pack := &syncmgr.SyncPack{} + pack.WithInsertData(insert). + WithDeleteData(delta). + WithCollectionID(wb.collectionID). + WithPartitionID(segmentInfo.PartitionID()). + WithChannelName(wb.channelName). + WithSegmentID(segmentID). + WithStartPosition(startPos). + WithTimeRange(tsFrom, tsTo). + WithLevel(segmentInfo.Level()). + WithCheckpoint(wb.checkpoint). + WithBatchSize(batchSize) - task := syncmgr.NewSyncTaskV2(). - WithInsertData(insert). - WithDeleteData(delta). - WithCollectionID(wb.collectionID). - WithPartitionID(segmentInfo.PartitionID()). - WithChannelName(wb.channelName). - WithSegmentID(segmentID). - WithStartPosition(startPos). - WithTimeRange(tsFrom, tsTo). - WithLevel(segmentInfo.Level()). - WithCheckpoint(wb.checkpoint). - WithSchema(wb.collSchema). - WithBatchSize(batchSize). - WithMetaCache(wb.metaCache). - WithMetaWriter(wb.metaWriter). - WithArrowSchema(arrowSchema). - WithSpace(space). 
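
The old getSyncTask built a SyncTask or SyncTaskV2 by hand in two long branches; after this patch it fills one storage-agnostic SyncPack and hands it to a serializer chosen when the write buffer was constructed, via EncodeBuffer just below. A sketch of that seam; the interface bodies are inferred from EncodeBuffer's call sites rather than copied from the source:

package syncmgr

import "context"

type Task interface {
	Run() error
	SegmentID() int64
}

type Serializer interface {
	// EncodeBuffer turns one buffered SyncPack into a runnable sync task;
	// the V1 serializer produces *SyncTask, the storage-v2 one *SyncTaskV2.
	EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error)
}
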
- WithFailureCallback(func(err error) { - // TODO could change to unsub channel in the future - panic(err) - }) - if segmentInfo.State() == commonpb.SegmentState_Flushing { - task.WithFlush() - } - syncTask = task - } else { - task := syncmgr.NewSyncTask(). - WithInsertData(insert). - WithDeleteData(delta). - WithCollectionID(wb.collectionID). - WithPartitionID(segmentInfo.PartitionID()). - WithChannelName(wb.channelName). - WithSegmentID(segmentID). - WithStartPosition(startPos). - WithTimeRange(tsFrom, tsTo). - WithLevel(segmentInfo.Level()). - WithCheckpoint(wb.checkpoint). - WithSchema(wb.collSchema). - WithBatchSize(batchSize). - WithMetaCache(wb.metaCache). - WithMetaWriter(wb.metaWriter). - WithFailureCallback(func(err error) { - // TODO could change to unsub channel in the future - panic(err) - }) - if segmentInfo.State() == commonpb.SegmentState_Flushing { - task.WithFlush() - } - syncTask = task + if segmentInfo.State() == commonpb.SegmentState_Flushing { + pack.WithFlush() } metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize) - return syncTask + return wb.serializer.EncodeBuffer(ctx, pack) } func (wb *writeBufferBase) Close(drop bool) { @@ -483,8 +422,9 @@ func (wb *writeBufferBase) Close(drop bool) { var futures []*conc.Future[error] for id := range wb.buffers { - syncTask := wb.getSyncTask(context.Background(), id) - if syncTask == nil { + syncTask, err := wb.getSyncTask(context.Background(), id) + if err != nil { + // TODO continue } switch t := syncTask.(type) { diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 9ee97c8cce..8244eeb36f 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -20,12 +20,13 @@ import ( type WriteBufferSuite struct { suite.Suite - collID int64 - channelName string - collSchema *schemapb.CollectionSchema - wb *writeBufferBase - syncMgr *syncmgr.MockSyncManager - metacache *metacache.MockMetaCache + collID int64 + channelName string + collSchema *schemapb.CollectionSchema + wb *writeBufferBase + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + storageCache *metacache.StorageV2Cache } func (s *WriteBufferSuite) SetupSuite() { @@ -44,20 +45,24 @@ func (s *WriteBufferSuite) SetupSuite() { } func (s *WriteBufferSuite) SetupTest() { + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageCache = storageCache s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() s.metacache.EXPECT().Collection().Return(s.collID).Maybe() - s.wb = newWriteBufferBase(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{ + s.wb, err = newWriteBufferBase(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{ pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, }) + s.Require().NoError(err) } func (s *WriteBufferSuite) TestDefaultOption() { s.Run("default BFPkOracle", func() { - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr) s.NoError(err) _, ok := wb.(*bfWriteBuffer) s.True(ok) @@ -66,7 +71,7 @@ func (s *WriteBufferSuite) TestDefaultOption() { s.Run("default L0Delta policy", func() { 
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true") defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key) - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator())) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator())) s.NoError(err) _, ok := wb.(*l0WriteBuffer) s.True(ok) @@ -74,18 +79,18 @@ func (s *WriteBufferSuite) TestDefaultOption() { } func (s *WriteBufferSuite) TestWriteBufferType() { - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) _, ok := wb.(*bfWriteBuffer) s.True(ok) - wb, err = NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) + wb, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) s.NoError(err) _, ok = wb.(*l0WriteBuffer) s.True(ok) - _, err = NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy("")) + _, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy("")) s.Error(err) } @@ -102,9 +107,9 @@ func (s *WriteBufferSuite) TestHasSegment() { func (s *WriteBufferSuite) TestFlushSegments() { segmentID := int64(1001) - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything) + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) err = wb.FlushSegments(context.Background(), []int64{segmentID})
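
Taken together, the test updates encode the new contract: a write buffer is always constructed with a non-nil StorageV2Cache, because the serializer is selected when the buffer is built rather than per sync. A hedged sketch of the setup a caller now performs; buildWriteBuffer is a hypothetical helper, and options such as the delete policy are elided:

package datanode

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
	"github.com/milvus-io/milvus/internal/datanode/writebuffer"
)

func buildWriteBuffer(channelName string, collSchema *schemapb.CollectionSchema,
	mc metacache.MetaCache, syncMgr syncmgr.SyncManager) (writebuffer.WriteBuffer, error) {
	// the cache is required even with storage v2 disabled
	storageCache, err := metacache.NewStorageV2Cache(collSchema)
	if err != nil {
		return nil, err
	}
	return writebuffer.NewWriteBuffer(channelName, mc, storageCache, syncMgr)
}
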