From 277849a9152b35a2ff27e4e6d55a516308464ca9 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 26 Dec 2023 10:40:47 +0800 Subject: [PATCH] enhance: separate serializer logic from sync task (#29413) See also #27675. Since serializing the segment buffer does not depend on the sync manager, it can be done before the task is submitted to the sync manager. This way the pk statistics file is more accurate and complex logic is removed from the sync manager. --------- Signed-off-by: Congqi Xia --- Makefile | 1 + internal/datanode/data_sync_service.go | 2 +- internal/datanode/metacache/actions.go | 5 +- .../datanode/metacache/bloom_filter_set.go | 14 +- .../metacache/bloom_filter_set_test.go | 4 +- internal/datanode/mock_fgmanager.go | 97 ----- internal/datanode/syncmgr/meta_writer.go | 12 +- internal/datanode/syncmgr/meta_writer_test.go | 4 + internal/datanode/syncmgr/mock_meta_writer.go | 158 ++++++++ internal/datanode/syncmgr/options.go | 16 +- internal/datanode/syncmgr/serializer.go | 121 ++++++ .../datanode/syncmgr/storage_serializer.go | 205 ++++++++++ .../syncmgr/storage_serializer_test.go | 324 ++++++++++++++++ .../datanode/syncmgr/storage_v2_serializer.go | 246 ++++++++++++ .../syncmgr/storage_v2_serializer_test.go | 364 ++++++++++++++++++ internal/datanode/syncmgr/task.go | 261 +++++-------- internal/datanode/syncmgr/task_test.go | 111 +++--- internal/datanode/syncmgr/taskv2.go | 215 +---------- internal/datanode/syncmgr/taskv2_test.go | 53 ++- .../datanode/writebuffer/bf_write_buffer.go | 6 +- .../writebuffer/bf_write_buffer_test.go | 17 +- .../datanode/writebuffer/l0_write_buffer.go | 6 +- .../writebuffer/l0_write_buffer_test.go | 29 +- internal/datanode/writebuffer/manager_test.go | 13 +- internal/datanode/writebuffer/write_buffer.go | 160 +++----- .../datanode/writebuffer/write_buffer_test.go | 33 +- 26 files changed, 1759 insertions(+), 718 deletions(-) create mode 100644 internal/datanode/syncmgr/mock_meta_writer.go create mode 100644 internal/datanode/syncmgr/serializer.go create mode 100644 internal/datanode/syncmgr/storage_serializer.go create mode 100644 internal/datanode/syncmgr/storage_serializer_test.go create mode 100644 internal/datanode/syncmgr/storage_v2_serializer.go create mode 100644 internal/datanode/syncmgr/storage_v2_serializer_test.go diff --git a/Makefile b/Makefile index 52d984e2be..ef2bc4eb0f 100644 --- a/Makefile +++ b/Makefile @@ -446,6 +446,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage $(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage + $(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer
--outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 4f7748284d..590d554011 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -196,7 +196,7 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C } func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.SegmentInfo, schema *schemapb.CollectionSchema) ([]*storage.PkStatistics, error) { - space, err := storageCache.GetOrCreateSpace(segment.ID, writebuffer.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema())) + space, err := storageCache.GetOrCreateSpace(segment.ID, syncmgr.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema())) if err != nil { return nil, err } diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index 81bc141abd..214f9ed433 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -102,9 +103,9 @@ func UpdateBufferedRows(bufferedRows int64) SegmentAction { } } -func RollStats() SegmentAction { +func RollStats(newStats ...*storage.PrimaryKeyStats) SegmentAction { return func(info *SegmentInfo) { - info.bfs.Roll() + info.bfs.Roll(newStats...) } } diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index f074a8ece7..7e9bbcb7b0 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" ) @@ -64,13 +65,18 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { return bfs.current.UpdatePKRange(ids) } -func (bfs *BloomFilterSet) Roll() { +func (bfs *BloomFilterSet) Roll(newStats ...*storage.PrimaryKeyStats) { bfs.mut.Lock() defer bfs.mut.Unlock() - if bfs.current != nil { - bfs.history = append(bfs.history, bfs.current) - bfs.current = nil + if len(newStats) > 0 { + bfs.history = append(bfs.history, lo.Map(newStats, func(stats *storage.PrimaryKeyStats, _ int) *storage.PkStatistics { + return &storage.PkStatistics{ + PkFilter: stats.BF, + MaxPK: stats.MaxPk, + MinPK: stats.MinPk, + } + })...) 
} } diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go index d4dc6668b1..4d647b6910 100644 --- a/internal/datanode/metacache/bloom_filter_set_test.go +++ b/internal/datanode/metacache/bloom_filter_set_test.go @@ -78,7 +78,9 @@ func (s *BloomFilterSetSuite) TestRoll() { err := s.bfs.UpdatePKRange(s.GetFieldData(ids)) s.NoError(err) - s.bfs.Roll() + newEntry := &storage.PrimaryKeyStats{} + + s.bfs.Roll(newEntry) history = s.bfs.GetHistory() s.Equal(1, len(history), "history shall have one entry after roll with current data") diff --git a/internal/datanode/mock_fgmanager.go b/internal/datanode/mock_fgmanager.go index 1dea01e67f..73ed8f50a3 100644 --- a/internal/datanode/mock_fgmanager.go +++ b/internal/datanode/mock_fgmanager.go @@ -388,103 +388,6 @@ func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) RunAndReturn(run func(strin return _c } -// Start provides a mock function with given fields: -func (_m *MockFlowgraphManager) Start() { - _m.Called() -} - -// MockFlowgraphManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' -type MockFlowgraphManager_Start_Call struct { - *mock.Call -} - -// Start is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) Start() *MockFlowgraphManager_Start_Call { - return &MockFlowgraphManager_Start_Call{Call: _e.mock.On("Start")} -} - -func (_c *MockFlowgraphManager_Start_Call) Run(run func()) *MockFlowgraphManager_Start_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockFlowgraphManager_Start_Call) Return() *MockFlowgraphManager_Start_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_Start_Call) RunAndReturn(run func()) *MockFlowgraphManager_Start_Call { - _c.Call.Return(run) - return _c -} - -// Stop provides a mock function with given fields: -func (_m *MockFlowgraphManager) Stop() { - _m.Called() -} - -// MockFlowgraphManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' -type MockFlowgraphManager_Stop_Call struct { - *mock.Call -} - -// Stop is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) Stop() *MockFlowgraphManager_Stop_Call { - return &MockFlowgraphManager_Stop_Call{Call: _e.mock.On("Stop")} -} - -func (_c *MockFlowgraphManager_Stop_Call) Run(run func()) *MockFlowgraphManager_Stop_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockFlowgraphManager_Stop_Call) Return() *MockFlowgraphManager_Stop_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_Stop_Call) RunAndReturn(run func()) *MockFlowgraphManager_Stop_Call { - _c.Call.Return(run) - return _c -} - -// controlMemWaterLevel provides a mock function with given fields: totalMemory -func (_m *MockFlowgraphManager) controlMemWaterLevel(totalMemory uint64) { - _m.Called(totalMemory) -} - -// MockFlowgraphManager_controlMemWaterLevel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'controlMemWaterLevel' -type MockFlowgraphManager_controlMemWaterLevel_Call struct { - *mock.Call -} - -// controlMemWaterLevel is a helper method to define mock.On call -// - totalMemory uint64 -func (_e *MockFlowgraphManager_Expecter) controlMemWaterLevel(totalMemory interface{}) *MockFlowgraphManager_controlMemWaterLevel_Call { - return &MockFlowgraphManager_controlMemWaterLevel_Call{Call: 
_e.mock.On("controlMemWaterLevel", totalMemory)} -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Run(run func(totalMemory uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(uint64)) - }) - return _c -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Return() *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Return() - return _c -} - -func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) RunAndReturn(run func(uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { - _c.Call.Return(run) - return _c -} - // NewMockFlowgraphManager creates a new instance of MockFlowgraphManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockFlowgraphManager(t interface { diff --git a/internal/datanode/syncmgr/meta_writer.go b/internal/datanode/syncmgr/meta_writer.go index 0f7c3f8617..4d506be2e5 100644 --- a/internal/datanode/syncmgr/meta_writer.go +++ b/internal/datanode/syncmgr/meta_writer.go @@ -48,12 +48,11 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog) } - // only current segment checkpoint info, - segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID)) - if len(segments) == 0 { + // only current segment checkpoint info + segment, ok := pack.metacache.GetSegmentByID(pack.segmentID) + if !ok { return merr.WrapErrSegmentNotFound(pack.segmentID) } - segment := segments[0] checkPoints = append(checkPoints, &datapb.CheckPoint{ SegmentID: pack.segmentID, NumOfRows: segment.FlushedRows() + pack.batchSize, @@ -140,11 +139,10 @@ func (b *brokerMetaWriter) UpdateSyncV2(pack *SyncTaskV2) error { checkPoints := []*datapb.CheckPoint{} // only current segment checkpoint info, - segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID)) - if len(segments) == 0 { + segment, ok := pack.metacache.GetSegmentByID(pack.segmentID) + if !ok { return merr.WrapErrSegmentNotFound(pack.segmentID) } - segment := segments[0] checkPoints = append(checkPoints, &datapb.CheckPoint{ SegmentID: pack.segmentID, NumOfRows: segment.FlushedRows() + pack.batchSize, diff --git a/internal/datanode/syncmgr/meta_writer_test.go b/internal/datanode/syncmgr/meta_writer_test.go index 8742e8c6b9..23d54d9be4 100644 --- a/internal/datanode/syncmgr/meta_writer_test.go +++ b/internal/datanode/syncmgr/meta_writer_test.go @@ -40,6 +40,7 @@ func (s *MetaWriterSuite) TestNormalSave() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() task := NewSyncTask() task.WithMetaCache(s.metacache) @@ -53,6 +54,7 @@ func (s *MetaWriterSuite) TestReturnError() { bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTask() task.WithMetaCache(s.metacache) @@ -66,6 +68,7 @@ func (s *MetaWriterSuite) TestNormalSaveV2() { bfs := metacache.NewBloomFilterSet() seg := 
metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTaskV2() task.WithMetaCache(s.metacache) @@ -79,6 +82,7 @@ func (s *MetaWriterSuite) TestReturnErrorV2() { bfs := metacache.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTaskV2() task.WithMetaCache(s.metacache) diff --git a/internal/datanode/syncmgr/mock_meta_writer.go b/internal/datanode/syncmgr/mock_meta_writer.go new file mode 100644 index 0000000000..17cd5f25d9 --- /dev/null +++ b/internal/datanode/syncmgr/mock_meta_writer.go @@ -0,0 +1,158 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package syncmgr + +import mock "github.com/stretchr/testify/mock" + +// MockMetaWriter is an autogenerated mock type for the MetaWriter type +type MockMetaWriter struct { + mock.Mock +} + +type MockMetaWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter { + return &MockMetaWriter_Expecter{mock: &_m.Mock} +} + +// DropChannel provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) DropChannel(_a0 string) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_DropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannel' +type MockMetaWriter_DropChannel_Call struct { + *mock.Call +} + +// DropChannel is a helper method to define mock.On call +// - _a0 string +func (_e *MockMetaWriter_Expecter) DropChannel(_a0 interface{}) *MockMetaWriter_DropChannel_Call { + return &MockMetaWriter_DropChannel_Call{Call: _e.mock.On("DropChannel", _a0)} +} + +func (_c *MockMetaWriter_DropChannel_Call) Run(run func(_a0 string)) *MockMetaWriter_DropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockMetaWriter_DropChannel_Call) Return(_a0 error) *MockMetaWriter_DropChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(string) error) *MockMetaWriter_DropChannel_Call { + _c.Call.Return(run) + return _c +} + +// UpdateSync provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) UpdateSync(_a0 *SyncTask) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*SyncTask) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_UpdateSync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSync' +type MockMetaWriter_UpdateSync_Call struct { + *mock.Call +} + +// UpdateSync is a helper method to define mock.On call +// - _a0 *SyncTask +func (_e *MockMetaWriter_Expecter) UpdateSync(_a0 interface{}) *MockMetaWriter_UpdateSync_Call { + return &MockMetaWriter_UpdateSync_Call{Call: _e.mock.On("UpdateSync", _a0)} +} + +func (_c *MockMetaWriter_UpdateSync_Call) Run(run func(_a0 *SyncTask)) *MockMetaWriter_UpdateSync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*SyncTask)) + }) + return _c 
+} + +func (_c *MockMetaWriter_UpdateSync_Call) Return(_a0 error) *MockMetaWriter_UpdateSync_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_UpdateSync_Call) RunAndReturn(run func(*SyncTask) error) *MockMetaWriter_UpdateSync_Call { + _c.Call.Return(run) + return _c +} + +// UpdateSyncV2 provides a mock function with given fields: _a0 +func (_m *MockMetaWriter) UpdateSyncV2(_a0 *SyncTaskV2) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*SyncTaskV2) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockMetaWriter_UpdateSyncV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSyncV2' +type MockMetaWriter_UpdateSyncV2_Call struct { + *mock.Call +} + +// UpdateSyncV2 is a helper method to define mock.On call +// - _a0 *SyncTaskV2 +func (_e *MockMetaWriter_Expecter) UpdateSyncV2(_a0 interface{}) *MockMetaWriter_UpdateSyncV2_Call { + return &MockMetaWriter_UpdateSyncV2_Call{Call: _e.mock.On("UpdateSyncV2", _a0)} +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) Run(run func(_a0 *SyncTaskV2)) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*SyncTaskV2)) + }) + return _c +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) Return(_a0 error) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaWriter_UpdateSyncV2_Call) RunAndReturn(run func(*SyncTaskV2) error) *MockMetaWriter_UpdateSyncV2_Call { + _c.Call.Return(run) + return _c +} + +// NewMockMetaWriter creates a new instance of MockMetaWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewMockMetaWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMetaWriter { + mock := &MockMetaWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/syncmgr/options.go b/internal/datanode/syncmgr/options.go index 324cdb1700..39da2da647 100644 --- a/internal/datanode/syncmgr/options.go +++ b/internal/datanode/syncmgr/options.go @@ -1,6 +1,8 @@ package syncmgr import ( + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" @@ -18,6 +20,7 @@ func NewSyncTask() *SyncTask { statsBinlogs: make(map[int64]*datapb.FieldBinlog), deltaBinlog: &datapb.FieldBinlog{}, segmentData: make(map[string][]byte), + binlogBlobs: make(map[int64]*storage.Blob), } } @@ -31,16 +34,6 @@ func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask { return t } -func (t *SyncTask) WithInsertData(insertData *storage.InsertData) *SyncTask { - t.insertData = insertData - return t -} - -func (t *SyncTask) WithDeleteData(deleteData *storage.DeleteData) *SyncTask { - t.deleteData = deleteData - return t -} - func (t *SyncTask) WithStartPosition(start *msgpb.MsgPosition) *SyncTask { t.startPosition = start return t } @@ -73,6 +66,9 @@ func (t *SyncTask) WithChannelName(chanName string) *SyncTask { func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask { t.schema = schema + t.pkField = lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { + return field.GetIsPrimaryKey() + }) return t } diff --git a/internal/datanode/syncmgr/serializer.go b/internal/datanode/syncmgr/serializer.go new file mode 100644 index 0000000000..60217277d6 --- /dev/null +++ b/internal/datanode/syncmgr/serializer.go @@ -0,0 +1,121 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context", + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// Serializer is the interface for storage/storageV2 implementations to encode +// a WriteBuffer into a sync task. +type Serializer interface { + EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) +} + +// SyncPack is the struct containing buffered sync data.
+type SyncPack struct { + metacache metacache.MetaCache + metawriter MetaWriter + // data + insertData *storage.InsertData + deltaData *storage.DeleteData + // statistics + tsFrom typeutil.Timestamp + tsTo typeutil.Timestamp + startPosition *msgpb.MsgPosition + checkpoint *msgpb.MsgPosition + batchSize int64 // batchSize is the row number of this sync task, not the total number of rows in the segment + isFlush bool + isDrop bool + // metadata + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + level datapb.SegmentLevel +} + +func (p *SyncPack) WithInsertData(insertData *storage.InsertData) *SyncPack { + p.insertData = insertData + return p +} + +func (p *SyncPack) WithDeleteData(deltaData *storage.DeleteData) *SyncPack { + p.deltaData = deltaData + return p +} + +func (p *SyncPack) WithStartPosition(start *msgpb.MsgPosition) *SyncPack { + p.startPosition = start + return p +} + +func (p *SyncPack) WithCheckpoint(cp *msgpb.MsgPosition) *SyncPack { + p.checkpoint = cp + return p +} + +func (p *SyncPack) WithCollectionID(collID int64) *SyncPack { + p.collectionID = collID + return p +} + +func (p *SyncPack) WithPartitionID(partID int64) *SyncPack { + p.partitionID = partID + return p +} + +func (p *SyncPack) WithSegmentID(segID int64) *SyncPack { + p.segmentID = segID + return p +} + +func (p *SyncPack) WithChannelName(chanName string) *SyncPack { + p.channelName = chanName + return p +} + +func (p *SyncPack) WithTimeRange(from, to typeutil.Timestamp) *SyncPack { + p.tsFrom, p.tsTo = from, to + return p +} + +func (p *SyncPack) WithFlush() *SyncPack { + p.isFlush = true + return p +} + +func (p *SyncPack) WithDrop() *SyncPack { + p.isDrop = true + return p +} + +func (p *SyncPack) WithBatchSize(batchSize int64) *SyncPack { + p.batchSize = batchSize + return p +} + +func (p *SyncPack) WithLevel(level datapb.SegmentLevel) *SyncPack { + p.level = level + return p +} diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go new file mode 100644 index 0000000000..2213d572a1 --- /dev/null +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -0,0 +1,205 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package syncmgr + +import ( + "context" + "strconv" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type storageV1Serializer struct { + collectionID int64 + schema *schemapb.CollectionSchema + pkField *schemapb.FieldSchema + + inCodec *storage.InsertCodec + delCodec *storage.DeleteCodec + + metacache metacache.MetaCache + metaWriter MetaWriter +} + +func NewStorageSerializer(metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) { + collectionID := metacache.Collection() + schema := metacache.Schema() + pkField := lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) + if pkField == nil { + return nil, merr.WrapErrServiceInternal("cannot find pk field") + } + meta := &etcdpb.CollectionMeta{ + Schema: schema, + ID: collectionID, + } + inCodec := storage.NewInsertCodecWithSchema(meta) + return &storageV1Serializer{ + collectionID: collectionID, + schema: schema, + pkField: pkField, + + inCodec: inCodec, + delCodec: storage.NewDeleteCodec(), + metacache: metacache, + metaWriter: metaWriter, + }, nil +} + +func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { + task := NewSyncTask() + + log := log.Ctx(ctx).With( + zap.Int64("segmentID", pack.segmentID), + zap.Int64("collectionID", pack.collectionID), + zap.String("channel", pack.channelName), + ) + + if pack.insertData != nil { + memSize := make(map[int64]int64) + for fieldID, fieldData := range pack.insertData.Data { + memSize[fieldID] = int64(fieldData.GetMemorySize()) + } + task.binlogMemsize = memSize + + binlogBlobs, err := s.serializeBinlog(ctx, pack) + if err != nil { + log.Warn("failed to serialize binlog", zap.Error(err)) + return nil, err + } + task.binlogBlobs = binlogBlobs + + singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack) + if err != nil { + log.Warn("failed to serialized statslog", zap.Error(err)) + return nil, err + } + + task.batchStatsBlob = batchStatsBlob + s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID)) + } + + if pack.isFlush { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } + + task.mergedStatsBlob = mergedStatsBlob + task.WithFlush() + } + + if pack.deltaData != nil { + deltaBlob, err := s.serializeDeltalog(pack) + if err != nil { + log.Warn("failed to serialize delta log", zap.Error(err)) + return nil, err + } + task.deltaBlob = deltaBlob + task.deltaRowCount = pack.deltaData.RowCount + } + if pack.isDrop { + task.WithDrop() + } + + s.setTaskMeta(task, pack) + return task, nil +} + +func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { + task.WithCollectionID(pack.collectionID). + WithPartitionID(pack.partitionID). + WithChannelName(pack.channelName). + WithSegmentID(pack.segmentID). + WithBatchSize(pack.batchSize). + WithSchema(s.metacache.Schema()). + WithStartPosition(pack.startPosition). + WithCheckpoint(pack.checkpoint). + WithLevel(pack.level). + WithTimeRange(pack.tsFrom, pack.tsTo). + WithMetaCache(s.metacache). + WithMetaWriter(s.metaWriter). 
+ WithFailureCallback(func(err error) { + // TODO could change to unsub channel in the future + panic(err) + }) +} + +func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) { + blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData) + if err != nil { + return nil, err + } + + result := make(map[int64]*storage.Blob) + for _, blob := range blobs { + fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) + if err != nil { + log.Error("serialize buffer failed ... cannot parse string to fieldID ..", zap.Error(err)) + return nil, err + } + + result[fieldID] = blob + } + return result, nil +} + +func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) { + pkFieldData := pack.insertData.Data[s.pkField.GetFieldID()] + rowNum := int64(pkFieldData.RowNum()) + + stats, err := storage.NewPrimaryKeyStats(s.pkField.GetFieldID(), int64(s.pkField.GetDataType()), rowNum) + if err != nil { + return nil, nil, err + } + stats.UpdateByMsgs(pkFieldData) + + blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize) + if err != nil { + return nil, nil, err + } + return stats, blob, nil +} + +func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.Blob, error) { + segment, ok := s.metacache.GetSegmentByID(pack.segmentID) + if !ok { + return nil, merr.WrapErrSegmentNotFound(pack.segmentID) + } + + return s.inCodec.SerializePkStatsList(lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { + return &storage.PrimaryKeyStats{ + FieldID: s.pkField.GetFieldID(), + MaxPk: pks.MaxPK, + MinPk: pks.MinPK, + BF: pks.PkFilter, + PkType: int64(s.pkField.GetDataType()), + } + }), segment.NumOfRows()) +} + +func (s *storageV1Serializer) serializeDeltalog(pack *SyncPack) (*storage.Blob, error) { + return s.delCodec.Serialize(pack.collectionID, pack.partitionID, pack.segmentID, pack.deltaData) +} diff --git a/internal/datanode/syncmgr/storage_serializer_test.go b/internal/datanode/syncmgr/storage_serializer_test.go new file mode 100644 index 0000000000..43894b47b1 --- /dev/null +++ b/internal/datanode/syncmgr/storage_serializer_test.go @@ -0,0 +1,324 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package syncmgr + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type StorageV1SerializerSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + + schema *schemapb.CollectionSchema + + mockCache *metacache.MockMetaCache + mockMetaWriter *MockMetaWriter + + serializer *storageV1Serializer +} + +func (s *StorageV1SerializerSuite) SetupSuite() { + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID) + s.schema = &schemapb.CollectionSchema{ + Name: "serializer_v1_test_col", + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64}, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + s.mockCache = metacache.NewMockMetaCache(s.T()) + s.mockMetaWriter = NewMockMetaWriter(s.T()) +} + +func (s *StorageV1SerializerSuite) SetupTest() { + s.mockCache.EXPECT().Collection().Return(s.collectionID) + s.mockCache.EXPECT().Schema().Return(s.schema) + + var err error + s.serializer, err = NewStorageSerializer(s.mockCache, s.mockMetaWriter) + s.Require().NoError(err) +} + +func (s *StorageV1SerializerSuite) getEmptyInsertBuffer() *storage.InsertData { + buf, err := storage.NewInsertData(s.schema) + s.Require().NoError(err) + + return buf +} + +func (s *StorageV1SerializerSuite) getInsertBuffer() *storage.InsertData { + buf := s.getEmptyInsertBuffer() + + // generate data + for i := 0; i < 10; i++ { + data := make(map[storage.FieldID]any) + data[common.RowIDField] = int64(i + 1) + data[common.TimeStampField] = int64(i + 1) + data[100] = int64(i + 1) + vector := lo.RepeatBy(128, func(_ int) float32 { + return rand.Float32() + }) + data[101] = vector + err := buf.Append(data) + s.Require().NoError(err) + } + return buf +} + +func (s *StorageV1SerializerSuite) getDeleteBuffer() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + ts := tsoutil.ComposeTSByTime(time.Now(), 0) + buf.Append(pk, ts) + } + return buf +} + +func (s *StorageV1SerializerSuite) getDeleteBufferZeroTs() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + buf.Append(pk, 0) + } + return buf +} + +func (s *StorageV1SerializerSuite) getBasicPack() *SyncPack { + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName). 
+ WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + + return pack +} + +func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet { + bfs := metacache.NewBloomFilterSet() + fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{ + FieldID: 101, + Name: "ID", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }) + s.Require().NoError(err) + + ids := []int64{1, 2, 3, 4, 5, 6, 7} + for _, id := range ids { + err = fd.AppendRow(id) + s.Require().NoError(err) + } + + bfs.UpdatePKRange(fd) + return bfs +} + +func (s *StorageV1SerializerSuite) TestSerializeInsert() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("without_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithDrop() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.True(taskV1.isDrop) + }) + + s.Run("with_empty_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_normal_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.Len(taskV1.binlogBlobs, 4) + s.NotNil(taskV1.batchStatsBlob) + }) + + s.Run("with_flush_segment_not_found", func() { + pack := s.getBasicPack() + pack.WithFlush() + + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_flush", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithFlush() + + bfs := s.getBfs() + segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(segInfo) + metacache.CompactTo(metacache.NullSegment)(segInfo) + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(segInfo) + }).Return().Once() + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) 
+ s.EqualValues(100, taskV1.tsTo) + s.Len(taskV1.binlogBlobs, 4) + s.NotNil(taskV1.batchStatsBlob) + s.NotNil(taskV1.mergedStatsBlob) + }) +} + +func (s *StorageV1SerializerSuite) TestSerializeDelete() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("serialize_failed", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_normal", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBuffer()) + pack.WithTimeRange(50, 100) + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV1, ok := task.(*SyncTask) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.NotNil(taskV1.deltaBlob) + }) +} + +func (s *StorageV1SerializerSuite) TestBadSchema() { + mockCache := metacache.NewMockMetaCache(s.T()) + mockCache.EXPECT().Collection().Return(s.collectionID).Once() + mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once() + _, err := NewStorageSerializer(mockCache, s.mockMetaWriter) + s.Error(err) +} + +func TestStorageV1Serializer(t *testing.T) { + suite.Run(t, new(StorageV1SerializerSuite)) +} diff --git a/internal/datanode/syncmgr/storage_v2_serializer.go b/internal/datanode/syncmgr/storage_v2_serializer.go new file mode 100644 index 0000000000..7c11936fd7 --- /dev/null +++ b/internal/datanode/syncmgr/storage_v2_serializer.go @@ -0,0 +1,246 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package syncmgr + +import ( + "context" + "fmt" + + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus-storage/go/storage/schema" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/storage" + iTypeutil "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type storageV2Serializer struct { + *storageV1Serializer + + arrowSchema *arrow.Schema + storageV2Cache *metacache.StorageV2Cache + inCodec *storage.InsertCodec + metacache metacache.MetaCache +} + +func NewStorageV2Serializer( + storageV2Cache *metacache.StorageV2Cache, + metacache metacache.MetaCache, + metaWriter MetaWriter, +) (*storageV2Serializer, error) { + v1Serializer, err := NewStorageSerializer(metacache, metaWriter) + if err != nil { + return nil, err + } + + return &storageV2Serializer{ + storageV1Serializer: v1Serializer, + storageV2Cache: storageV2Cache, + arrowSchema: storageV2Cache.ArrowSchema(), + metacache: metacache, + }, nil +} + +func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { + task := NewSyncTaskV2() + + space, err := s.storageV2Cache.GetOrCreateSpace(pack.segmentID, SpaceCreatorFunc(pack.segmentID, s.schema, s.arrowSchema)) + if err != nil { + log.Warn("failed to get or create space", zap.Error(err)) + return nil, err + } + + task.space = space + if pack.insertData != nil { + insertReader, err := s.serializeInsertData(pack) + if err != nil { + log.Warn("failed to serialize insert data with storagev2", zap.Error(err)) + return nil, err + } + + task.reader = insertReader + + singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack) + if err != nil { + log.Warn("failed to serialized statslog", zap.Error(err)) + return nil, err + } + + task.batchStatsBlob = batchStatsBlob + s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID)) + } + + if pack.isFlush { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } + + task.mergedStatsBlob = mergedStatsBlob + task.WithFlush() + } + + if pack.deltaData != nil { + deltaReader, err := s.serializeDeltaData(pack) + if err != nil { + log.Warn("failed to serialize delta data", zap.Error(err)) + return nil, err + } + task.deleteReader = deltaReader + } + + if pack.isDrop { + task.WithDrop() + } + + s.setTaskMeta(task, pack) + return task, nil +} + +func (s *storageV2Serializer) setTaskMeta(task *SyncTaskV2, pack *SyncPack) { + task.WithCollectionID(pack.collectionID). + WithPartitionID(pack.partitionID). + WithChannelName(pack.channelName). + WithSegmentID(pack.segmentID). + WithBatchSize(pack.batchSize). + WithSchema(s.metacache.Schema()). + WithStartPosition(pack.startPosition). + WithCheckpoint(pack.checkpoint). + WithLevel(pack.level). + WithTimeRange(pack.tsFrom, pack.tsTo). + WithMetaCache(s.metacache). + WithMetaWriter(s.metaWriter). 
+ WithFailureCallback(func(err error) { + // TODO could change to unsub channel in the future + panic(err) + }) +} + +func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordReader, error) { + builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema) + defer builder.Release() + + if err := buildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil { + return nil, err + } + + rec := builder.NewRecord() + defer rec.Release() + + itr, err := array.NewRecordReader(s.arrowSchema, []arrow.Record{rec}) + if err != nil { + return nil, err + } + itr.Retain() + + return itr, nil +} + +func (s *storageV2Serializer) serializeDeltaData(pack *SyncPack) (array.RecordReader, error) { + fields := make([]*schemapb.FieldSchema, 0, 2) + tsField := &schemapb.FieldSchema{ + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + } + fields = append(fields, s.pkField, tsField) + + deltaArrowSchema, err := iTypeutil.ConvertToArrowSchema(fields) + if err != nil { + return nil, err + } + + builder := array.NewRecordBuilder(memory.DefaultAllocator, deltaArrowSchema) + defer builder.Release() + + switch s.pkField.GetDataType() { + case schemapb.DataType_Int64: + pb := builder.Field(0).(*array.Int64Builder) + for _, pk := range pack.deltaData.Pks { + pb.Append(pk.GetValue().(int64)) + } + case schemapb.DataType_VarChar: + pb := builder.Field(0).(*array.StringBuilder) + for _, pk := range pack.deltaData.Pks { + pb.Append(pk.GetValue().(string)) + } + default: + return nil, merr.WrapErrParameterInvalidMsg("unexpected pk type %v", s.pkField.GetDataType()) + } + + for _, ts := range pack.deltaData.Tss { + builder.Field(1).(*array.Int64Builder).Append(int64(ts)) + } + + rec := builder.NewRecord() + defer rec.Release() + + reader, err := array.NewRecordReader(deltaArrowSchema, []arrow.Record{rec}) + if err != nil { + return nil, err + } + reader.Retain() + + return reader, nil +} + +func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) { + return func() (*milvus_storage.Space, error) { + url := fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s", + params.Params.CommonCfg.StorageScheme.GetValue(), + params.Params.MinioCfg.AccessKeyID.GetValue(), + params.Params.MinioCfg.SecretAccessKey.GetValue(), + params.Params.MinioCfg.BucketName.GetValue(), + segmentID, + params.Params.MinioCfg.Address.GetValue()) + + pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema) + if err != nil { + return nil, err + } + vecSchema, err := typeutil.GetVectorFieldSchema(collSchema) + if err != nil { + return nil, err + } + space, err := milvus_storage.Open( + url, + options.NewSpaceOptionBuilder(). + SetSchema(schema.NewSchema( + arrowSchema, + &schema.SchemaOptions{ + PrimaryColumn: pkSchema.Name, + VectorColumn: vecSchema.Name, + VersionColumn: common.TimeStampFieldName, + }, + )). + Build(), + ) + return space, err + } +} diff --git a/internal/datanode/syncmgr/storage_v2_serializer_test.go b/internal/datanode/syncmgr/storage_v2_serializer_test.go new file mode 100644 index 0000000000..9a4711a7d6 --- /dev/null +++ b/internal/datanode/syncmgr/storage_v2_serializer_test.go @@ -0,0 +1,364 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus-storage/go/storage/schema" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type StorageV2SerializerSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + segmentID int64 + channelName string + + schema *schemapb.CollectionSchema + storageCache *metacache.StorageV2Cache + mockCache *metacache.MockMetaCache + mockMetaWriter *MockMetaWriter + + serializer *storageV2Serializer +} + +func (s *StorageV2SerializerSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml0_%d_v1", s.collectionID) + s.schema = &schemapb.CollectionSchema{ + Name: "sync_task_test_col", + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64, Name: common.RowIDFieldName}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64, Name: common.TimeStampFieldName}, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + s.mockCache = metacache.NewMockMetaCache(s.T()) + s.mockMetaWriter = NewMockMetaWriter(s.T()) +} + +func (s *StorageV2SerializerSuite) SetupTest() { + storageCache, err := metacache.NewStorageV2Cache(s.schema) + s.Require().NoError(err) + s.storageCache = storageCache + + s.mockCache.EXPECT().Collection().Return(s.collectionID) + s.mockCache.EXPECT().Schema().Return(s.schema) + + s.serializer, err = NewStorageV2Serializer(storageCache, s.mockCache, s.mockMetaWriter) + s.Require().NoError(err) +} + +func (s *StorageV2SerializerSuite) getSpace() *milvus_storage.Space { + tmpDir := s.T().TempDir() + space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder(). 
+ SetSchema(schema.NewSchema(s.storageCache.ArrowSchema(), &schema.SchemaOptions{ + PrimaryColumn: "pk", VectorColumn: "vector", VersionColumn: common.TimeStampFieldName, + })).Build()) + s.Require().NoError(err) + return space +} + +func (s *StorageV2SerializerSuite) getBasicPack() *SyncPack { + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName). + WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + + return pack +} + +func (s *StorageV2SerializerSuite) getEmptyInsertBuffer() *storage.InsertData { + buf, err := storage.NewInsertData(s.schema) + s.Require().NoError(err) + + return buf +} + +func (s *StorageV2SerializerSuite) getInsertBuffer() *storage.InsertData { + buf := s.getEmptyInsertBuffer() + + // generate data + for i := 0; i < 10; i++ { + data := make(map[storage.FieldID]any) + data[common.RowIDField] = int64(i + 1) + data[common.TimeStampField] = int64(i + 1) + data[100] = int64(i + 1) + vector := lo.RepeatBy(128, func(_ int) float32 { + return rand.Float32() + }) + data[101] = vector + err := buf.Append(data) + s.Require().NoError(err) + } + return buf +} + +func (s *StorageV2SerializerSuite) getDeleteBuffer() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + ts := tsoutil.ComposeTSByTime(time.Now(), 0) + buf.Append(pk, ts) + } + return buf +} + +func (s *StorageV2SerializerSuite) getDeleteBufferZeroTs() *storage.DeleteData { + buf := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + buf.Append(pk, 0) + } + return buf +} + +func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet { + bfs := metacache.NewBloomFilterSet() + fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{ + FieldID: 101, + Name: "ID", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }) + s.Require().NoError(err) + + ids := []int64{1, 2, 3, 4, 5, 6, 7} + for _, id := range ids { + err = fd.AppendRow(id) + s.Require().NoError(err) + } + + bfs.UpdatePKRange(fd) + return bfs +} + +func (s *StorageV2SerializerSuite) TestSerializeInsert() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.storageCache.SetSpace(s.segmentID, s.getSpace()) + + s.Run("no_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithDrop() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + taskV1, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV1.collectionID) + s.Equal(s.partitionID, taskV1.partitionID) + s.Equal(s.channelName, taskV1.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV1.checkpoint) + s.EqualValues(50, taskV1.tsFrom) + s.EqualValues(100, taskV1.tsTo) + s.True(taskV1.isDrop) + }) + + s.Run("empty_insert_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_normal_data", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := 
task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.reader) + s.NotNil(taskV2.batchStatsBlob) + }) + + s.Run("with_flush_segment_not_found", func() { + pack := s.getBasicPack() + pack.WithFlush() + + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("with_flush", func() { + pack := s.getBasicPack() + pack.WithTimeRange(50, 100) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithFlush() + + bfs := s.getBfs() + segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(segInfo) + metacache.CompactTo(metacache.NullSegment)(segInfo) + s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(segInfo) + }).Return().Once() + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once() + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.mergedStatsBlob) + }) +} + +func (s *StorageV2SerializerSuite) TestSerializeDelete() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("serialize_failed", func() { + pkField := s.serializer.pkField + s.serializer.pkField = &schemapb.FieldSchema{} + defer func() { + s.serializer.pkField = pkField + }() + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_failed_bad_pk", func() { + pkField := s.serializer.pkField + s.serializer.pkField = &schemapb.FieldSchema{ + DataType: schemapb.DataType_Array, + } + defer func() { + s.serializer.pkField = pkField + }() + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBufferZeroTs()) + pack.WithTimeRange(50, 100) + + _, err := s.serializer.EncodeBuffer(ctx, pack) + s.Error(err) + }) + + s.Run("serialize_normal", func() { + pack := s.getBasicPack() + pack.WithDeleteData(s.getDeleteBuffer()) + pack.WithTimeRange(50, 100) + + task, err := s.serializer.EncodeBuffer(ctx, pack) + s.NoError(err) + + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + s.Equal(s.collectionID, taskV2.collectionID) + s.Equal(s.partitionID, taskV2.partitionID) + s.Equal(s.channelName, taskV2.channelName) + s.Equal(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }, taskV2.checkpoint) + s.EqualValues(50, taskV2.tsFrom) + s.EqualValues(100, taskV2.tsTo) + s.NotNil(taskV2.deleteReader) + }) +} + +func (s *StorageV2SerializerSuite) TestBadSchema() { + mockCache := metacache.NewMockMetaCache(s.T()) + mockCache.EXPECT().Collection().Return(s.collectionID).Once() + mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once() + _, err := 
NewStorageV2Serializer(s.storageCache, mockCache, s.mockMetaWriter) + s.Error(err) +} + +func TestStorageV2Serializer(t *testing.T) { + suite.Run(t, new(StorageV2SerializerSuite)) +} diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index 6d2d83a336..ac3d8db2c4 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -1,12 +1,26 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package syncmgr import ( "context" "fmt" "path" - "strconv" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -15,7 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -32,15 +45,13 @@ type SyncTask struct { chunkManager storage.ChunkManager allocator allocator.Interface - insertData *storage.InsertData - deleteData *storage.DeleteData - segment *metacache.SegmentInfo collectionID int64 partitionID int64 segmentID int64 channelName string schema *schemapb.CollectionSchema + pkField *schemapb.FieldSchema startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition // batchSize is the row number of this sync task, @@ -61,6 +72,13 @@ type SyncTask struct { statsBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog deltaBinlog *datapb.FieldBinlog + binlogBlobs map[int64]*storage.Blob // fieldID => blob + binlogMemsize map[int64]int64 // memory size + batchStatsBlob *storage.Blob + mergedStatsBlob *storage.Blob + deltaBlob *storage.Blob + deltaRowCount int64 + segmentData map[string][]byte writeRetryOpts []retry.Option @@ -90,14 +108,18 @@ func (t *SyncTask) handleError(err error, metricSegLevel string) { } } -func (t *SyncTask) Run() error { +func (t *SyncTask) Run() (err error) { t.tr = timerecord.NewTimeRecorder("syncTask") - var metricSegLevel string = t.level.String() + metricSegLevel := t.level.String() log := t.getLogger() - var err error - var has bool + defer func() { + if err != nil { + t.handleError(err, metricSegLevel) + } + }() + var has bool t.segment, has = t.metacache.GetSegmentByID(t.segmentID) if !has { log.Warn("failed to sync data, segment not found in metacache") @@ -118,30 +140,39 @@ func (t *SyncTask) Run() error { t.segmentID = t.segment.CompactTo() } - err = t.serializeInsertData() + err = t.processInsertBlobs() + if err != nil { + log.Warn("failed to process insert blobs", zap.Error(err)) + return err + } + + err = t.processStatsBlob() if err != nil { log.Warn("failed to serialize insert data", zap.Error(err)) 
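Note: the rewritten Run() above replaces per-branch handleError calls with a named error return and a single deferred handler, so the failure callback and failure metrics fire exactly once no matter which step returns early. A minimal, runnable sketch of that pattern; all names except handleError are illustrative, not taken from the patch:

    package main

    import "errors"

    type task struct{ handled bool }

    // stands in for SyncTask.handleError: failure callback plus metrics,
    // now invoked from exactly one place
    func (t *task) handleError(err error, level string) {
            t.handled = true
    }

    func (t *task) Run() (err error) {
            level := "Legacy"
            defer func() {
                    if err != nil {
                            t.handleError(err, level) // observes the named return value
                    }
            }()

            if err = errors.New("mocked step failure"); err != nil {
                    return err // no explicit handleError call needed here anymore
            }
            return nil
    }

    func main() {
            t := &task{}
            _ = t.Run()
            println("handler fired:", t.handled) // handler fired: true
    }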
t.handleError(err, metricSegLevel) + log.Warn("failed to process stats blobs", zap.Error(err)) return err } - err = t.serializeDeleteData() + err = t.processDeltaBlob() if err != nil { log.Warn("failed to serialize delete data", zap.Error(err)) t.handleError(err, metricSegLevel) + log.Warn("failed to process delta blobs", zap.Error(err)) return err } - var totalSize float64 = 0 - if t.deleteData != nil { - totalSize += float64(t.deleteData.Size()) - } + /* + var totalSize float64 = 0 + if t.deleteData != nil { + totalSize += float64(t.deleteData.Size()) + } - if t.insertData != nil { - totalSize += float64(t.insertData.GetMemorySize()) - } - metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize) - metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds())) + if t.insertData != nil { + totalSize += float64(t.insertData.GetMemorySize()) + } + metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize) + metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds()))*/ err = t.writeLogs() if err != nil { @@ -180,86 +211,18 @@ func (t *SyncTask) Run() error { return nil } -func (t *SyncTask) serializeInsertData() error { - err := t.serializeBinlog() - if err != nil { - return err - } - - err = t.serializePkStatsLog() - if err != nil { - return err - } - - return nil -} - -func (t *SyncTask) serializeDeleteData() error { - if t.deleteData == nil { +func (t *SyncTask) processInsertBlobs() error { + if len(t.binlogBlobs) == 0 { return nil } - delCodec := storage.NewDeleteCodec() - blob, err := delCodec.Serialize(t.collectionID, t.partitionID, t.segmentID, t.deleteData) + logidx, _, err := t.allocator.Alloc(uint32(len(t.binlogBlobs))) if err != nil { return err } - logID, err := t.allocator.AllocOne() - if err != nil { - log.Error("failed to alloc ID", zap.Error(err)) - return err - } - - value := blob.GetValue() - data := &datapb.Binlog{} - - blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID) - blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) - - t.segmentData[blobPath] = value - data.LogSize = int64(len(blob.Value)) - data.LogPath = blobPath - data.TimestampFrom = t.tsFrom - data.TimestampTo = t.tsTo - data.EntriesNum = t.deleteData.RowCount - t.appendDeltalog(data) - - return nil -} - -func (t *SyncTask) serializeBinlog() error { - if t.insertData == nil { - return nil - } - - // get memory size of buffer data - memSize := make(map[int64]int) - for fieldID, fieldData := range t.insertData.Data { - memSize[fieldID] = fieldData.GetMemorySize() - } - - inCodec := t.getInCodec() - - blobs, err := inCodec.Serialize(t.partitionID, t.segmentID, t.insertData) - if err != nil { - return err - } - - logidx, _, err := t.allocator.Alloc(uint32(len(blobs))) - if err != nil { - return err - } - - for _, blob := range blobs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("Flush failed ... 
cannot parse string to fieldID ..", zap.Error(err)) - return err - } - + for fieldID, blob := range t.binlogBlobs { k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logidx) - // [rootPath]/[insert_log]/key key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k) t.segmentData[key] = blob.GetValue() t.appendBinlog(fieldID, &datapb.Binlog{ @@ -267,65 +230,50 @@ func (t *SyncTask) serializeBinlog() error { TimestampFrom: t.tsFrom, TimestampTo: t.tsTo, LogPath: key, - LogSize: int64(memSize[fieldID]), + LogSize: t.binlogMemsize[fieldID], }) - - logidx += 1 + logidx++ } return nil } -func (t *SyncTask) convertInsertData2PkStats(pkFieldID int64, dataType schemapb.DataType) (*storage.PrimaryKeyStats, int64) { - pkFieldData := t.insertData.Data[pkFieldID] - - rowNum := int64(pkFieldData.RowNum()) - - stats, err := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum) - if err != nil { - return nil, 0 +func (t *SyncTask) processStatsBlob() error { + if t.batchStatsBlob != nil { + logidx, err := t.allocator.AllocOne() + if err != nil { + return err + } + t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), logidx, t.batchSize) } - stats.UpdateByMsgs(pkFieldData) - return stats, rowNum -} - -func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error { - blob, err := t.getInCodec().SerializePkStats(stats, rowNum) - if err != nil { - return err + if t.mergedStatsBlob != nil { + totalRowNum := t.segment.NumOfRows() + t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum) } - - logidx, err := t.allocator.AllocOne() - if err != nil { - return err - } - t.convertBlob2StatsBinlog(blob, fieldID, logidx, rowNum) - return nil } -func (t *SyncTask) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType) error { - segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - var statsList []*storage.PrimaryKeyStats - var totalRowNum int64 - for _, segment := range segments { - totalRowNum += segment.NumOfRows() - statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { - return &storage.PrimaryKeyStats{ - FieldID: fieldID, - MaxPk: pks.MaxPK, - MinPk: pks.MinPK, - BF: pks.PkFilter, - PkType: int64(pkType), - } - })...) 
- } +func (t *SyncTask) processDeltaBlob() error { + if t.deltaBlob != nil { + logID, err := t.allocator.AllocOne() + if err != nil { + log.Error("failed to alloc ID", zap.Error(err)) + return err + } - blob, err := t.getInCodec().SerializePkStatsList(statsList, totalRowNum) - if err != nil { - return err - } - t.convertBlob2StatsBinlog(blob, fieldID, int64(storage.CompoundStatsType), totalRowNum) + value := t.deltaBlob.GetValue() + data := &datapb.Binlog{} + blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID) + blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) + + t.segmentData[blobPath] = value + data.LogSize = int64(len(t.deltaBlob.Value)) + data.LogPath = blobPath + data.TimestampFrom = t.tsFrom + data.TimestampTo = t.tsTo + data.EntriesNum = t.deltaRowCount + t.appendDeltalog(data) + } return nil } @@ -344,30 +292,6 @@ func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID in }) } -func (t *SyncTask) serializePkStatsLog() error { - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fieldID := pkField.GetFieldID() - if t.insertData != nil { - stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType()) - if stats != nil && rowNum > 0 { - err := t.serializeSinglePkStats(fieldID, stats, rowNum) - if err != nil { - return err - } - } - } - - // skip statslog for empty segment - // DO NOT use level check here since Level zero segment may contain insert data in the future - if t.isFlush && t.segment.NumOfRows() > 0 { - return t.serializeMergedPkStats(fieldID, pkField.GetDataType()) - } - return nil -} - func (t *SyncTask) appendBinlog(fieldID int64, binlog *datapb.Binlog) { fieldBinlog, ok := t.insertBinlogs[fieldID] if !ok { @@ -407,15 +331,6 @@ func (t *SyncTask) writeMeta() error { return t.metaWriter.UpdateSync(t) } -func (t *SyncTask) getInCodec() *storage.InsertCodec { - meta := &etcdpb.CollectionMeta{ - Schema: t.schema, - ID: t.collectionID, - } - - return storage.NewInsertCodecWithSchema(meta) -} - func (t *SyncTask) SegmentID() int64 { return t.segmentID } diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index 4e8891f19b..dc73ee708e 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -1,6 +1,23 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
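Note: with serialization moved into the serializer, processInsertBlobs, processStatsBlob and processDeltaBlob above are reduced to allocating log IDs and composing object-storage paths for blobs that were already encoded; a single Alloc(n) reserves a contiguous ID range for all field binlogs, while stats and delta blobs use AllocOne. A runnable sketch of the resulting key layout; joinIDPath mimics what metautil.JoinIDPath is used for here, and the prefix stands in for common.SegmentInsertLogPath (both are assumptions for illustration):

    package main

    import (
            "fmt"
            "path"
            "strconv"
            "strings"
    )

    // joinIDPath joins numeric IDs into a slash-separated key, the assumed
    // shape of metautil.JoinIDPath output
    func joinIDPath(ids ...int64) string {
            parts := make([]string, 0, len(ids))
            for _, id := range ids {
                    parts = append(parts, strconv.FormatInt(id, 10))
            }
            return strings.Join(parts, "/")
    }

    func main() {
            const insertLogPrefix = "insert_log" // placeholder for common.SegmentInsertLogPath
            // collectionID, partitionID, segmentID, fieldID, logID
            key := joinIDPath(100, 101, 1001, 100, 9001)
            fmt.Println(path.Join("files", insertLogPrefix, key))
            // prints: files/insert_log/100/101/1001/100/9001
    }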
+ package syncmgr import ( + "fmt" "math/rand" "testing" "time" @@ -43,11 +60,10 @@ type SyncTaskSuite struct { func (s *SyncTaskSuite) SetupSuite() { paramtable.Get().Init(paramtable.NewBaseTable()) - s.collectionID = 100 - s.partitionID = 101 - s.segmentID = 1001 - s.channelName = "by-dev-rootcoord-dml_0_100v0" - + s.collectionID = rand.Int63n(100) + 1000 + s.partitionID = rand.Int63n(100) + 2000 + s.segmentID = rand.Int63n(1000) + 10000 + s.channelName = fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID) s.schema = &schemapb.CollectionSchema{ Name: "sync_task_test_col", Fields: []*schemapb.FieldSchema{ @@ -171,7 +187,7 @@ func (s *SyncTaskSuite) TestRunNormal() { s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - s.Run("without_insert_delete", func() { + s.Run("without_data", func() { task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithTimeRange(50, 100) @@ -187,7 +203,6 @@ func (s *SyncTaskSuite) TestRunNormal() { s.Run("with_insert_delete_cp", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -195,47 +210,55 @@ func (s *SyncTaskSuite) TestRunNormal() { MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.binlogBlobs[100] = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } err := task.Run() s.NoError(err) }) - s.Run("with_insert_delete_flush", func() { + s.Run("with_statslog", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) - task.WithFlush() - task.WithDrop() + task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ ChannelName: s.channelName, MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.WithFlush() + task.batchStatsBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } + task.mergedStatsBlob = &storage.Blob{ + Key: "1", + Value: []byte("test_data"), + } err := task.Run() s.NoError(err) }) - s.Run("with_zero_numrow_insertdata", func() { + s.Run("with_delta_data", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getEmptyInsertBuffer()) - task.WithFlush() - task.WithDrop() + task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ ChannelName: s.channelName, MsgID: []byte{1, 2, 3, 4}, Timestamp: 100, }) + task.WithDrop() + task.deltaBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } err := task.Run() - s.Error(err) - - err = task.serializePkStatsLog() s.NoError(err) - stats, rowNum := task.convertInsertData2PkStats(100, schemapb.DataType_Int64) - s.Nil(stats) - s.Zero(rowNum) }) } @@ -249,7 +272,10 @@ func (s *SyncTaskSuite) TestRunL0Segment() { s.Run("pure_delete_l0_flush", func() { task := s.getSuiteSyncTask() - task.WithDeleteData(s.getDeleteBuffer()) + task.deltaBlob = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -306,7 +332,6 @@ func (s *SyncTaskSuite) TestRunError() { flag := false handler := func(_ error) { flag = true } task := s.getSuiteSyncTask().WithFailureCallback(handler) - task.WithInsertData(s.getEmptyInsertBuffer()) err := task.Run() @@ -318,29 
+343,22 @@ func (s *SyncTaskSuite) TestRunError() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet()) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.Run("serialize_insert_fail", func() { - flag := false - handler := func(_ error) { flag = true } - task := s.getSuiteSyncTask().WithFailureCallback(handler) - task.WithInsertData(s.getEmptyInsertBuffer()) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + s.Run("metawrite_fail", func() { + s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked")) + + task := s.getSuiteSyncTask() + task.WithMetaWriter(BrokerMetaWriter(s.broker, retry.Attempts(1))) + task.WithTimeRange(50, 100) + task.WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + }) err := task.Run() - s.Error(err) - s.True(flag) - }) - - s.Run("serailize_delete_fail", func() { - flag := false - handler := func(_ error) { flag = true } - task := s.getSuiteSyncTask().WithFailureCallback(handler) - - task.WithDeleteData(s.getDeleteBufferZeroTs()) - - err := task.Run() - - s.Error(err) - s.True(flag) }) s.Run("chunk_manager_save_fail", func() { @@ -348,10 +366,13 @@ func (s *SyncTaskSuite) TestRunError() { handler := func(_ error) { flag = true } s.chunkManager.ExpectedCalls = nil s.chunkManager.EXPECT().RootPath().Return("files") - s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(errors.New("mocked")) + s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked"))) task := s.getSuiteSyncTask().WithFailureCallback(handler) + task.binlogBlobs[100] = &storage.Blob{ + Key: "100", + Value: []byte("test_data"), + } - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithWriteRetryOptions(retry.Attempts(1)) err := task.Run() diff --git a/internal/datanode/syncmgr/taskv2.go b/internal/datanode/syncmgr/taskv2.go index 547c738182..293ed92ff7 100644 --- a/internal/datanode/syncmgr/taskv2.go +++ b/internal/datanode/syncmgr/taskv2.go @@ -19,12 +19,9 @@ package syncmgr import ( "context" "math" - "strconv" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -36,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" - typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -75,14 +71,13 @@ func (t *SyncTaskV2) Run() error { log := t.getLogger() var err error - infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - if len(infos) == 0 { + segment, ok := t.metacache.GetSegmentByID(t.segmentID) + if !ok { log.Warn("failed to sync data, segment not found in metacache") t.handleError(err) return merr.WrapErrSegmentNotFound(t.segmentID) } - segment := infos[0] if segment.CompactTo() > 0 { log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo())) // update sync task segment id @@ -90,21 +85,6 @@ func (t *SyncTaskV2) Run() error { t.segmentID = segment.CompactTo() } - if err = 
t.serializeInsertData(); err != nil { - t.handleError(err) - return err - } - - if err = t.serializeStatsData(); err != nil { - t.handleError(err) - return err - } - - if err = t.serializeDeleteData(); err != nil { - t.handleError(err) - return err - } - if err = t.writeSpace(); err != nil { t.handleError(err) return err @@ -128,156 +108,6 @@ func (t *SyncTaskV2) Run() error { return nil } -func (t *SyncTaskV2) serializeInsertData() error { - if t.insertData == nil { - return nil - } - - b := array.NewRecordBuilder(memory.DefaultAllocator, t.arrowSchema) - defer b.Release() - - if err := buildRecord(b, t.insertData, t.schema.Fields); err != nil { - return err - } - - rec := b.NewRecord() - defer rec.Release() - - itr, err := array.NewRecordReader(t.arrowSchema, []arrow.Record{rec}) - if err != nil { - return err - } - itr.Retain() - t.reader = itr - return nil -} - -func (t *SyncTaskV2) serializeStatsData() error { - if t.insertData == nil { - return nil - } - - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fieldID := pkField.GetFieldID() - - stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType()) - - // not flush and not insert data - if !t.isFlush && stats == nil { - return nil - } - if t.isFlush { - return t.serializeMergedPkStats(fieldID, pkField.GetDataType(), stats, rowNum) - } - - return t.serializeSinglePkStats(fieldID, stats, rowNum) -} - -func (t *SyncTaskV2) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType, stats *storage.PrimaryKeyStats, rowNum int64) error { - segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) - var statsList []*storage.PrimaryKeyStats - var oldRowNum int64 - for _, segment := range segments { - oldRowNum += segment.NumOfRows() - statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { - return &storage.PrimaryKeyStats{ - FieldID: fieldID, - MaxPk: pks.MaxPK, - MinPk: pks.MinPK, - BF: pks.PkFilter, - PkType: int64(pkType), - } - })...) 
- } - if stats != nil { - statsList = append(statsList, stats) - } - - blob, err := t.getInCodec().SerializePkStatsList(statsList, oldRowNum+rowNum) - if err != nil { - return err - } - blob.Key = strconv.Itoa(int(storage.CompoundStatsType)) - t.statsBlob = blob - return nil -} - -func (t *SyncTaskV2) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error { - blob, err := t.getInCodec().SerializePkStats(stats, rowNum) - if err != nil { - return err - } - - logidx, err := t.allocator.AllocOne() - if err != nil { - return err - } - - blob.Key = strconv.Itoa(int(logidx)) - t.statsBlob = blob - return nil -} - -func (t *SyncTaskV2) serializeDeleteData() error { - if t.deleteData == nil { - return nil - } - - fields := make([]*schemapb.FieldSchema, 0) - pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) - if pkField == nil { - return merr.WrapErrServiceInternal("cannot find pk field") - } - fields = append(fields, pkField) - tsField := &schemapb.FieldSchema{ - FieldID: common.TimeStampField, - Name: common.TimeStampFieldName, - DataType: schemapb.DataType_Int64, - } - fields = append(fields, tsField) - - schema, err := typeutil2.ConvertToArrowSchema(fields) - if err != nil { - return err - } - - b := array.NewRecordBuilder(memory.DefaultAllocator, schema) - defer b.Release() - - switch pkField.DataType { - case schemapb.DataType_Int64: - pb := b.Field(0).(*array.Int64Builder) - for _, pk := range t.deleteData.Pks { - pb.Append(pk.GetValue().(int64)) - } - case schemapb.DataType_VarChar: - pb := b.Field(0).(*array.StringBuilder) - for _, pk := range t.deleteData.Pks { - pb.Append(pk.GetValue().(string)) - } - default: - return merr.WrapErrParameterInvalidMsg("unexpected pk type %v", pkField.DataType) - } - - for _, ts := range t.deleteData.Tss { - b.Field(1).(*array.Int64Builder).Append(int64(ts)) - } - - rec := b.NewRecord() - defer rec.Release() - - reader, err := array.NewRecordReader(schema, []arrow.Record{rec}) - if err != nil { - return err - } - - t.deleteReader = reader - return nil -} - func (t *SyncTaskV2) writeSpace() error { defer func() { if t.reader != nil { @@ -288,37 +118,6 @@ func (t *SyncTaskV2) writeSpace() error { } }() - // url := fmt.Sprintf("s3://%s:%s@%s/%d?endpoint_override=%s", - // params.Params.MinioCfg.AccessKeyID.GetValue(), - // params.Params.MinioCfg.SecretAccessKey.GetValue(), - // params.Params.MinioCfg.BucketName.GetValue(), - // t.segmentID, - // params.Params.MinioCfg.Address.GetValue()) - - // pkSchema, err := typeutil.GetPrimaryFieldSchema(t.schema) - // if err != nil { - // return err - // } - // vecSchema, err := typeutil.GetVectorFieldSchema(t.schema) - // if err != nil { - // return err - // } - // space, err := milvus_storage.Open( - // url, - // options.NewSpaceOptionBuilder(). - // SetSchema(schema.NewSchema( - // t.arrowSchema, - // &schema.SchemaOptions{ - // PrimaryColumn: pkSchema.Name, - // VectorColumn: vecSchema.Name, - // VersionColumn: common.TimeStampFieldName, - // }, - // )). 
- // Build(), - // ) - // if err != nil { - // return err - // } txn := t.space.NewTransaction() if t.reader != nil { txn.Write(t.reader, &options.DefaultWriteOptions) @@ -483,16 +282,6 @@ func (t *SyncTaskV2) WithAllocator(allocator allocator.Interface) *SyncTaskV2 { return t } -func (t *SyncTaskV2) WithInsertData(insertData *storage.InsertData) *SyncTaskV2 { - t.insertData = insertData - return t -} - -func (t *SyncTaskV2) WithDeleteData(deleteData *storage.DeleteData) *SyncTaskV2 { - t.deleteData = deleteData - return t -} - func (t *SyncTaskV2) WithStartPosition(start *msgpb.MsgPosition) *SyncTaskV2 { t.startPosition = start return t diff --git a/internal/datanode/syncmgr/taskv2_test.go b/internal/datanode/syncmgr/taskv2_test.go index 226abb5ea6..c58b45400d 100644 --- a/internal/datanode/syncmgr/taskv2_test.go +++ b/internal/datanode/syncmgr/taskv2_test.go @@ -17,6 +17,7 @@ package syncmgr import ( + "context" "fmt" "math/rand" "testing" @@ -28,7 +29,6 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -167,19 +166,33 @@ func (s *SyncTaskSuiteV2) getDeleteBufferZeroTs() *storage.DeleteData { } func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 { - log.Info("space", zap.Any("space", s.space)) - task := NewSyncTaskV2(). - WithArrowSchema(s.arrowSchema). - WithSpace(s.space). - WithCollectionID(s.collectionID). + pack := &SyncPack{} + + pack.WithCollectionID(s.collectionID). WithPartitionID(s.partitionID). WithSegmentID(s.segmentID). WithChannelName(s.channelName). - WithSchema(s.schema). - WithAllocator(s.allocator). 
- WithMetaCache(s.metacache) + WithCheckpoint(&msgpb.MsgPosition{ + Timestamp: 1000, + ChannelName: s.channelName, + }) + pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10) + pack.WithDeleteData(s.getDeleteBuffer()) - return task + storageCache, err := metacache.NewStorageV2Cache(s.schema) + s.Require().NoError(err) + + s.metacache.EXPECT().Collection().Return(s.collectionID) + s.metacache.EXPECT().Schema().Return(s.schema) + serializer, err := NewStorageV2Serializer(storageCache, s.metacache, nil) + s.Require().NoError(err) + task, err := serializer.EncodeBuffer(context.Background(), pack) + s.Require().NoError(err) + taskV2, ok := task.(*SyncTaskV2) + s.Require().True(ok) + taskV2.WithMetaCache(s.metacache) + + return taskV2 } func (s *SyncTaskSuiteV2) TestRunNormal() { @@ -202,6 +215,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { bfs.UpdatePKRange(fd) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() @@ -221,7 +235,6 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { s.Run("with_insert_delete_cp", func() { task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ @@ -233,22 +246,6 @@ func (s *SyncTaskSuiteV2) TestRunNormal() { err := task.Run() s.NoError(err) }) - - s.Run("with_insert_delete_flush", func() { - task := s.getSuiteSyncTask() - task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) - task.WithFlush() - task.WithDrop() - task.WithMetaWriter(BrokerMetaWriter(s.broker)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - - err := task.Run() - s.NoError(err) - }) } func (s *SyncTaskSuiteV2) TestBuildRecord() { diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index 6fe27d7898..44bc68f647 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -18,8 +18,12 @@ type bfWriteBuffer struct { } func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { + base, err := newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option) + if err != nil { + return nil, err + } return &bfWriteBuffer{ - writeBufferBase: newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option), + writeBufferBase: base, syncMgr: syncMgr, }, nil } diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 9fa81aa0e5..3dcd0b9063 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -63,6 +63,10 @@ func (s *BFWriteBufferSuite) SetupSuite() { }, }, } + + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageV2Cache = storageCache } func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { @@ -154,7 +158,9 @@ func (s *BFWriteBufferSuite) SetupTest() { } 
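Note: the test refactor above shows the new construction flow end to end: callers fill a SyncPack and let a Serializer turn it into a Task, instead of wiring insert/delete buffers into the task directly. A sketch against the API introduced in this patch (not compilable outside the milvus tree; ctx, cache, writer, and the ID and timestamp values are assumed to exist in the caller):

    pack := &syncmgr.SyncPack{}
    pack.WithCollectionID(collectionID).
            WithPartitionID(partitionID).
            WithSegmentID(segmentID).
            WithChannelName(channelName).
            WithTimeRange(tsFrom, tsTo).
            WithBatchSize(batchSize).
            WithCheckpoint(checkpoint)

    // storage v1 path; NewStorageV2Serializer(storageCache, cache, writer)
    // is the v2 counterpart, as shown earlier in this patch
    serializer, err := syncmgr.NewStorageSerializer(cache, writer)
    if err != nil {
            return err
    }
    task, err := serializer.EncodeBuffer(ctx, pack)
    if err != nil {
            return err
    }
    // task now carries pre-serialized blobs; the sync manager only writes them out
    _ = task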
func (s *BFWriteBufferSuite) TestBufferData() { - wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{}) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + wb, err := NewBFWriteBuffer(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{}) s.NoError(err) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) @@ -205,6 +211,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() { func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() { params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true") + defer params.Params.Reset(params.Params.CommonCfg.EnableStorageV2.Key) params.Params.CommonCfg.StorageScheme.SwapTempValue("file") tmpDir := s.T().TempDir() arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields) @@ -279,6 +286,14 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { }) } +func (s *BFWriteBufferSuite) TestCreateFailure() { + metacache := metacache.NewMockMetaCache(s.T()) + metacache.EXPECT().Collection().Return(s.collID) + metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}) + _, err := NewBFWriteBuffer(s.channelName, metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{}) + s.Error(err) +} + func TestBFWriteBuffer(t *testing.T) { suite.Run(t, new(BFWriteBufferSuite)) } diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 3cf4041e71..3d9f03fc1f 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -32,10 +32,14 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca if option.idAllocator == nil { return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer") } + base, err := newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option) + if err != nil { + return nil, err + } return &l0WriteBuffer{ l0Segments: make(map[int64]int64), l0partition: make(map[int64]int64), - writeBufferBase: newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option), + writeBufferBase: base, syncMgr: syncMgr, idAllocator: option.idAllocator, }, nil diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go index d654fb40be..deae26eebe 100644 --- a/internal/datanode/writebuffer/l0_write_buffer_test.go +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -25,12 +25,13 @@ import ( type L0WriteBufferSuite struct { suite.Suite - channelName string - collID int64 - collSchema *schemapb.CollectionSchema - syncMgr *syncmgr.MockSyncManager - metacache *metacache.MockMetaCache - allocator *allocator.MockGIDAllocator + channelName string + collID int64 + collSchema *schemapb.CollectionSchema + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + allocator *allocator.MockGIDAllocator + storageCache *metacache.StorageV2Cache } func (s *L0WriteBufferSuite) SetupSuite() { @@ -57,6 +58,10 @@ func (s *L0WriteBufferSuite) SetupSuite() { }, } s.channelName = "by-dev-rootcoord-dml_0v0" + + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageCache = storageCache } func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { @@ -146,7 +151,7 @@ func (s *L0WriteBufferSuite) SetupTest() { } func (s *L0WriteBufferSuite) TestBufferData() { - wb, 
err := NewL0WriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{ + wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{ idAllocator: s.allocator, }) s.NoError(err) @@ -165,6 +170,16 @@ func (s *L0WriteBufferSuite) TestBufferData() { s.NoError(err) } +func (s *L0WriteBufferSuite) TestCreateFailure() { + metacache := metacache.NewMockMetaCache(s.T()) + metacache.EXPECT().Collection().Return(s.collID) + metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}) + _, err := NewL0WriteBuffer(s.channelName, metacache, s.storageCache, s.syncMgr, &writeBufferOption{ + idAllocator: s.allocator, + }) + s.Error(err) +} + func TestL0WriteBuffer(t *testing.T) { suite.Run(t, new(L0WriteBufferSuite)) } diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 144878a660..89f05f25e5 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -38,6 +38,8 @@ func (s *ManagerSuite) SetupSuite() { s.collSchema = &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64, Name: common.RowIDFieldName}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64, Name: common.TimeStampFieldName}, { FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, }, @@ -69,10 +71,13 @@ func (s *ManagerSuite) SetupTest() { func (s *ManagerSuite) TestRegister() { manager := s.manager - err := manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.NoError(err) - err = manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.Error(err) s.ErrorIs(err, merr.ErrChannelReduplicate) } @@ -176,7 +181,9 @@ func (s *ManagerSuite) TestRemoveChannel() { }) s.Run("remove_channel", func() { - err := manager.Register(s.channelName, s.metacache, nil, WithIDAllocator(s.allocator)) + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + err = manager.Register(s.channelName, s.metacache, storageCache, WithIDAllocator(s.allocator)) s.Require().NoError(err) s.NotPanics(func() { diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index ddd6c2316e..3111e6df01 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v12/arrow" "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -13,16 +12,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - milvus_storage "github.com/milvus-io/milvus-storage/go/storage" - "github.com/milvus-io/milvus-storage/go/storage/options" - "github.com/milvus-io/milvus-storage/go/storage/schema" "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -65,9 +60,9 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach switch option.deletePolicy { case DeletePolicyBFPkOracle: - return NewBFWriteBuffer(channel, metacache, nil, syncMgr, option) + return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option) case DeletePolicyL0Delta: - return NewL0WriteBuffer(channel, metacache, nil, syncMgr, option) + return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option) default: return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy) } @@ -85,7 +80,9 @@ type writeBufferBase struct { metaCache metacache.MetaCache syncMgr syncmgr.SyncManager broker broker.Broker - buffers map[int64]*segmentBuffer // segmentID => segmentBuffer + serializer syncmgr.Serializer + + buffers map[int64]*segmentBuffer // segmentID => segmentBuffer syncPolicies []SyncPolicy checkpoint *msgpb.MsgPosition @@ -94,11 +91,29 @@ type writeBufferBase struct { storagev2Cache *metacache.StorageV2Cache } -func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase { +func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) { flushTs := atomic.NewUint64(nonFlushTS) flushTsPolicy := GetFlushTsPolicy(flushTs, metacache) option.syncPolicies = append(option.syncPolicies, flushTsPolicy) + var serializer syncmgr.Serializer + var err error + if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { + serializer, err = syncmgr.NewStorageV2Serializer( + storageV2Cache, + metacache, + option.metaWriter, + ) + } else { + serializer, err = syncmgr.NewStorageSerializer( + metacache, + option.metaWriter, + ) + } + if err != nil { + return nil, err + } + return &writeBufferBase{ channelName: channel, collectionID: metacache.Collection(), @@ -107,10 +122,11 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2 metaWriter: option.metaWriter, buffers: make(map[int64]*segmentBuffer), metaCache: metacache, + serializer: serializer, syncPolicies: option.syncPolicies, flushTimestamp: flushTs, storagev2Cache: storageV2Cache, - } + }, nil } func (wb *writeBufferBase) HasSegment(segmentID int64) bool { @@ -238,8 +254,9 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64 func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) { for _, segmentID := range segmentIDs { - syncTask := wb.getSyncTask(ctx, segmentID) - if syncTask == nil { + syncTask, err := wb.getSyncTask(ctx, segmentID) + if err != nil { + // TODO check err type // segment info not found log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID)) continue @@ -344,49 +361,14 @@ func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKe return nil } -func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) { - return func() (*milvus_storage.Space, error) { - url := 
fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s", - params.Params.CommonCfg.StorageScheme.GetValue(), - params.Params.MinioCfg.AccessKeyID.GetValue(), - params.Params.MinioCfg.SecretAccessKey.GetValue(), - params.Params.MinioCfg.BucketName.GetValue(), - segmentID, - params.Params.MinioCfg.Address.GetValue()) - - pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema) - if err != nil { - return nil, err - } - vecSchema, err := typeutil.GetVectorFieldSchema(collSchema) - if err != nil { - return nil, err - } - space, err := milvus_storage.Open( - url, - options.NewSpaceOptionBuilder(). - SetSchema(schema.NewSchema( - arrowSchema, - &schema.SchemaOptions{ - PrimaryColumn: pkSchema.Name, - VectorColumn: vecSchema.Name, - VersionColumn: common.TimeStampFieldName, - }, - )). - Build(), - ) - return space, err - } -} - -func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syncmgr.Task { +func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) { log := log.Ctx(ctx).With( zap.Int64("segmentID", segmentID), ) segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID) // wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID)) if !ok { log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID)) - return nil + return nil, merr.WrapErrSegmentNotFound(segmentID) } var batchSize int64 var totalMemSize float64 @@ -397,7 +379,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax } - actions := []metacache.SegmentAction{metacache.RollStats()} + actions := []metacache.SegmentAction{} if insert != nil { batchSize = int64(insert.GetRowNum()) totalMemSize += float64(insert.GetMemorySize()) @@ -408,69 +390,26 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn actions = append(actions, metacache.StartSyncing(batchSize)) wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID)) - var syncTask syncmgr.Task - if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { - arrowSchema := wb.storagev2Cache.ArrowSchema() - space, err := wb.storagev2Cache.GetOrCreateSpace(segmentID, SpaceCreatorFunc(segmentID, wb.collSchema, arrowSchema)) - if err != nil { - log.Warn("failed to get or create space", zap.Error(err)) - return nil - } + pack := &syncmgr.SyncPack{} + pack.WithInsertData(insert). + WithDeleteData(delta). + WithCollectionID(wb.collectionID). + WithPartitionID(segmentInfo.PartitionID()). + WithChannelName(wb.channelName). + WithSegmentID(segmentID). + WithStartPosition(startPos). + WithTimeRange(tsFrom, tsTo). + WithLevel(segmentInfo.Level()). + WithCheckpoint(wb.checkpoint). + WithBatchSize(batchSize) - task := syncmgr.NewSyncTaskV2(). - WithInsertData(insert). - WithDeleteData(delta). - WithCollectionID(wb.collectionID). - WithPartitionID(segmentInfo.PartitionID()). - WithChannelName(wb.channelName). - WithSegmentID(segmentID). - WithStartPosition(startPos). - WithTimeRange(tsFrom, tsTo). - WithLevel(segmentInfo.Level()). - WithCheckpoint(wb.checkpoint). - WithSchema(wb.collSchema). - WithBatchSize(batchSize). - WithMetaCache(wb.metaCache). - WithMetaWriter(wb.metaWriter). - WithArrowSchema(arrowSchema). - WithSpace(space). 
- WithFailureCallback(func(err error) { - // TODO could change to unsub channel in the future - panic(err) - }) - if segmentInfo.State() == commonpb.SegmentState_Flushing { - task.WithFlush() - } - syncTask = task - } else { - task := syncmgr.NewSyncTask(). - WithInsertData(insert). - WithDeleteData(delta). - WithCollectionID(wb.collectionID). - WithPartitionID(segmentInfo.PartitionID()). - WithChannelName(wb.channelName). - WithSegmentID(segmentID). - WithStartPosition(startPos). - WithTimeRange(tsFrom, tsTo). - WithLevel(segmentInfo.Level()). - WithCheckpoint(wb.checkpoint). - WithSchema(wb.collSchema). - WithBatchSize(batchSize). - WithMetaCache(wb.metaCache). - WithMetaWriter(wb.metaWriter). - WithFailureCallback(func(err error) { - // TODO could change to unsub channel in the future - panic(err) - }) - if segmentInfo.State() == commonpb.SegmentState_Flushing { - task.WithFlush() - } - syncTask = task + if segmentInfo.State() == commonpb.SegmentState_Flushing { + pack.WithFlush() } metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize) - return syncTask + return wb.serializer.EncodeBuffer(ctx, pack) } func (wb *writeBufferBase) Close(drop bool) { @@ -483,8 +422,9 @@ func (wb *writeBufferBase) Close(drop bool) { var futures []*conc.Future[error] for id := range wb.buffers { - syncTask := wb.getSyncTask(context.Background(), id) - if syncTask == nil { + syncTask, err := wb.getSyncTask(context.Background(), id) + if err != nil { + // TODO continue } switch t := syncTask.(type) { diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 9ee97c8cce..8244eeb36f 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -20,12 +20,13 @@ import ( type WriteBufferSuite struct { suite.Suite - collID int64 - channelName string - collSchema *schemapb.CollectionSchema - wb *writeBufferBase - syncMgr *syncmgr.MockSyncManager - metacache *metacache.MockMetaCache + collID int64 + channelName string + collSchema *schemapb.CollectionSchema + wb *writeBufferBase + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + storageCache *metacache.StorageV2Cache } func (s *WriteBufferSuite) SetupSuite() { @@ -44,20 +45,24 @@ func (s *WriteBufferSuite) SetupSuite() { } func (s *WriteBufferSuite) SetupTest() { + storageCache, err := metacache.NewStorageV2Cache(s.collSchema) + s.Require().NoError(err) + s.storageCache = storageCache s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe() s.metacache.EXPECT().Collection().Return(s.collID).Maybe() - s.wb = newWriteBufferBase(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{ + s.wb, err = newWriteBufferBase(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{ pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, }) + s.Require().NoError(err) } func (s *WriteBufferSuite) TestDefaultOption() { s.Run("default BFPkOracle", func() { - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr) s.NoError(err) _, ok := wb.(*bfWriteBuffer) s.True(ok) @@ -66,7 +71,7 @@ func (s *WriteBufferSuite) TestDefaultOption() { s.Run("default L0Delta policy", func() { 
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true") defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key) - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator())) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator())) s.NoError(err) _, ok := wb.(*l0WriteBuffer) s.True(ok) @@ -74,18 +79,18 @@ func (s *WriteBufferSuite) TestDefaultOption() { } func (s *WriteBufferSuite) TestWriteBufferType() { - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) _, ok := wb.(*bfWriteBuffer) s.True(ok) - wb, err = NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) + wb, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) s.NoError(err) _, ok = wb.(*l0WriteBuffer) s.True(ok) - _, err = NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy("")) + _, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy("")) s.Error(err) } @@ -102,9 +107,9 @@ func (s *WriteBufferSuite) TestHasSegment() { func (s *WriteBufferSuite) TestFlushSegments() { segmentID := int64(1001) - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything) + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() - wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) err = wb.FlushSegments(context.Background(), []int64{segmentID})
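Note: the diff is truncated at this point, but the caller-side contract it establishes is already visible above: getSyncTask now returns (syncmgr.Task, error) rather than a possibly nil task, and both call sites carry TODOs about inspecting the error type. One way those TODOs could be resolved, sketched against merr as used elsewhere in the patch; the errors.Is branch is an assumption, not part of this commit, and SyncData's signature is assumed from the pre-existing sync manager:

    task, err := wb.getSyncTask(ctx, segmentID)
    if err != nil {
            if errors.Is(err, merr.ErrSegmentNotFound) {
                    // benign: segment was compacted away or dropped concurrently
                    continue
            }
            log.Ctx(ctx).Warn("failed to encode sync task",
                    zap.Int64("segmentID", segmentID), zap.Error(err))
            continue
    }
    wb.syncMgr.SyncData(ctx, task)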