diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3c997efd4e..a7a982540b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -581,6 +581,8 @@ common: warn: 1000 # minimum milliseconds for printing durations in warn level ttMsgEnabled: true # Whether the instance disable sending ts messages traceLogMode: 0 # trace request info, 0: none, 1: simple request info, like collection/partition/database name, 2: request detail + bloomFilterSize: 100000 # bloom filter initial size + maxBloomFalsePositive: 0.05 # max false positive rate for bloom filter # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index 7e9bbcb7b0..3251c53f81 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -23,6 +23,7 @@ import ( "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type BloomFilterSet struct { @@ -58,7 +59,8 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { if bfs.current == nil { bfs.current = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), + PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), } } diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go index 4d647b6910..fc1148a594 100644 --- a/internal/datanode/metacache/bloom_filter_set_test.go +++ b/internal/datanode/metacache/bloom_filter_set_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type BloomFilterSetSuite struct { @@ -31,6 +32,7 @@ type BloomFilterSetSuite struct { } func (s *BloomFilterSetSuite) 
SetupTest() { + paramtable.Init() s.bfs = NewBloomFilterSet() } @@ -56,7 +58,6 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData { func (s *BloomFilterSetSuite) TestWriteRead() { ids := []int64{1, 2, 3, 4, 5} - for _, id := range ids { s.False(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall not exist before update") } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index f44871c826..ae942b3f75 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -246,7 +246,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive) + bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -428,7 +429,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive) + bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) pks := 
&storage.PkStatistics{ PkFilter: bf, } @@ -583,7 +585,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive) + bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -779,7 +782,8 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive) + bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) pks := &storage.PkStatistics{ PkFilter: bf, } diff --git a/internal/querynodev2/pkoracle/bloom_filter.go b/internal/querynodev2/pkoracle/bloom_filter.go index b16b787754..6ffb913f5a 100644 --- a/internal/querynodev2/pkoracle/bloom_filter.go +++ b/internal/querynodev2/pkoracle/bloom_filter.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) var _ Candidate = (*BloomFilterSet)(nil) @@ -80,7 +81,8 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks 
[]storage.PrimaryKey) { if s.currentStat == nil { s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), + PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), } } @@ -115,7 +117,8 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { func (s *BloomFilterSet) initCurrentStat() { if s.currentStat == nil { s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), + PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), } } } diff --git a/internal/querynodev2/segments/bloom_filter_set.go b/internal/querynodev2/segments/bloom_filter_set.go index 794f412b76..b07713961c 100644 --- a/internal/querynodev2/segments/bloom_filter_set.go +++ b/internal/querynodev2/segments/bloom_filter_set.go @@ -26,6 +26,7 @@ import ( storage "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type bloomFilterSet struct { @@ -94,6 +95,7 @@ func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { // Note: invoker shall acquire statsMutex lock first. 
func (s *bloomFilterSet) initCurrentStat() { s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), + PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), } } diff --git a/internal/querynodev2/segments/bloom_filter_set_test.go b/internal/querynodev2/segments/bloom_filter_set_test.go index a427737b4d..9bf95a1ff9 100644 --- a/internal/querynodev2/segments/bloom_filter_set_test.go +++ b/internal/querynodev2/segments/bloom_filter_set_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type BloomFilterSetSuite struct { @@ -35,6 +36,7 @@ type BloomFilterSetSuite struct { func (suite *BloomFilterSetSuite) SetupTest() { suite.intPks = []int64{1, 2, 3} suite.stringPks = []string{"1", "2", "3"} + paramtable.Init() suite.set = newBloomFilterSet() } diff --git a/internal/storage/stats.go b/internal/storage/stats.go index f4792754e3..b5ce61740e 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -26,12 +26,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" -) - -const ( - // TODO silverxia maybe need set from config - BloomFilterSize uint = 100000 - MaxBloomFalsePositive float64 = 0.005 + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // PrimaryKeyStats contains statistics data for pk column @@ -197,7 +192,7 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) return &PrimaryKeyStats{ FieldID: fieldID, PkType: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive), + BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), }, nil } @@ -236,7 +231,7 @@ func (sw *StatsWriter) 
GenerateByData(fieldID int64, pkType schemapb.DataType, m stats := &PrimaryKeyStats{ FieldID: fieldID, PkType: int64(pkType), - BF: bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive), + BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), } stats.UpdateByMsgs(msgs) diff --git a/pkg/config/config.go b/pkg/config/config.go index 4e81220790..320261d3cd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -52,9 +52,5 @@ func Init(opts ...Option) (*Manager, error) { } func formatKey(key string) string { - ret := strings.ToLower(key) - ret = strings.ReplaceAll(ret, "/", "") - ret = strings.ReplaceAll(ret, "_", "") - ret = strings.ReplaceAll(ret, ".", "") - return ret + return strings.NewReplacer("/", "", "_", "", ".", "").Replace(strings.ToLower(key)) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d5738e60f6..079f350d41 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -230,6 +230,9 @@ type commonConfig struct { EnableStorageV2 ParamItem `refreshable:"false"` TTMsgEnabled ParamItem `refreshable:"true"` TraceLogMode ParamItem `refreshable:"true"` + + BloomFilterSize ParamItem `refreshable:"true"` + MaxBloomFalsePositive ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -672,6 +675,22 @@ like the old password verification when updating the credential`, Doc: "trace request info", } p.TraceLogMode.Init(base.mgr) + + p.BloomFilterSize = ParamItem{ + Key: "common.bloomFilterSize", + Version: "2.3.2", + DefaultValue: "100000", + Doc: "bloom filter initial size", + } + p.BloomFilterSize.Init(base.mgr) + + p.MaxBloomFalsePositive = ParamItem{ + Key: "common.maxBloomFalsePositive", + Version: "2.3.2", + DefaultValue: "0.05", + Doc: "max false positive rate for bloom filter", + } + p.MaxBloomFalsePositive.Init(base.mgr) } type gpuConfig struct {