mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
enhance: Unify levelzero segment config in DN (#28720)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
881a166b5a
commit
606ec77b66
@ -421,6 +421,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
||||
zap.Int64("nodeID", req.GetBase().GetSourceID()),
|
||||
zap.Int64("collectionID", req.GetCollectionID()),
|
||||
zap.Int64("segmentID", req.GetSegmentID()),
|
||||
zap.String("level", req.GetSegLevel().String()),
|
||||
)
|
||||
|
||||
log.Info("receive SaveBinlogPaths request",
|
||||
|
||||
@ -29,9 +29,14 @@ type writeBufferOption struct {
|
||||
}
|
||||
|
||||
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
|
||||
deletePolicy := DeletePolicyBFPkOracle
|
||||
if paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
|
||||
deletePolicy = DeletePolicyL0Delta
|
||||
}
|
||||
|
||||
return &writeBufferOption{
|
||||
// TODO use l0 delta as default after implementation.
|
||||
deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(),
|
||||
deletePolicy: deletePolicy,
|
||||
syncPolicies: []SyncPolicy{
|
||||
GetFullBufferPolicy(),
|
||||
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
|
||||
|
||||
@ -54,6 +54,24 @@ func (s *WriteBufferSuite) SetupTest() {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *WriteBufferSuite) TestDefaulOption() {
|
||||
s.Run("default BFPkOracle", func() {
|
||||
wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr)
|
||||
s.NoError(err)
|
||||
_, ok := wb.(*bfWriteBuffer)
|
||||
s.True(ok)
|
||||
})
|
||||
|
||||
s.Run("default L0Delta policy", func() {
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
|
||||
wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator()))
|
||||
s.NoError(err)
|
||||
_, ok := wb.(*l0WriteBuffer)
|
||||
s.True(ok)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *WriteBufferSuite) TestWriteBufferType() {
|
||||
wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
|
||||
s.NoError(err)
|
||||
|
||||
@ -2523,7 +2523,6 @@ type dataNodeConfig struct {
|
||||
FlushDeleteBufferBytes ParamItem `refreshable:"true"`
|
||||
BinLogMaxSize ParamItem `refreshable:"true"`
|
||||
SyncPeriod ParamItem `refreshable:"true"`
|
||||
DeltaPolicy ParamItem `refreshable:"false"`
|
||||
|
||||
// watchEvent
|
||||
WatchEventTicklerInterval ParamItem `refreshable:"false"`
|
||||
@ -2653,15 +2652,6 @@ func (p *dataNodeConfig) init(base *BaseTable) {
|
||||
}
|
||||
p.SyncPeriod.Init(base.mgr)
|
||||
|
||||
p.DeltaPolicy = ParamItem{
|
||||
Key: "dataNode.segment.deltaPolicy",
|
||||
Version: "2.3.4",
|
||||
DefaultValue: "bloom_filter_pkoracle",
|
||||
Doc: "the delta policy current datanode using",
|
||||
Export: true,
|
||||
}
|
||||
p.DeltaPolicy.Init(base.mgr)
|
||||
|
||||
p.WatchEventTicklerInterval = ParamItem{
|
||||
Key: "datanode.segment.watchEventTicklerInterval",
|
||||
Version: "2.2.3",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user