diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go
index 413a404e59..905d1e7339 100644
--- a/internal/datacoord/compaction_policy_l0.go
+++ b/internal/datacoord/compaction_policy_l0.go
@@ -26,7 +26,7 @@ func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
 }
 
 func (policy *l0CompactionPolicy) Enable() bool {
-	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() && Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool()
+	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
 }
 
 func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 4618caa90d..b374c113ae 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -288,10 +288,6 @@ func (node *DataNode) tryToReleaseFlowgraph(channel string) {
 
 // Start will update DataNode state to HEALTHY
 func (node *DataNode) Start() error {
-	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		panic("In non-L0 mode, skip loading of bloom filter stats is not allowed.")
-	}
-
 	var startErr error
 	node.startOnce.Do(func() {
 		if err := node.allocator.Start(); err != nil {
diff --git a/internal/flushcommon/writebuffer/bf_write_buffer.go b/internal/flushcommon/writebuffer/bf_write_buffer.go
deleted file mode 100644
index b4541d9902..0000000000
--- a/internal/flushcommon/writebuffer/bf_write_buffer.go
+++ /dev/null
@@ -1,126 +0,0 @@
-package writebuffer
-
-import (
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
-	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
-	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/pkg/common"
-	"github.com/milvus-io/milvus/pkg/mq/msgstream"
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
-)
-
-type bfWriteBuffer struct {
-	*writeBufferBase
-
-	syncMgr   syncmgr.SyncManager
-	metacache metacache.MetaCache
-}
-
-func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
-	base, err := newWriteBufferBase(channel, metacache, syncMgr, option)
-	if err != nil {
-		return nil, err
-	}
-	return &bfWriteBuffer{
-		writeBufferBase: base,
-		syncMgr:         syncMgr,
-	}, nil
-}
-
-func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
-	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
-
-	split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
-		lc := storage.NewBatchLocationsCache(pks)
-		for _, segment := range segments {
-			hits := segment.GetBloomFilterSet().BatchPkExist(lc)
-			var deletePks []storage.PrimaryKey
-			var deleteTss []typeutil.Timestamp
-			for i, hit := range hits {
-				if hit {
-					deletePks = append(deletePks, pks[i])
-					deleteTss = append(deleteTss, pkTss[i])
-				}
-			}
-
-			if len(deletePks) > 0 {
-				wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
-			}
-		}
-	}
-
-	// distribute delete msg for previous data
-	for _, delMsg := range deleteMsgs {
-		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
-		pkTss := delMsg.GetTimestamps()
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
-			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
-
-		for idx := 0; idx < len(pks); idx += batchSize {
-			endIdx := idx + batchSize
-			if endIdx > len(pks) {
-				endIdx = len(pks)
-			}
-			split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
-		}
-
-		for _, inData := range groups {
-			if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
-				var deletePks []storage.PrimaryKey
-				var deleteTss []typeutil.Timestamp
-				for idx, pk := range pks {
-					ts := delMsg.GetTimestamps()[idx]
-					if inData.pkExists(pk, ts) {
-						deletePks = append(deletePks, pk)
-						deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
-					}
-				}
-				if len(deletePks) > 0 {
-					wb.bufferDelete(inData.segmentID, deletePks, deleteTss, startPos, endPos)
-				}
-			}
-		}
-	}
-}
-
-func (wb *bfWriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
-	wb.mut.Lock()
-	defer wb.mut.Unlock()
-
-	// buffer insert data and add segment if not exists
-	for _, inData := range insertData {
-		err := wb.bufferInsert(inData, startPos, endPos)
-		if err != nil {
-			return err
-		}
-	}
-
-	// distribute delete msg
-	// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
-	wb.dispatchDeleteMsgs(insertData, deleteMsgs, startPos, endPos)
-
-	// update pk oracle
-	for _, inData := range insertData {
-		// segment shall always exists after buffer insert
-		segments := wb.metaCache.GetSegmentsBy(
-			metacache.WithSegmentIDs(inData.segmentID))
-		for _, segment := range segments {
-			for _, fieldData := range inData.pkField {
-				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-
-	// update buffer last checkpoint
-	wb.checkpoint = endPos
-
-	_ = wb.triggerSync()
-
-	return nil
-}
diff --git a/internal/flushcommon/writebuffer/bf_write_buffer_test.go b/internal/flushcommon/writebuffer/bf_write_buffer_test.go
deleted file mode 100644
index a9c13e5c3b..0000000000
--- a/internal/flushcommon/writebuffer/bf_write_buffer_test.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package writebuffer
-
-import (
-	"fmt"
-	"math/rand"
-	"testing"
-	"time"
-
-	"github.com/samber/lo"
-	"github.com/stretchr/testify/mock"
-	"github.com/stretchr/testify/suite"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/flushcommon/broker"
-	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
-	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
-	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/pkg/common"
-	"github.com/milvus-io/milvus/pkg/metrics"
-	"github.com/milvus-io/milvus/pkg/mq/msgstream"
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/testutils"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
-)
-
-type BFWriteBufferSuite struct {
-	testutils.PromMetricsSuite
-	collID           int64
-	channelName      string
-	syncMgr          *syncmgr.MockSyncManager
-	metacacheInt64   *metacache.MockMetaCache
-	metacacheVarchar *metacache.MockMetaCache
-	broker           *broker.MockBroker
-
-	collInt64Schema  *schemapb.CollectionSchema
-	collInt64PkField *schemapb.FieldSchema
-
-	collVarcharSchema  *schemapb.CollectionSchema
-	collVarcharPkField *schemapb.FieldSchema
-}
-
-func (s *BFWriteBufferSuite) SetupSuite() {
-	paramtable.Get().Init(paramtable.NewBaseTable())
-	s.collID = 100
-	s.collInt64Schema = &schemapb.CollectionSchema{
-		Name: "test_collection",
-		Fields: []*schemapb.FieldSchema{
-			{
-				FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
-			},
-			{
-				FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
-			},
-			{
-				FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
-			},
-			{
-				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
-				TypeParams: []*commonpb.KeyValuePair{
-					{Key: common.DimKey, Value: "128"},
-				},
-			},
-		},
-	}
-
-	s.collInt64PkField = &schemapb.FieldSchema{
-		FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
-	}
-
-	s.collVarcharSchema = &schemapb.CollectionSchema{
-		Name: "test_collection",
-		Fields: []*schemapb.FieldSchema{
-			{
-				FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
-			},
-			{
-				FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
-			},
-			{
-				FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
-					{Key: common.MaxLengthKey, Value: "100"},
-				},
-			},
-			{
-				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
-				TypeParams: []*commonpb.KeyValuePair{
-					{Key: common.DimKey, Value: "128"},
-				},
-			},
-		},
-	}
-	s.collVarcharPkField = &schemapb.FieldSchema{
-		FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
-			{Key: common.MaxLengthKey, Value: "100"},
-		},
-	}
-}
-
-func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
-	tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
-	vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
-		return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
-	})
-
-	var pkField *schemapb.FieldData
-	switch pkType {
-	case schemapb.DataType_Int64:
-		pkField = &schemapb.FieldData{
-			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
-			Field: &schemapb.FieldData_Scalars{
-				Scalars: &schemapb.ScalarField{
-					Data: &schemapb.ScalarField_LongData{
-						LongData: &schemapb.LongArray{
-							Data: tss,
-						},
-					},
-				},
-			},
-		}
-	case schemapb.DataType_VarChar:
-		pkField = &schemapb.FieldData{
-			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
-			Field: &schemapb.FieldData_Scalars{
-				Scalars: &schemapb.ScalarField{
-					Data: &schemapb.ScalarField_StringData{
-						StringData: &schemapb.StringArray{
-							Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
-						},
-					},
-				},
-			},
-		}
-	}
-	flatten := lo.Flatten(vectors)
-	return tss, &msgstream.InsertMsg{
-		InsertRequest: &msgpb.InsertRequest{
-			SegmentID:  segmentID,
-			Version:    msgpb.InsertDataVersion_ColumnBased,
-			RowIDs:     tss,
-			Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
-			FieldsData: []*schemapb.FieldData{
-				{
-					FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
-					Field: &schemapb.FieldData_Scalars{
-						Scalars: &schemapb.ScalarField{
-							Data: &schemapb.ScalarField_LongData{
-								LongData: &schemapb.LongArray{
-									Data: tss,
-								},
-							},
-						},
-					},
-				},
-				{
-					FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
-					Field: &schemapb.FieldData_Scalars{
-						Scalars: &schemapb.ScalarField{
-							Data: &schemapb.ScalarField_LongData{
-								LongData: &schemapb.LongArray{
-									Data: tss,
-								},
-							},
-						},
-					},
-				},
-				pkField,
-				{
-					FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
-					Field: &schemapb.FieldData_Vectors{
-						Vectors: &schemapb.VectorField{
-							Dim: int64(dim),
-							Data: &schemapb.VectorField_FloatVector{
-								FloatVector: &schemapb.FloatArray{
-									Data: flatten,
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-}
-
-func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg {
-	delMsg := &msgstream.DeleteMsg{
-		DeleteRequest: &msgpb.DeleteRequest{
-			PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
-			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx+1)) }),
-		},
-	}
-	return delMsg
-}
-
-func (s *BFWriteBufferSuite) SetupTest() {
-	s.syncMgr = syncmgr.NewMockSyncManager(s.T())
-	s.metacacheInt64 = metacache.NewMockMetaCache(s.T())
-	s.metacacheInt64.EXPECT().Schema().Return(s.collInt64Schema).Maybe()
-	s.metacacheInt64.EXPECT().Collection().Return(s.collID).Maybe()
-	s.metacacheVarchar = metacache.NewMockMetaCache(s.T())
-	s.metacacheVarchar.EXPECT().Schema().Return(s.collVarcharSchema).Maybe()
-	s.metacacheVarchar.EXPECT().Collection().Return(s.collID).Maybe()
-
-	s.broker = broker.NewMockBroker(s.T())
-}
-
-func (s *BFWriteBufferSuite) TestBufferData() {
-	s.Run("normal_run_int64", func() {
-		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.syncMgr, &writeBufferOption{})
-		s.NoError(err)
-
-		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
-		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
-		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
-		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-
-		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
-		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
-
-		metrics.DataNodeFlowGraphBufferDataSize.Reset()
-		insertData, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
-		s.NoError(err)
-
-		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.NoError(err)
-
-		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
-		s.NoError(err)
-		s.MetricsEqual(value, 5607)
-
-		delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
-		err = wb.BufferData([]*InsertData{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.NoError(err)
-		s.MetricsEqual(value, 5847)
-	})
-
-	s.Run("normal_run_varchar", func() {
-		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, s.syncMgr, &writeBufferOption{})
-		s.NoError(err)
-
-		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
-		s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-		s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
-		s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
-		s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-
-		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
-		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))
-
-		metrics.DataNodeFlowGraphBufferDataSize.Reset()
-		insertData, err := PrepareInsert(s.collVarcharSchema, s.collVarcharPkField, []*msgstream.InsertMsg{msg})
-		s.NoError(err)
-
-		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.NoError(err)
-
-		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
-		s.NoError(err)
-		s.MetricsEqual(value, 7227)
-	})
-}
-
-func (s *BFWriteBufferSuite) TestPrepareInsert() {
-	s.Run("int_pk_type_not_match", func() {
-		_, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
-		_, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
-		s.Error(err)
-	})
-
-	s.Run("varchar_pk_not_match", func() {
-		_, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
-		_, err := PrepareInsert(s.collVarcharSchema, s.collVarcharPkField, []*msgstream.InsertMsg{msg})
-		s.Error(err)
-	})
-}
-
-func (s *BFWriteBufferSuite) TestAutoSync() {
-	paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
-
-	s.Run("normal_auto_sync", func() {
-		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.syncMgr, &writeBufferOption{
-			syncPolicies: []SyncPolicy{
-				GetFullBufferPolicy(),
-				GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
-				GetSealedSegmentsPolicy(s.metacacheInt64),
-			},
-		})
-		s.NoError(err)
-
-		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
-		seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, pkoracle.NewBloomFilterSet(), nil)
-		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
-		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
-		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
-		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
-		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
-		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
-		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
-		s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)
-
-		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
-		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
-
-		insertData, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
-		s.NoError(err)
-
-		metrics.DataNodeFlowGraphBufferDataSize.Reset()
-		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.NoError(err)
-
-		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
-		s.NoError(err)
-		s.MetricsEqual(value, 0)
-	})
-}
-
-func (s *BFWriteBufferSuite) TestCreateFailure() {
-	metacache := metacache.NewMockMetaCache(s.T())
-	metacache.EXPECT().Collection().Return(s.collID)
-	metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{})
-	_, err := NewBFWriteBuffer(s.channelName, metacache, s.syncMgr, &writeBufferOption{})
-	s.Error(err)
-}
-
-func TestBFWriteBuffer(t *testing.T) {
-	suite.Run(t, new(BFWriteBufferSuite))
-}
diff --git a/internal/flushcommon/writebuffer/options.go b/internal/flushcommon/writebuffer/options.go
index dd43eb2c36..52f3361cf4 100644
--- a/internal/flushcommon/writebuffer/options.go
+++ b/internal/flushcommon/writebuffer/options.go
@@ -9,20 +9,11 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
-const (
-	// DeletePolicyBFPKOracle is the const config value for using bf pk oracle as delete policy
-	DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
-
-	// DeletePolicyL0Delta is the const config value for using L0 delta as deleta policy.
-	DeletePolicyL0Delta = `l0_delta`
-)
-
 type WriteBufferOption func(opt *writeBufferOption)
 
 type TaskObserverCallback func(t syncmgr.Task, err error)
 
 type writeBufferOption struct {
-	deletePolicy string
 	idAllocator  allocator.Interface
 	syncPolicies []SyncPolicy
 
@@ -33,13 +24,7 @@ type writeBufferOption struct {
 }
 
 func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
-	deletePolicy := DeletePolicyBFPkOracle
-	if paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		deletePolicy = DeletePolicyL0Delta
-	}
-
 	return &writeBufferOption{
-		deletePolicy: deletePolicy,
 		syncPolicies: []SyncPolicy{
 			GetFullBufferPolicy(),
 			GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
@@ -53,12 +38,6 @@ func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
 	}
 }
 
-func WithDeletePolicy(policy string) WriteBufferOption {
-	return func(opt *writeBufferOption) {
-		opt.deletePolicy = policy
-	}
-}
-
 func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
 	return func(opt *writeBufferOption) {
 		opt.idAllocator = allocator
diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go
index 1e6bfb7209..4cb0c32437 100644
--- a/internal/flushcommon/writebuffer/write_buffer.go
+++ b/internal/flushcommon/writebuffer/write_buffer.go
@@ -119,14 +119,7 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncm
 		opt(option)
 	}
 
-	switch option.deletePolicy {
-	case DeletePolicyBFPkOracle:
-		return NewBFWriteBuffer(channel, metacache, syncMgr, option)
-	case DeletePolicyL0Delta:
-		return NewL0WriteBuffer(channel, metacache, syncMgr, option)
-	default:
-		return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
-	}
+	return NewL0WriteBuffer(channel, metacache, syncMgr, option)
 }
 
 // writeBufferBase is the common component for buffering data
diff --git a/internal/flushcommon/writebuffer/write_buffer_test.go b/internal/flushcommon/writebuffer/write_buffer_test.go
index 7363fa2ad9..b40b36720e 100644
--- a/internal/flushcommon/writebuffer/write_buffer_test.go
+++ b/internal/flushcommon/writebuffer/write_buffer_test.go
@@ -60,42 +60,6 @@ func (s *WriteBufferSuite) SetupTest() {
 	s.Require().NoError(err)
 }
 
-func (s *WriteBufferSuite) TestDefaultOption() {
-	s.Run("default BFPkOracle", func() {
-		paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "false")
-		defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-		wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr)
-		s.NoError(err)
-		_, ok := wb.(*bfWriteBuffer)
-		s.True(ok)
-	})
-
-	s.Run("default L0Delta policy", func() {
-		paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true")
-		defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-		wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator()))
-		s.NoError(err)
-		_, ok := wb.(*l0WriteBuffer)
-		s.True(ok)
-	})
-}
-
-func (s *WriteBufferSuite) TestWriteBufferType() {
-	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
-	s.NoError(err)
-
-	_, ok := wb.(*bfWriteBuffer)
-	s.True(ok)
-
-	wb, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
-	s.NoError(err)
-	_, ok = wb.(*l0WriteBuffer)
-	s.True(ok)
-
-	_, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(""))
-	s.Error(err)
-}
-
 func (s *WriteBufferSuite) TestHasSegment() {
 	segmentID := int64(1001)
 
@@ -111,8 +75,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().GetSegmentByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, true)
-
-	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
+	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithIDAllocator(allocator.NewMockAllocator(s.T())))
 	s.NoError(err)
 
 	err = wb.SealSegments(context.Background(), []int64{segmentID})
diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go
index c72726b9a2..52413071da 100644
--- a/internal/streamingnode/server/server.go
+++ b/internal/streamingnode/server/server.go
@@ -13,7 +13,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 // Server is the streamingnode server.
@@ -44,9 +43,6 @@ func (s *Server) Init(ctx context.Context) (err error) {
 
 // Start starts the streamingnode server.
 func (s *Server) Start() {
-	if !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		panic("In streaming service mode, disable L0 is not allowed.")
-	}
 	resource.Resource().Flusher().Start()
 	log.Info("flusher started")
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index d122694753..698f28d81f 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3231,7 +3231,6 @@ type dataCoordConfig struct {
 	ClusteringCompactionMaxClusterSize       ParamItem `refreshable:"true"`
 
 	// LevelZero Segment
-	EnableLevelZeroSegment                   ParamItem `refreshable:"false"`
 	LevelZeroCompactionTriggerMinSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerMaxSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerDeltalogMinNum ParamItem `refreshable:"true"`
@@ -3627,15 +3626,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	}
 	p.SyncSegmentsInterval.Init(base.mgr)
 
-	// LevelZeroCompaction
-	p.EnableLevelZeroSegment = ParamItem{
-		Key:          "dataCoord.segment.enableLevelZero",
-		Version:      "2.4.0",
-		Doc:          "Whether to enable LevelZeroCompaction",
-		DefaultValue: "true",
-	}
-	p.EnableLevelZeroSegment.Init(base.mgr)
-
 	p.LevelZeroCompactionTriggerMinSize = ParamItem{
 		Key:     "dataCoord.compaction.levelzero.forceTrigger.minSize",
 		Version: "2.4.0",
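
After this patch, NewWriteBuffer no longer consults a delete policy: it unconditionally returns the L0-delta write buffer, so callers should supply an ID allocator for the L0 segments it creates (the updated test above passes WithIDAllocator where it previously passed WithDeletePolicy). Below is a minimal construction sketch against the post-change API shown in this diff; the helper name and its parameters are illustrative placeholders for values a real caller already owns, and the allocator import path is an assumption based on the surrounding codebase.

```go
package example

import (
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
	"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
)

// newChannelWriteBuffer is a hypothetical wrapper illustrating the post-change
// call site: WithDeletePolicy no longer exists, and the returned buffer is
// always the L0-delta implementation, which consumes idAlloc when it
// allocates IDs for L0 delta segments.
func newChannelWriteBuffer(
	channel string,
	cache metacache.MetaCache,
	syncMgr syncmgr.SyncManager,
	idAlloc allocator.Interface,
) (writebuffer.WriteBuffer, error) {
	return writebuffer.NewWriteBuffer(channel, cache, syncMgr,
		writebuffer.WithIDAllocator(idAlloc),
	)
}
```

Likewise, deployments that set dataCoord.segment.enableLevelZero in milvus.yaml can drop the key: the ParamItem is deleted above and the value is no longer read anywhere.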