fix: Remove enableLevelZeroSegment config (#36507)

See also: #36504
pr: #36535

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-10-11 16:41:21 +08:00 committed by GitHub
parent 28de6b86ba
commit e976b41f97
7 changed files with 3 additions and 642 deletions
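
Net effect, condensed (an editor's paraphrase of the hunks below, not a compilable excerpt): the dataCoord.segment.enableLevelZero config key is removed, L0 compaction triggering is gated only by autocompaction, and the DataNode write buffer no longer chooses between a bloom-filter (bf) and an L0 delete policy — it always constructs the L0 write buffer:

// After this commit (paraphrased from the diffs below):

// datacoord: L0 compaction triggering depends only on autocompaction.
func (policy *l0CompactionPolicy) Enable() bool {
	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

// datanode: the delete-policy switch is removed; the L0 write buffer
// is the only implementation constructed.
func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
	option := defaultWBOption(metacache)
	for _, opt := range opts {
		opt(option)
	}
	return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
}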

View File

@@ -26,7 +26,7 @@ func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
}
func (policy *l0CompactionPolicy) Enable() bool {
-return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() && Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool()
+return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}
func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {

View File

@@ -1,130 +0,0 @@
package writebuffer
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type bfWriteBuffer struct {
*writeBufferBase
syncMgr syncmgr.SyncManager
metacache metacache.MetaCache
}
func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
base, err := newWriteBufferBase(channel, metacache, storageV2Cache, syncMgr, option)
if err != nil {
return nil, err
}
return &bfWriteBuffer{
writeBufferBase: base,
syncMgr: syncMgr,
}, nil
}
func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
}
}
}
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
ts := delMsg.GetTimestamps()[idx]
if inData.pkExists(pk, ts) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(inData.segmentID, deletePks, deleteTss, startPos, endPos)
}
}
}
}
}
func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
groups, err := wb.prepareInsert(insertMsgs)
if err != nil {
return err
}
// buffer insert data and add segment if not exists
for _, inData := range groups {
err := wb.bufferInsert(inData, startPos, endPos)
if err != nil {
return err
}
}
// distribute delete msg
// the bf write buffer checks the bloom filters of existing segments and the current insert batch to decide which segments the delete data should be written to
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
// update pk oracle
for _, inData := range groups {
// segment shall always exist after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
// update buffer last checkpoint
wb.checkpoint = endPos
_ = wb.triggerSync()
return nil
}
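
The whole bloom-filter write buffer above is deleted. Its core was routing each delete batch to the segments whose bloom filters report the primary keys as possibly present. A minimal illustrative sketch of that pattern, using the names from the deleted code (not a drop-in replacement):

// Sketch of the routing the deleted dispatchDeleteMsgs performed:
// hash the batch of pks once, then probe each segment's bloom filter.
func routeDeletesBySegment(pks []storage.PrimaryKey, segments []*metacache.SegmentInfo) map[int64][]storage.PrimaryKey {
	result := make(map[int64][]storage.PrimaryKey)
	lc := storage.NewBatchLocationsCache(pks) // compute hash locations once, reuse per segment
	for _, segment := range segments {
		hits := segment.GetBloomFilterSet().BatchPkExist(lc)
		for i, hit := range hits {
			// A bloom filter can report false positives but never false
			// negatives, so a miss means the segment cannot hold the pk.
			if hit {
				result[segment.SegmentID()] = append(result[segment.SegmentID()], pks[i])
			}
		}
	}
	return result
}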

View File

@@ -1,434 +0,0 @@
package writebuffer
import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type BFWriteBufferSuite struct {
testutils.PromMetricsSuite
collID int64
channelName string
collInt64Schema *schemapb.CollectionSchema
collVarcharSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacacheInt64 *metacache.MockMetaCache
metacacheVarchar *metacache.MockMetaCache
broker *broker.MockBroker
storageV2Cache *metacache.StorageV2Cache
}
func (s *BFWriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collInt64Schema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
s.collVarcharSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "100"},
},
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
s.storageV2Cache = storageCache
}
func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
var pkField *schemapb.FieldData
switch pkType {
case schemapb.DataType_Int64:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
}
case schemapb.DataType_VarChar:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
},
},
},
},
}
}
flatten := lo.Flatten(vectors)
return tss, &msgstream.InsertMsg{
InsertRequest: &msgpb.InsertRequest{
SegmentID: segmentID,
Version: msgpb.InsertDataVersion_ColumnBased,
RowIDs: tss,
Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
FieldsData: []*schemapb.FieldData{
{
FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
pkField,
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: flatten,
},
},
},
},
},
},
},
}
}
func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg {
delMsg := &msgstream.DeleteMsg{
DeleteRequest: &msgpb.DeleteRequest{
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx+1)) }),
},
}
return delMsg
}
func (s *BFWriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacacheInt64 = metacache.NewMockMetaCache(s.T())
s.metacacheInt64.EXPECT().Schema().Return(s.collInt64Schema).Maybe()
s.metacacheInt64.EXPECT().Collection().Return(s.collID).Maybe()
s.metacacheVarchar = metacache.NewMockMetaCache(s.T())
s.metacacheVarchar.EXPECT().Schema().Return(s.collVarcharSchema).Maybe()
s.metacacheVarchar.EXPECT().Collection().Return(s.collID).Maybe()
s.broker = broker.NewMockBroker(s.T())
var err error
s.storageV2Cache, err = metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
}
func (s *BFWriteBufferSuite) TestBufferData() {
s.Run("normal_run_int64", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5604)
delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
s.MetricsEqual(value, 5844)
})
s.Run("normal_run_varchar", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 7224)
})
s.Run("int_pk_type_not_match", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
s.Run("varchar_pk_not_match", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
}
func (s *BFWriteBufferSuite) TestAutoSync() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, nil, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetSealedSegmentsPolicy(s.metacacheInt64),
},
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 0)
})
}
func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
params.Params.CommonCfg.StorageScheme.SwapTempValue("file")
tmpDir := s.T().TempDir()
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(arrowSchema, &schema.SchemaOptions{
PrimaryColumn: "pk", VectorColumn: "vector", VersionColumn: common.TimeStampFieldName,
})).Build())
s.Require().NoError(err)
s.storageV2Cache.SetSpace(1000, space)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
}
func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
tmpDir := s.T().TempDir()
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(arrowSchema, &schema.SchemaOptions{
PrimaryColumn: "pk", VectorColumn: "vector", VersionColumn: common.TimeStampFieldName,
})).Build())
s.Require().NoError(err)
s.storageV2Cache.SetSpace(1002, space)
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetSealedSegmentsPolicy(s.metacacheInt64),
},
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
segCompacted := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 0)
})
}
func (s *BFWriteBufferSuite) TestCreateFailure() {
metacache := metacache.NewMockMetaCache(s.T())
metacache.EXPECT().Collection().Return(s.collID)
metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{})
_, err := NewBFWriteBuffer(s.channelName, metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
s.Error(err)
}
func TestBFWriteBuffer(t *testing.T) {
suite.Run(t, new(BFWriteBufferSuite))
}

View File

@@ -9,18 +9,9 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
-const (
-// DeletePolicyBFPkOracle is the const config value for using bf pk oracle as delete policy
-DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
-// DeletePolicyL0Delta is the const config value for using L0 delta as delete policy.
-DeletePolicyL0Delta = `l0_delta`
-)
type WriteBufferOption func(opt *writeBufferOption)
type writeBufferOption struct {
-deletePolicy string
idAllocator allocator.Interface
syncPolicies []SyncPolicy
@@ -29,14 +20,7 @@ type writeBufferOption struct {
}
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
-deletePolicy := DeletePolicyBFPkOracle
-if paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-deletePolicy = DeletePolicyL0Delta
-}
return &writeBufferOption{
-// TODO use l0 delta as default after implementation.
-deletePolicy: deletePolicy,
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
@@ -46,12 +30,6 @@ func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
}
}
-func WithDeletePolicy(policy string) WriteBufferOption {
-return func(opt *writeBufferOption) {
-opt.deletePolicy = policy
-}
-}
func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.idAllocator = allocator
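
With the constants, the deletePolicy field, and WithDeletePolicy all gone, defaultWBOption reduces to assembling sync policies. A sketch of its probable post-commit shape, reconstructed from the surviving context lines above (the tail of the option struct falls outside the shown hunks):

// Probable shape after this commit (reconstructed; remainder elided in the diff).
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
	return &writeBufferOption{
		syncPolicies: []SyncPolicy{
			GetFullBufferPolicy(),
			GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
			// ...any remaining policies fall outside the shown hunk.
		},
	}
}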

View File

@@ -106,14 +106,7 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach
opt(option)
}
-switch option.deletePolicy {
-case DeletePolicyBFPkOracle:
-return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
-case DeletePolicyL0Delta:
-return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
-default:
-return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
-}
+return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
}
// writeBufferBase is the common component for buffering data
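
Since NewWriteBuffer now unconditionally builds the L0 write buffer, call sites that previously selected the bf policy must satisfy the L0 path instead; the updated test below passes an ID allocator, which the L0 buffer evidently requires (an assumption drawn from the tests, not stated in this diff). A hypothetical usage sketch:

// Hypothetical helper showing post-commit construction; newChannelWriteBuffer
// is illustrative, not part of the codebase.
func newChannelWriteBuffer(channel string, mc metacache.MetaCache, cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, alloc allocator.Interface) (WriteBuffer, error) {
	// No delete-policy option anymore; supply the allocator the L0 path uses.
	return NewWriteBuffer(channel, mc, cache, syncMgr, WithIDAllocator(alloc))
}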

View File

@@ -62,42 +62,6 @@ func (s *WriteBufferSuite) SetupTest() {
s.Require().NoError(err)
}
-func (s *WriteBufferSuite) TestDefaultOption() {
-s.Run("default BFPkOracle", func() {
-paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "false")
-defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr)
-s.NoError(err)
-_, ok := wb.(*bfWriteBuffer)
-s.True(ok)
-})
-s.Run("default L0Delta policy", func() {
-paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true")
-defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator()))
-s.NoError(err)
-_, ok := wb.(*l0WriteBuffer)
-s.True(ok)
-})
-}
-func (s *WriteBufferSuite) TestWriteBufferType() {
-wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
-s.NoError(err)
-_, ok := wb.(*bfWriteBuffer)
-s.True(ok)
-wb, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
-s.NoError(err)
-_, ok = wb.(*l0WriteBuffer)
-s.True(ok)
-_, err = NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(""))
-s.Error(err)
-}
func (s *WriteBufferSuite) TestHasSegment() {
segmentID := int64(1001)
@@ -114,7 +78,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, true)
-wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
+wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithIDAllocator(allocator.NewMockAllocator(s.T())))
s.NoError(err)
err = wb.SealSegments(context.Background(), []int64{segmentID})

View File

@@ -3184,7 +3184,6 @@ type dataCoordConfig struct {
ClusteringCompactionMaxClusterSize ParamItem `refreshable:"true"`
// LevelZero Segment
-EnableLevelZeroSegment ParamItem `refreshable:"false"`
LevelZeroCompactionTriggerMinSize ParamItem `refreshable:"true"`
LevelZeroCompactionTriggerMaxSize ParamItem `refreshable:"true"`
LevelZeroCompactionTriggerDeltalogMinNum ParamItem `refreshable:"true"`
@@ -3566,15 +3565,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.SyncSegmentsInterval.Init(base.mgr)
// LevelZeroCompaction
-p.EnableLevelZeroSegment = ParamItem{
-Key: "dataCoord.segment.enableLevelZero",
-Version: "2.4.0",
-Doc: "Whether to enable LevelZeroCompaction",
-DefaultValue: "true",
-}
-p.EnableLevelZeroSegment.Init(base.mgr)
p.LevelZeroCompactionTriggerMinSize = ParamItem{
Key: "dataCoord.compaction.levelzero.forceTrigger.minSize",
Version: "2.4.0",
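
Removing the ParamItem means a dataCoord.segment.enableLevelZero entry in user configuration is now silently ignored. The surviving L0 knobs declared above keep the usual accessor pattern; a sketch (the accessor choice is illustrative — only GetAsBool, GetAsInt, and GetAsDuration appear elsewhere in this diff):

// Sketch: reading one of the surviving L0-compaction knobs.
func l0ForceTriggerMinSize() int {
	// Key "dataCoord.compaction.levelzero.forceTrigger.minSize" per the diff above.
	return paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsInt()
}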