From 5038036ece044fe605de9323b0c9967cb4f20c57 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 7 May 2024 21:13:47 +0800 Subject: [PATCH] enhance: Reuse hash locations during access bloom fitler (#32642) issue: #32530 when try to match segment bloom filter with pk, we can reuse the hash locations. This PR maintain the max hash Func, and compute hash location once for all segment, reuse hash location can speed up bf access --------- Signed-off-by: Wei Liu --- .../querynodev2/delegator/delegator_data.go | 5 +- .../delegator/delegator_data_test.go | 8 ++ .../querynodev2/delegator/delegator_test.go | 4 + .../{bloom_filter.go => bloom_filter_set.go} | 68 +++++++--- .../pkoracle/bloom_filter_set_test.go | 119 ++++++++++++++++++ internal/querynodev2/pkoracle/candidate.go | 2 + internal/querynodev2/pkoracle/key.go | 9 ++ internal/querynodev2/pkoracle/pk_oracle.go | 39 +++++- .../querynodev2/pkoracle/pk_oracle_test.go | 65 ++++++++++ internal/querynodev2/segments/mock_segment.go | 84 +++++++++++++ internal/querynodev2/segments/segment.go | 8 ++ .../querynodev2/segments/segment_interface.go | 2 + .../querynodev2/segments/segment_loader.go | 2 +- internal/querynodev2/segments/segment_test.go | 8 ++ internal/storage/pk_statistics.go | 32 +++++ 15 files changed, 437 insertions(+), 18 deletions(-) rename internal/querynodev2/pkoracle/{bloom_filter.go => bloom_filter_set.go} (70%) create mode 100644 internal/querynodev2/pkoracle/bloom_filter_set_test.go create mode 100644 internal/querynodev2/pkoracle/pk_oracle_test.go diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 420e4ba846..be4870a34c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -283,12 +283,13 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker var futures []*conc.Future[struct{}] for _, segmentEntry := range entries { + segmentEntry := segmentEntry + delRecord, ok := delRecords[segmentEntry.SegmentID] log := log.With( zap.Int64("segmentID", segmentEntry.SegmentID), zap.Int64("workerID", nodeID), + zap.Int("forwardRowCount", len(delRecord.PrimaryKeys)), ) - segmentEntry := segmentEntry - delRecord, ok := delRecords[segmentEntry.SegmentID] if ok { future := pool.Submit(func() (struct{}, error) { log.Debug("delegator plan to applyDelete via worker") diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 904ab58b84..6d2ae22411 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -261,6 +261,10 @@ func (s *DelegatorDataSuite) TestProcessDelete() { ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) @@ -876,6 +880,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 2dcd9ac5e0..4d51b1145d 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -99,6 +99,10 @@ func (s *DelegatorSuite) SetupTest() { ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) diff --git a/internal/querynodev2/pkoracle/bloom_filter.go b/internal/querynodev2/pkoracle/bloom_filter_set.go similarity index 70% rename from internal/querynodev2/pkoracle/bloom_filter.go rename to internal/querynodev2/pkoracle/bloom_filter_set.go index 6ffb913f5a..608bb656ef 100644 --- a/internal/querynodev2/pkoracle/bloom_filter.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -17,6 +17,7 @@ package pkoracle import ( + "context" "sync" bloom "github.com/bits-and-blooms/bloom/v3" @@ -40,6 +41,8 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics + + kHashFunc uint } // MayPkExist returns whether any bloom filters returns positive. @@ -59,6 +62,47 @@ func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { return false } +func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { + log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60) + s.statsMutex.RLock() + defer s.statsMutex.RUnlock() + + if s.currentStat != nil { + k := s.currentStat.PkFilter.K() + if k > uint(len(locs)) { + log.RatedWarn(30, "locations num is less than hash func num, return false positive result", + zap.Int("locationNum", len(locs)), + zap.Uint("hashFuncNum", k), + zap.Int64("segmentID", s.segmentID)) + return true + } + + if s.currentStat.TestLocations(pk, locs[:k]) { + return true + } + } + + // for sealed, if one of the stats shows it exist, then we have to check it + for _, historyStat := range s.historyStats { + k := historyStat.PkFilter.K() + if k > uint(len(locs)) { + log.RatedWarn(30, "locations num is less than hash func num, return false positive result", + zap.Int("locationNum", len(locs)), + zap.Uint("hashFuncNum", k), + zap.Int64("segmentID", s.segmentID)) + return true + } + if historyStat.TestLocations(pk, locs[:k]) { + return true + } + } + return false +} + +func (s *BloomFilterSet) GetHashFuncNum() uint { + return s.kHashFunc +} + // ID implement candidate. func (s *BloomFilterSet) ID() int64 { return s.segmentID @@ -80,17 +124,21 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { + m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + if k > s.kHashFunc { + s.kHashFunc = k + } s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloom.New(m, k), } } - buf := make([]byte, 8) for _, pk := range pks { s.currentStat.UpdateMinMax(pk) switch pk.Type() { case schemapb.DataType_Int64: + buf := make([]byte, 8) int64Value := pk.(*storage.Int64PrimaryKey).Value common.Endian.PutUint64(buf, uint64(int64Value)) s.currentStat.PkFilter.Add(buf) @@ -109,18 +157,10 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.statsMutex.Lock() defer s.statsMutex.Unlock() - s.historyStats = append(s.historyStats, stats) -} - -// initCurrentStat initialize currentStats if nil. -// Note: invoker shall acquire statsMutex lock first. -func (s *BloomFilterSet) initCurrentStat() { - if s.currentStat == nil { - s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), - } + if stats.PkFilter.K() > s.kHashFunc { + s.kHashFunc = stats.PkFilter.K() } + s.historyStats = append(s.historyStats, stats) } // NewBloomFilterSet returns a new BloomFilterSet. diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go new file mode 100644 index 0000000000..0384d3faa7 --- /dev/null +++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go @@ -0,0 +1,119 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pkoracle + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestInt64Pk(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } + + assert.Equal(t, int64(1), bfs.ID()) + assert.Equal(t, int64(1), bfs.Partition()) + assert.Equal(t, commonpb.SegmentState_Sealed, bfs.Type()) +} + +func TestVarCharPk(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } +} + +func TestHistoricalStat(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + // mock historical bf + bfs.AddHistoricalStats(bfs.currentStat) + bfs.AddHistoricalStats(bfs.currentStat) + bfs.currentStat = nil + + for i := 0; i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } +} + +func TestHashFuncNum(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + // pass locations more then hash func num in bf + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()+3) + ret1 := bfs.TestLocations(pks[i], locations) + assert.True(t, ret1) + } +} diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go index 037963966f..e5f051e5f1 100644 --- a/internal/querynodev2/pkoracle/candidate.go +++ b/internal/querynodev2/pkoracle/candidate.go @@ -27,6 +27,8 @@ import ( type Candidate interface { // MayPkExist checks whether primary key could exists in this candidate. MayPkExist(pk storage.PrimaryKey) bool + TestLocations(pk storage.PrimaryKey, locs []uint64) bool + GetHashFuncNum() uint ID() int64 Partition() int64 diff --git a/internal/querynodev2/pkoracle/key.go b/internal/querynodev2/pkoracle/key.go index 07a001568b..9845b5e065 100644 --- a/internal/querynodev2/pkoracle/key.go +++ b/internal/querynodev2/pkoracle/key.go @@ -33,6 +33,15 @@ func (k candidateKey) MayPkExist(pk storage.PrimaryKey) bool { return true } +func (k candidateKey) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { + // always return true to prevent miuse + return true +} + +func (k candidateKey) GetHashFuncNum() uint { + return 0 +} + // ID implements Candidate. func (k candidateKey) ID() int64 { return k.segmentID diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go index b509fc5e2d..4d686503ec 100644 --- a/internal/querynodev2/pkoracle/pk_oracle.go +++ b/internal/querynodev2/pkoracle/pk_oracle.go @@ -19,8 +19,10 @@ package pkoracle import ( "fmt" + "sync" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -41,18 +43,46 @@ var _ PkOracle = (*pkOracle)(nil) // pkOracle implementation. type pkOracle struct { candidates *typeutil.ConcurrentMap[string, candidateWithWorker] + + hashFuncNumMutex sync.RWMutex + maxHashFuncNum uint +} + +func (pko *pkOracle) GetMaxHashFuncNum() uint { + pko.hashFuncNumMutex.RLock() + defer pko.hashFuncNumMutex.RUnlock() + return pko.maxHashFuncNum +} + +func (pko *pkOracle) TryUpdateHashFuncNum(newValue uint) { + pko.hashFuncNumMutex.Lock() + defer pko.hashFuncNumMutex.Unlock() + if newValue > pko.maxHashFuncNum { + pko.maxHashFuncNum = newValue + } } // Get implements PkOracle. func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) { var result []int64 + var locations []uint64 + pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { return true } } - if candidate.MayPkExist(pk) { + + if locations == nil { + locations = storage.Locations(pk, pko.GetMaxHashFuncNum()) + if len(locations) == 0 { + log.Warn("pkOracle: no location found for pk") + return true + } + } + + if candidate.TestLocations(pk, locations) { result = append(result, candidate.ID()) } return true @@ -67,6 +97,7 @@ func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string { // Register register candidate func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { + pko.TryUpdateHashFuncNum(candidate.GetHashFuncNum()) pko.candidates.Insert(pko.candidateKey(candidate, workerID), candidateWithWorker{ Candidate: candidate, workerID: workerID, @@ -77,6 +108,7 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { // Remove removes candidate from pko. func (pko *pkOracle) Remove(filters ...CandidateFilter) error { + max := uint(0) pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { @@ -84,9 +116,14 @@ func (pko *pkOracle) Remove(filters ...CandidateFilter) error { } } pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)) + if candidate.GetHashFuncNum() > max { + max = candidate.GetHashFuncNum() + } return true }) + + pko.TryUpdateHashFuncNum(max) return nil } diff --git a/internal/querynodev2/pkoracle/pk_oracle_test.go b/internal/querynodev2/pkoracle/pk_oracle_test.go new file mode 100644 index 0000000000..ec19fd4c35 --- /dev/null +++ b/internal/querynodev2/pkoracle/pk_oracle_test.go @@ -0,0 +1,65 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pkoracle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestGet(t *testing.T) { + paramtable.Init() + pko := NewPkOracle() + + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + pko.Register(bfs, 1) + + ret := pko.Exists(bfs, 1) + assert.True(t, ret) + + ret = pko.Exists(bfs, 2) + assert.False(t, ret) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + segmentIDs, ok := pko.Get(pk) + assert.Nil(t, ok) + assert.Contains(t, segmentIDs, int64(1)) + } + + pko.Remove(WithSegmentIDs(1)) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + segmentIDs, ok := pko.Get(pk) + assert.Nil(t, ok) + assert.NotContains(t, segmentIDs, int64(1)) + } +} diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index d2cfb26bfc..3121d0ca45 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -246,6 +246,47 @@ func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockS return _c } +// GetHashFuncNum provides a mock function with given fields: +func (_m *MockSegment) GetHashFuncNum() uint { + ret := _m.Called() + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// MockSegment_GetHashFuncNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHashFuncNum' +type MockSegment_GetHashFuncNum_Call struct { + *mock.Call +} + +// GetHashFuncNum is a helper method to define mock.On call +func (_e *MockSegment_Expecter) GetHashFuncNum() *MockSegment_GetHashFuncNum_Call { + return &MockSegment_GetHashFuncNum_Call{Call: _e.mock.On("GetHashFuncNum")} +} + +func (_c *MockSegment_GetHashFuncNum_Call) Run(run func()) *MockSegment_GetHashFuncNum_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_GetHashFuncNum_Call) Return(_a0 uint) *MockSegment_GetHashFuncNum_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_GetHashFuncNum_Call) RunAndReturn(run func() uint) *MockSegment_GetHashFuncNum_Call { + _c.Call.Return(run) + return _c +} + // GetIndex provides a mock function with given fields: fieldID func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo { ret := _m.Called(fieldID) @@ -1412,6 +1453,49 @@ func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosi return _c } +// TestLocations provides a mock function with given fields: pk, loc +func (_m *MockSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { + ret := _m.Called(pk, loc) + + var r0 bool + if rf, ok := ret.Get(0).(func(storage.PrimaryKey, []uint64) bool); ok { + r0 = rf(pk, loc) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockSegment_TestLocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TestLocations' +type MockSegment_TestLocations_Call struct { + *mock.Call +} + +// TestLocations is a helper method to define mock.On call +// - pk storage.PrimaryKey +// - loc []uint64 +func (_e *MockSegment_Expecter) TestLocations(pk interface{}, loc interface{}) *MockSegment_TestLocations_Call { + return &MockSegment_TestLocations_Call{Call: _e.mock.On("TestLocations", pk, loc)} +} + +func (_c *MockSegment_TestLocations_Call) Run(run func(pk storage.PrimaryKey, loc []uint64)) *MockSegment_TestLocations_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(storage.PrimaryKey), args[1].([]uint64)) + }) + return _c +} + +func (_c *MockSegment_TestLocations_Call) Return(_a0 bool) *MockSegment_TestLocations_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_TestLocations_Call) RunAndReturn(run func(storage.PrimaryKey, []uint64) bool) *MockSegment_TestLocations_Call { + _c.Call.Return(run) + return _c +} + // Type provides a mock function with given fields: func (_m *MockSegment) Type() commonpb.SegmentState { ret := _m.Called() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 976fe2972d..2acb841c44 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -187,6 +187,14 @@ func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool { return s.bloomFilterSet.MayPkExist(pk) } +func (s *baseSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { + return s.bloomFilterSet.TestLocations(pk, loc) +} + +func (s *baseSegment) GetHashFuncNum() uint { + return s.bloomFilterSet.GetHashFuncNum() +} + // ResourceUsageEstimate returns the estimated resource usage of the segment. func (s *baseSegment) ResourceUsageEstimate() ResourceUsage { if s.segmentType == SegmentTypeGrowing { diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index ee0b1ed112..9ed9d4df90 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -84,6 +84,8 @@ type Segment interface { // Bloom filter related UpdateBloomFilter(pks []storage.PrimaryKey) MayPkExist(pk storage.PrimaryKey) bool + TestLocations(pk storage.PrimaryKey, loc []uint64) bool + GetHashFuncNum() uint // Read operations Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6339a2db0f..a9b077d7e5 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -372,7 +372,7 @@ func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID in size += stat.BF.Cap() bfs.AddHistoricalStats(pkStat) } - log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size)) + log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size), zap.Int("BFNum", len(stats))) return nil } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 5fc39afc73..0d278a9ccd 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -187,6 +187,14 @@ func (suite *SegmentSuite) TestHasRawData() { suite.True(has) } +func (suite *SegmentSuite) TestLocation() { + pk := storage.NewInt64PrimaryKey(100) + locations := storage.Locations(pk, suite.sealed.GetHashFuncNum()) + ret1 := suite.sealed.TestLocations(pk, locations) + ret2 := suite.sealed.MayPkExist(pk) + suite.Equal(ret1, ret2) +} + func (suite *SegmentSuite) TestCASVersion() { segment := suite.sealed diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index 2278ee22b7..742e59d069 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -107,3 +107,35 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool { // no idea, just make it as false positive return true } + +// Locations returns a list of hash locations representing a data item. +func Locations(pk PrimaryKey, k uint) []uint64 { + switch pk.Type() { + case schemapb.DataType_Int64: + buf := make([]byte, 8) + int64Pk := pk.(*Int64PrimaryKey) + common.Endian.PutUint64(buf, uint64(int64Pk.Value)) + return bloom.Locations(buf, k) + case schemapb.DataType_VarChar: + varCharPk := pk.(*VarCharPrimaryKey) + return bloom.Locations([]byte(varCharPk.Value), k) + default: + // TODO:: + } + return nil +} + +func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool { + // empty pkStatics + if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil { + return false + } + + // check bf first, TestLocation just do some bitset compute, cost is cheaper + if !st.PkFilter.TestLocations(locs) { + return false + } + + // check pk range first, ugly but key it for now + return st.MinPK.LE(pk) && st.MaxPK.GE(pk) +}