diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 420e4ba846..be4870a34c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -283,12 +283,13 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker var futures []*conc.Future[struct{}] for _, segmentEntry := range entries { + segmentEntry := segmentEntry + delRecord, ok := delRecords[segmentEntry.SegmentID] log := log.With( zap.Int64("segmentID", segmentEntry.SegmentID), zap.Int64("workerID", nodeID), + zap.Int("forwardRowCount", len(delRecord.PrimaryKeys)), ) - segmentEntry := segmentEntry - delRecord, ok := delRecords[segmentEntry.SegmentID] if ok { future := pool.Submit(func() (struct{}, error) { log.Debug("delegator plan to applyDelete via worker") diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 904ab58b84..6d2ae22411 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -261,6 +261,10 @@ func (s *DelegatorDataSuite) TestProcessDelete() { ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) @@ -876,6 +880,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return 
pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 2dcd9ac5e0..4d51b1145d 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -99,6 +99,10 @@ func (s *DelegatorSuite) SetupTest() { ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) + ms.EXPECT().GetHashFuncNum().Return(1) + ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { + return pk.EQ(storage.NewInt64PrimaryKey(10)) + }) return ms }) }, nil) diff --git a/internal/querynodev2/pkoracle/bloom_filter.go b/internal/querynodev2/pkoracle/bloom_filter_set.go similarity index 70% rename from internal/querynodev2/pkoracle/bloom_filter.go rename to internal/querynodev2/pkoracle/bloom_filter_set.go index 6ffb913f5a..608bb656ef 100644 --- a/internal/querynodev2/pkoracle/bloom_filter.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -17,6 +17,7 @@ package pkoracle import ( + "context" "sync" bloom "github.com/bits-and-blooms/bloom/v3" @@ -40,6 +41,8 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics + + kHashFunc uint } // MayPkExist returns whether any bloom filters returns positive. 
@@ -59,6 +62,47 @@ func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { return false } +func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { + log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60) + s.statsMutex.RLock() + defer s.statsMutex.RUnlock() + + if s.currentStat != nil { + k := s.currentStat.PkFilter.K() + if k > uint(len(locs)) { + log.RatedWarn(30, "locations num is less than hash func num, return false positive result", + zap.Int("locationNum", len(locs)), + zap.Uint("hashFuncNum", k), + zap.Int64("segmentID", s.segmentID)) + return true + } + + if s.currentStat.TestLocations(pk, locs[:k]) { + return true + } + } + + // for sealed, if one of the stats shows it exists, then we have to check it + for _, historyStat := range s.historyStats { + k := historyStat.PkFilter.K() + if k > uint(len(locs)) { + log.RatedWarn(30, "locations num is less than hash func num, return false positive result", + zap.Int("locationNum", len(locs)), + zap.Uint("hashFuncNum", k), + zap.Int64("segmentID", s.segmentID)) + return true + } + if historyStat.TestLocations(pk, locs[:k]) { + return true + } + } + return false +} + +func (s *BloomFilterSet) GetHashFuncNum() uint { + return s.kHashFunc +} + // ID implement candidate. 
func (s *BloomFilterSet) ID() int64 { return s.segmentID @@ -80,17 +124,21 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { + m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + if k > s.kHashFunc { + s.kHashFunc = k + } s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloom.New(m, k), } } - buf := make([]byte, 8) for _, pk := range pks { s.currentStat.UpdateMinMax(pk) switch pk.Type() { case schemapb.DataType_Int64: + buf := make([]byte, 8) int64Value := pk.(*storage.Int64PrimaryKey).Value common.Endian.PutUint64(buf, uint64(int64Value)) s.currentStat.PkFilter.Add(buf) @@ -109,18 +157,10 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.statsMutex.Lock() defer s.statsMutex.Unlock() - s.historyStats = append(s.historyStats, stats) -} - -// initCurrentStat initialize currentStats if nil. -// Note: invoker shall acquire statsMutex lock first. -func (s *BloomFilterSet) initCurrentStat() { - if s.currentStat == nil { - s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), - } + if stats.PkFilter.K() > s.kHashFunc { + s.kHashFunc = stats.PkFilter.K() } + s.historyStats = append(s.historyStats, stats) } // NewBloomFilterSet returns a new BloomFilterSet. 
diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go new file mode 100644 index 0000000000..0384d3faa7 --- /dev/null +++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go @@ -0,0 +1,119 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pkoracle + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestInt64Pk(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } + + assert.Equal(t, int64(1), bfs.ID()) + assert.Equal(t, int64(1), bfs.Partition()) + assert.Equal(t, commonpb.SegmentState_Sealed, bfs.Type()) +} + +func TestVarCharPk(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } +} + +func TestHistoricalStat(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + // mock historical bf + bfs.AddHistoricalStats(bfs.currentStat) + bfs.AddHistoricalStats(bfs.currentStat) + bfs.currentStat = nil + + for i := 0; 
 i < batchSize; i++ { + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) + ret1 := bfs.TestLocations(pks[i], locations) + ret2 := bfs.MayPkExist(pks[i]) + assert.Equal(t, ret1, ret2) + } +} + +func TestHashFuncNum(t *testing.T) { + paramtable.Init() + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + for i := 0; i < batchSize; i++ { + // pass locations more than hash func num in bf + locations := storage.Locations(pks[i], bfs.GetHashFuncNum()+3) + ret1 := bfs.TestLocations(pks[i], locations) + assert.True(t, ret1) + } +} diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go index 037963966f..e5f051e5f1 100644 --- a/internal/querynodev2/pkoracle/candidate.go +++ b/internal/querynodev2/pkoracle/candidate.go @@ -27,6 +27,8 @@ import ( type Candidate interface { // MayPkExist checks whether primary key could exists in this candidate. MayPkExist(pk storage.PrimaryKey) bool + TestLocations(pk storage.PrimaryKey, locs []uint64) bool + GetHashFuncNum() uint ID() int64 Partition() int64 diff --git a/internal/querynodev2/pkoracle/key.go b/internal/querynodev2/pkoracle/key.go index 07a001568b..9845b5e065 100644 --- a/internal/querynodev2/pkoracle/key.go +++ b/internal/querynodev2/pkoracle/key.go @@ -33,6 +33,15 @@ func (k candidateKey) MayPkExist(pk storage.PrimaryKey) bool { return true } +func (k candidateKey) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { + // always return true to prevent misuse + return true +} + +func (k candidateKey) GetHashFuncNum() uint { + return 0 +} + // ID implements Candidate. 
func (k candidateKey) ID() int64 { return k.segmentID diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go index b509fc5e2d..4d686503ec 100644 --- a/internal/querynodev2/pkoracle/pk_oracle.go +++ b/internal/querynodev2/pkoracle/pk_oracle.go @@ -19,8 +19,10 @@ package pkoracle import ( "fmt" + "sync" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -41,18 +43,46 @@ var _ PkOracle = (*pkOracle)(nil) // pkOracle implementation. type pkOracle struct { candidates *typeutil.ConcurrentMap[string, candidateWithWorker] + + hashFuncNumMutex sync.RWMutex + maxHashFuncNum uint +} + +func (pko *pkOracle) GetMaxHashFuncNum() uint { + pko.hashFuncNumMutex.RLock() + defer pko.hashFuncNumMutex.RUnlock() + return pko.maxHashFuncNum +} + +func (pko *pkOracle) TryUpdateHashFuncNum(newValue uint) { + pko.hashFuncNumMutex.Lock() + defer pko.hashFuncNumMutex.Unlock() + if newValue > pko.maxHashFuncNum { + pko.maxHashFuncNum = newValue + } } // Get implements PkOracle. 
func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) { var result []int64 + var locations []uint64 + pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { return true } } - if candidate.MayPkExist(pk) { + + if locations == nil { + locations = storage.Locations(pk, pko.GetMaxHashFuncNum()) + if len(locations) == 0 { + log.Warn("pkOracle: no location found for pk") + return true + } + } + + if candidate.TestLocations(pk, locations) { result = append(result, candidate.ID()) } return true @@ -67,6 +97,7 @@ func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string { // Register register candidate func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { + pko.TryUpdateHashFuncNum(candidate.GetHashFuncNum()) pko.candidates.Insert(pko.candidateKey(candidate, workerID), candidateWithWorker{ Candidate: candidate, workerID: workerID, @@ -77,6 +108,7 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { // Remove removes candidate from pko. func (pko *pkOracle) Remove(filters ...CandidateFilter) error { + max := uint(0) pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { @@ -84,9 +116,14 @@ func (pko *pkOracle) Remove(filters ...CandidateFilter) error { } } pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)) + if candidate.GetHashFuncNum() > max { + max = candidate.GetHashFuncNum() + } return true }) + + pko.TryUpdateHashFuncNum(max) return nil } diff --git a/internal/querynodev2/pkoracle/pk_oracle_test.go b/internal/querynodev2/pkoracle/pk_oracle_test.go new file mode 100644 index 0000000000..ec19fd4c35 --- /dev/null +++ b/internal/querynodev2/pkoracle/pk_oracle_test.go @@ -0,0 +1,65 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pkoracle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestGet(t *testing.T) { + paramtable.Init() + pko := NewPkOracle() + + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + pko.Register(bfs, 1) + + ret := pko.Exists(bfs, 1) + assert.True(t, ret) + + ret = pko.Exists(bfs, 2) + assert.False(t, ret) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + segmentIDs, ok := pko.Get(pk) + assert.Nil(t, ok) + assert.Contains(t, segmentIDs, int64(1)) + } + + pko.Remove(WithSegmentIDs(1)) + + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + segmentIDs, ok := pko.Get(pk) + assert.Nil(t, ok) + assert.NotContains(t, segmentIDs, int64(1)) + } +} diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index d2cfb26bfc..3121d0ca45 100644 --- 
a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -246,6 +246,47 @@ func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockS return _c } +// GetHashFuncNum provides a mock function with given fields: +func (_m *MockSegment) GetHashFuncNum() uint { + ret := _m.Called() + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// MockSegment_GetHashFuncNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHashFuncNum' +type MockSegment_GetHashFuncNum_Call struct { + *mock.Call +} + +// GetHashFuncNum is a helper method to define mock.On call +func (_e *MockSegment_Expecter) GetHashFuncNum() *MockSegment_GetHashFuncNum_Call { + return &MockSegment_GetHashFuncNum_Call{Call: _e.mock.On("GetHashFuncNum")} +} + +func (_c *MockSegment_GetHashFuncNum_Call) Run(run func()) *MockSegment_GetHashFuncNum_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_GetHashFuncNum_Call) Return(_a0 uint) *MockSegment_GetHashFuncNum_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_GetHashFuncNum_Call) RunAndReturn(run func() uint) *MockSegment_GetHashFuncNum_Call { + _c.Call.Return(run) + return _c +} + // GetIndex provides a mock function with given fields: fieldID func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo { ret := _m.Called(fieldID) @@ -1412,6 +1453,49 @@ func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosi return _c } +// TestLocations provides a mock function with given fields: pk, loc +func (_m *MockSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { + ret := _m.Called(pk, loc) + + var r0 bool + if rf, ok := ret.Get(0).(func(storage.PrimaryKey, []uint64) bool); ok { + r0 = rf(pk, loc) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// 
MockSegment_TestLocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TestLocations' +type MockSegment_TestLocations_Call struct { + *mock.Call +} + +// TestLocations is a helper method to define mock.On call +// - pk storage.PrimaryKey +// - loc []uint64 +func (_e *MockSegment_Expecter) TestLocations(pk interface{}, loc interface{}) *MockSegment_TestLocations_Call { + return &MockSegment_TestLocations_Call{Call: _e.mock.On("TestLocations", pk, loc)} +} + +func (_c *MockSegment_TestLocations_Call) Run(run func(pk storage.PrimaryKey, loc []uint64)) *MockSegment_TestLocations_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(storage.PrimaryKey), args[1].([]uint64)) + }) + return _c +} + +func (_c *MockSegment_TestLocations_Call) Return(_a0 bool) *MockSegment_TestLocations_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_TestLocations_Call) RunAndReturn(run func(storage.PrimaryKey, []uint64) bool) *MockSegment_TestLocations_Call { + _c.Call.Return(run) + return _c +} + // Type provides a mock function with given fields: func (_m *MockSegment) Type() commonpb.SegmentState { ret := _m.Called() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 976fe2972d..2acb841c44 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -187,6 +187,14 @@ func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool { return s.bloomFilterSet.MayPkExist(pk) } +func (s *baseSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { + return s.bloomFilterSet.TestLocations(pk, loc) +} + +func (s *baseSegment) GetHashFuncNum() uint { + return s.bloomFilterSet.GetHashFuncNum() +} + // ResourceUsageEstimate returns the estimated resource usage of the segment. 
func (s *baseSegment) ResourceUsageEstimate() ResourceUsage { if s.segmentType == SegmentTypeGrowing { diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index ee0b1ed112..9ed9d4df90 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -84,6 +84,8 @@ type Segment interface { // Bloom filter related UpdateBloomFilter(pks []storage.PrimaryKey) MayPkExist(pk storage.PrimaryKey) bool + TestLocations(pk storage.PrimaryKey, loc []uint64) bool + GetHashFuncNum() uint // Read operations Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6339a2db0f..a9b077d7e5 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -372,7 +372,7 @@ func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID in size += stat.BF.Cap() bfs.AddHistoricalStats(pkStat) } - log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size)) + log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size), zap.Int("BFNum", len(stats))) return nil } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 5fc39afc73..0d278a9ccd 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -187,6 +187,14 @@ func (suite *SegmentSuite) TestHasRawData() { suite.True(has) } +func (suite *SegmentSuite) TestLocation() { + pk := storage.NewInt64PrimaryKey(100) + locations := storage.Locations(pk, suite.sealed.GetHashFuncNum()) + ret1 := suite.sealed.TestLocations(pk, locations) + ret2 := suite.sealed.MayPkExist(pk) + suite.Equal(ret1, ret2) +} + func (suite 
*SegmentSuite) TestCASVersion() { segment := suite.sealed diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index 2278ee22b7..742e59d069 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -107,3 +107,35 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool { // no idea, just make it as false positive return true } + +// Locations returns a list of hash locations representing a data item. +func Locations(pk PrimaryKey, k uint) []uint64 { + switch pk.Type() { + case schemapb.DataType_Int64: + buf := make([]byte, 8) + int64Pk := pk.(*Int64PrimaryKey) + common.Endian.PutUint64(buf, uint64(int64Pk.Value)) + return bloom.Locations(buf, k) + case schemapb.DataType_VarChar: + varCharPk := pk.(*VarCharPrimaryKey) + return bloom.Locations([]byte(varCharPk.Value), k) + default: + // TODO:: + } + return nil +} + +func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool { + // empty PkStatistics + if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil { + return false + } + + // check bf first, TestLocations just does some bitset computation, so it is cheaper + if !st.PkFilter.TestLocations(locs) { + return false + } + + // check pk range too, ugly but keep it for now + return st.MinPK.LE(pk) && st.MaxPK.GE(pk) +}