mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
This PR cherry-picks the following commits: - Implement task limit control logic in datanode. https://github.com/milvus-io/milvus/pull/32881 - Load bf from storage instead of memory during L0 compaction. https://github.com/milvus-io/milvus/pull/32913 - Remove dependencies on shards (e.g. SyncSegments, injection). https://github.com/milvus-io/milvus/pull/33138 - Rename Compaction interface to CompactionV2. https://github.com/milvus-io/milvus/pull/33858 - Remove the unused residual compaction logic. https://github.com/milvus-io/milvus/pull/33932 issue: https://github.com/milvus-io/milvus/issues/32809 pr: https://github.com/milvus-io/milvus/pull/32881, https://github.com/milvus-io/milvus/pull/32913, https://github.com/milvus-io/milvus/pull/33138, https://github.com/milvus-io/milvus/pull/33858, https://github.com/milvus-io/milvus/pull/33932 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
78 lines
2.0 KiB
Go
78 lines
2.0 KiB
Go
package writebuffer
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/milvus-io/milvus/internal/allocator"
|
|
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
|
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
)
|
|
|
|
const (
|
|
// DeletePolicyBFPKOracle is the const config value for using bf pk oracle as delete policy
|
|
DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
|
|
|
|
// DeletePolicyL0Delta is the const config value for using L0 delta as deleta policy.
|
|
DeletePolicyL0Delta = `l0_delta`
|
|
)
|
|
|
|
type WriteBufferOption func(opt *writeBufferOption)
|
|
|
|
type writeBufferOption struct {
|
|
deletePolicy string
|
|
idAllocator allocator.Interface
|
|
syncPolicies []SyncPolicy
|
|
|
|
pkStatsFactory metacache.PkStatsFactory
|
|
metaWriter syncmgr.MetaWriter
|
|
}
|
|
|
|
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
|
|
deletePolicy := DeletePolicyBFPkOracle
|
|
if paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
|
|
deletePolicy = DeletePolicyL0Delta
|
|
}
|
|
|
|
return &writeBufferOption{
|
|
// TODO use l0 delta as default after implementation.
|
|
deletePolicy: deletePolicy,
|
|
syncPolicies: []SyncPolicy{
|
|
GetFullBufferPolicy(),
|
|
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
|
|
GetSealedSegmentsPolicy(metacache),
|
|
GetDroppedSegmentPolicy(metacache),
|
|
},
|
|
}
|
|
}
|
|
|
|
func WithDeletePolicy(policy string) WriteBufferOption {
|
|
return func(opt *writeBufferOption) {
|
|
opt.deletePolicy = policy
|
|
}
|
|
}
|
|
|
|
func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
|
|
return func(opt *writeBufferOption) {
|
|
opt.idAllocator = allocator
|
|
}
|
|
}
|
|
|
|
func WithPKStatsFactory(factory metacache.PkStatsFactory) WriteBufferOption {
|
|
return func(opt *writeBufferOption) {
|
|
opt.pkStatsFactory = factory
|
|
}
|
|
}
|
|
|
|
func WithMetaWriter(writer syncmgr.MetaWriter) WriteBufferOption {
|
|
return func(opt *writeBufferOption) {
|
|
opt.metaWriter = writer
|
|
}
|
|
}
|
|
|
|
func WithSyncPolicy(policy SyncPolicy) WriteBufferOption {
|
|
return func(opt *writeBufferOption) {
|
|
opt.syncPolicies = append(opt.syncPolicies, policy)
|
|
}
|
|
}
|