diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go
index 51b2dda5bf..00071964b9 100644
--- a/internal/datanode/l0_compactor.go
+++ b/internal/datanode/l0_compactor.go
@@ -242,37 +242,48 @@ func (t *levelZeroCompactionTask) splitDelta(
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
 	defer span.End()
 
+	allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
+		return segment.GetSegmentID(), segment
+	})
+
 	// segments shall be safe to read outside
 	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
-	split := func(pk storage.PrimaryKey) []int64 {
-		lc := storage.NewLocationsCache(pk)
-		return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
-			return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(lc)
-		})
-	}
-
-	targetSeg := lo.Associate(segments, func(info *metacache.SegmentInfo) (int64, *metacache.SegmentInfo) {
-		return info.SegmentID(), info
-	})
 
 	// spilt all delete data to segments
 	targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
-	for _, delta := range allDelta {
-		for i, pk := range delta.Pks {
-			predicted := split(pk)
-
-			for _, gotSeg := range predicted {
-				writer, ok := targetSegBuffer[gotSeg]
-				if !ok {
-					segment := targetSeg[gotSeg]
-					writer = NewSegmentDeltaWriter(gotSeg, segment.PartitionID(), t.getCollection())
-					targetSegBuffer[gotSeg] = writer
+	split := func(pks []storage.PrimaryKey, pkTss []uint64) {
+		lc := storage.NewBatchLocationsCache(pks)
+		for _, s := range segments {
+			segmentID := s.SegmentID()
+			hits := s.GetBloomFilterSet().BatchPkExist(lc)
+			for i, hit := range hits {
+				if hit {
+					writer, ok := targetSegBuffer[segmentID]
+					if !ok {
+						segment := allSeg[segmentID]
+						writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), t.getCollection())
+						targetSegBuffer[segmentID] = writer
+					}
+					writer.Write(pks[i], pkTss[i])
 				}
-				writer.Write(pk, delta.Tss[i])
 			}
 		}
 	}
 
+	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+	// split all delete data to segments
+	for _, deleteBuffer := range allDelta {
+		pks := deleteBuffer.Pks
+		pkTss := deleteBuffer.Tss
+		for idx := 0; idx < len(pks); idx += batchSize {
+			endIdx := idx + batchSize
+			if endIdx > len(pks) {
+				endIdx = len(pks)
+			}
+			split(pks[idx:endIdx], pkTss[idx:endIdx])
+		}
+	}
+
 	return targetSegBuffer
 }
diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go
index 002988f61d..15a16b61f8 100644
--- a/internal/datanode/metacache/bloom_filter_set.go
+++ b/internal/datanode/metacache/bloom_filter_set.go
@@ -55,7 +55,7 @@ func NewBloomFilterSetWithBatchSize(batchSize uint, historyEntries ...*storage.P
 	}
 }
 
-func (bfs *BloomFilterSet) PkExists(lc storage.LocationsCache) bool {
+func (bfs *BloomFilterSet) PkExists(lc *storage.LocationsCache) bool {
 	bfs.mut.RLock()
 	defer bfs.mut.RUnlock()
 	if bfs.current != nil && bfs.current.TestLocationCache(lc) {
@@ -70,6 +70,21 @@ func (bfs *BloomFilterSet) PkExists(lc storage.LocationsCache) bool {
 	return false
 }
 
+func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
+	bfs.mut.RLock()
+	defer bfs.mut.RUnlock()
+
+	hits := make([]bool, lc.Size())
+	if bfs.current != nil {
+		bfs.current.BatchPkExist(lc, hits)
+	}
+
+	for _, bf := range bfs.history {
+		bf.BatchPkExist(lc, hits)
+	}
+	return hits
+}
+
 func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
 	bfs.mut.Lock()
 	defer bfs.mut.Unlock()
diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go
index 630133075e..2340279224 100644
--- a/internal/datanode/metacache/bloom_filter_set_test.go
+++ b/internal/datanode/metacache/bloom_filter_set_test.go
@@ -19,6 +19,7 @@ package metacache
 import (
 	"testing"
 
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -68,6 +69,37 @@ func (s *BloomFilterSetSuite) TestWriteRead() {
 	for _, id := range ids {
 		s.True(s.bfs.PkExists(storage.NewLocationsCache(storage.NewInt64PrimaryKey(id))), "pk shall return exist after update")
 	}
+
+	lc := storage.NewBatchLocationsCache(lo.Map(ids, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+	hits := s.bfs.BatchPkExist(lc)
+	for _, hit := range hits {
+		s.True(hit, "pk shall return exist after batch update")
+	}
+}
+
+func (s *BloomFilterSetSuite) TestBatchPkExist() {
+	capacity := 100000
+	ids := make([]int64, 0)
+	for id := 0; id < capacity; id++ {
+		ids = append(ids, int64(id))
+	}
+
+	bfs := NewBloomFilterSetWithBatchSize(uint(capacity))
+	err := bfs.UpdatePKRange(s.GetFieldData(ids))
+	s.NoError(err)
+
+	batchSize := 1000
+	for i := 0; i < capacity; i += batchSize {
+		endIdx := i + batchSize
+		if endIdx > capacity {
+			endIdx = capacity
+		}
+		lc := storage.NewBatchLocationsCache(lo.Map(ids[i:endIdx], func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+		hits := bfs.BatchPkExist(lc)
+		for _, hit := range hits {
+			s.True(hit, "pk shall return exist after batch update")
+		}
+	}
 }
 
 func (s *BloomFilterSetSuite) TestRoll() {
diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go
index 0438396879..0729a7f332 100644
--- a/internal/datanode/writebuffer/bf_write_buffer.go
+++ b/internal/datanode/writebuffer/bf_write_buffer.go
@@ -1,8 +1,6 @@
 package writebuffer
 
 import (
-	"github.com/samber/lo"
-
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
@@ -10,6 +8,7 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -32,28 +31,45 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
 }
 
 func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
-	// distribute delete msg for previous data
-	for _, delMsg := range deleteMsgs {
-		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
-		lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) storage.LocationsCache { return storage.NewLocationsCache(pk) })
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
-			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+
+	split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
+		lc := storage.NewBatchLocationsCache(pks)
 		for _, segment := range segments {
 			if segment.CompactTo() != 0 {
 				continue
 			}
+
+			hits := segment.GetBloomFilterSet().BatchPkExist(lc)
 			var deletePks []storage.PrimaryKey
 			var deleteTss []typeutil.Timestamp
-			for idx, lc := range lcs {
-				if segment.GetBloomFilterSet().PkExists(lc) {
-					deletePks = append(deletePks, pks[idx])
-					deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
+			for i, hit := range hits {
+				if hit {
+					deletePks = append(deletePks, pks[i])
+					deleteTss = append(deleteTss, pkTss[i])
 				}
 			}
+
 			if len(deletePks) > 0 {
 				wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
 			}
 		}
+	}
+
+	// distribute delete msg for previous data
+	for _, delMsg := range deleteMsgs {
+		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
+		pkTss := delMsg.GetTimestamps()
+		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
+			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+
+		for idx := 0; idx < len(pks); idx += batchSize {
+			endIdx := idx + batchSize
+			if endIdx > len(pks) {
+				endIdx = len(pks)
+			}
+			split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
+		}
 
 		for _, inData := range groups {
 			if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go
index c4ed68fb11..1883a5e085 100644
--- a/internal/datanode/writebuffer/l0_write_buffer.go
+++ b/internal/datanode/writebuffer/l0_write_buffer.go
@@ -3,7 +3,6 @@ package writebuffer
 import (
 	"context"
 
-	"github.com/samber/lo"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -16,6 +15,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -48,28 +48,43 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
 }
 
 func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
-	for _, delMsg := range deleteMsgs {
-		l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
-		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
-		lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) storage.LocationsCache { return storage.NewLocationsCache(pk) })
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
-			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+	split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo, l0SegmentID int64) {
+		lc := storage.NewBatchLocationsCache(pks)
 		for _, segment := range segments {
 			if segment.CompactTo() != 0 {
 				continue
 			}
+
+			hits := segment.GetBloomFilterSet().BatchPkExist(lc)
 			var deletePks []storage.PrimaryKey
 			var deleteTss []typeutil.Timestamp
-			for idx, lc := range lcs {
-				if segment.GetBloomFilterSet().PkExists(lc) {
-					deletePks = append(deletePks, pks[idx])
-					deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
+			for i, hit := range hits {
+				if hit {
+					deletePks = append(deletePks, pks[i])
+					deleteTss = append(deleteTss, pkTss[i])
 				}
 			}
 			if len(deletePks) > 0 {
 				wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
 			}
 		}
+	}
+
+	for _, delMsg := range deleteMsgs {
+		l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
+		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
+		pkTss := delMsg.GetTimestamps()
+		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
+			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+
+		for idx := 0; idx < len(pks); idx += batchSize {
+			endIdx := idx + batchSize
+			if endIdx > len(pks) {
+				endIdx = len(pks)
+			}
+			split(pks[idx:endIdx], pkTss[idx:endIdx], segments, l0SegmentID)
+		}
 
 		for _, inData := range groups {
 			if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index fe6fff9b42..71c8740d94 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -202,18 +202,23 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 	// segment => delete data
 	delRecords := make(map[int64]DeleteData)
 	for _, data := range deleteData {
-		for i, pk := range data.PrimaryKeys {
-			segmentIDs, err := sd.pkOracle.Get(pk, pkoracle.WithPartitionID(data.PartitionID))
-			if err != nil {
-				log.Warn("failed to get delete candidates for pk", zap.Any("pk", pk.GetValue()))
-				continue
+		pks := data.PrimaryKeys
+		batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+		for idx := 0; idx < len(pks); idx += batchSize {
+			endIdx := idx + batchSize
+			if endIdx > len(pks) {
+				endIdx = len(pks)
 			}
-			for _, segmentID := range segmentIDs {
-				delRecord := delRecords[segmentID]
-				delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pk)
-				delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[i])
-				delRecord.RowCount++
-				delRecords[segmentID] = delRecord
+
+			pk2SegmentIDs := sd.pkOracle.BatchGet(pks[idx:endIdx], pkoracle.WithPartitionID(data.PartitionID))
+			for i, segmentIDs := range pk2SegmentIDs {
+				for _, segmentID := range segmentIDs {
+					delRecord := delRecords[segmentID]
+					delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[idx+i])
+					delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[idx+i])
+					delRecord.RowCount++
+					delRecords[segmentID] = delRecord
+				}
 			}
 		}
 	}
@@ -525,10 +530,20 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
 		segment := segment.(*segments.L0Segment)
 		if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
 			segmentPks, segmentTss := segment.DeleteRecords()
-			for i, pk := range segmentPks {
-				if candidate.MayPkExist(pk) {
-					pks = append(pks, pk)
-					tss = append(tss, segmentTss[i])
+			batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+			for idx := 0; idx < len(segmentPks); idx += batchSize {
+				endIdx := idx + batchSize
+				if endIdx > len(segmentPks) {
+					endIdx = len(segmentPks)
+				}
+
+				lc := storage.NewBatchLocationsCache(segmentPks[idx:endIdx])
+				hits := candidate.BatchPkExist(lc)
+				for i, hit := range hits {
+					if hit {
+						pks = append(pks, segmentPks[idx+i])
+						tss = append(tss, segmentTss[idx+i])
+					}
 				}
 			}
 		}
@@ -636,9 +651,20 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 			if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
 				continue
 			}
-			for i, pk := range record.DeleteData.Pks {
-				if candidate.MayPkExist(pk) {
-					deleteData.Append(pk, record.DeleteData.Tss[i])
+			pks := record.DeleteData.Pks
+			batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+			for idx := 0; idx < len(pks); idx += batchSize {
+				endIdx := idx + batchSize
+				if endIdx > len(pks) {
+					endIdx = len(pks)
+				}
+
+				lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
+				hits := candidate.BatchPkExist(lc)
+				for i, hit := range hits {
+					if hit {
+						deleteData.Append(pks[idx+i], record.DeleteData.Tss[idx+i])
+					}
 				}
 			}
 		}
@@ -732,10 +758,21 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
 				continue
 			}
 
-			for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) {
-				if candidate.MayPkExist(pk) {
-					result.Pks = append(result.Pks, pk)
-					result.Tss = append(result.Tss, dmsg.Timestamps[idx])
+			pks := storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys())
+			batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+			for idx := 0; idx < len(pks); idx += batchSize {
+				endIdx := idx + batchSize
+				if endIdx > len(pks) {
+					endIdx = len(pks)
+				}
+
+				lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
+				hits := candidate.BatchPkExist(lc)
+				for i, hit := range hits {
+					if hit {
+						result.Pks = append(result.Pks, pks[idx+i])
+						result.Tss = append(result.Tss, dmsg.Timestamps[idx+i])
+					}
 				}
 			}
 		}
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 50665425aa..885924231c 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -261,9 +261,12 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
 				ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
 					return pk.EQ(storage.NewInt64PrimaryKey(10))
 				})
-				ms.EXPECT().GetHashFuncNum().Return(1)
-				ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
-					return pk.EQ(storage.NewInt64PrimaryKey(10))
+				ms.EXPECT().BatchPkExist(mock.Anything).RunAndReturn(func(lc *storage.BatchLocationsCache) []bool {
+					hits := make([]bool, lc.Size())
+					for i, pk := range lc.PKs() {
+						hits[i] = pk.EQ(storage.NewInt64PrimaryKey(10))
+					}
+					return hits
 				})
 				return ms
 			})
@@ -880,10 +883,6 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
 				ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
 					return pk.EQ(storage.NewInt64PrimaryKey(10))
 				})
-				ms.EXPECT().GetHashFuncNum().Return(1)
-				ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
-					return pk.EQ(storage.NewInt64PrimaryKey(10))
-				})
 				return ms
 			})
 		}, nil)
diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go
index 4d51b1145d..2dcd9ac5e0 100644
--- a/internal/querynodev2/delegator/delegator_test.go
+++ b/internal/querynodev2/delegator/delegator_test.go
@@ -99,10 +99,6 @@ func (s *DelegatorSuite) SetupTest() {
 				ms.EXPECT().Indexes().Return(nil)
 				ms.EXPECT().RowNum().Return(info.GetNumOfRows())
 				ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
-				ms.EXPECT().GetHashFuncNum().Return(1)
-				ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
-					return pk.EQ(storage.NewInt64PrimaryKey(10))
-				})
 				return ms
 			})
 		}, nil)
diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go
index 608bb656ef..d94a4008cc 100644
--- a/internal/querynodev2/pkoracle/bloom_filter_set.go
+++ b/internal/querynodev2/pkoracle/bloom_filter_set.go
@@ -17,7 +17,6 @@
 package pkoracle
 
 import (
-	"context"
 	"sync"
 
 	bloom "github.com/bits-and-blooms/bloom/v3"
@@ -46,61 +45,35 @@ type BloomFilterSet struct {
 }
 
 // MayPkExist returns whether any bloom filters returns positive.
-func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool {
+func (s *BloomFilterSet) MayPkExist(pk *storage.LocationsCache) bool {
 	s.statsMutex.RLock()
 	defer s.statsMutex.RUnlock()
 
-	if s.currentStat != nil && s.currentStat.PkExist(pk) {
+	if s.currentStat != nil && s.currentStat.TestLocationCache(pk) {
 		return true
 	}
 
 	// for sealed, if one of the stats shows it exist, then we have to check it
 	for _, historyStat := range s.historyStats {
-		if historyStat.PkExist(pk) {
+		if historyStat.TestLocationCache(pk) {
 			return true
 		}
 	}
 	return false
 }
 
-func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool {
-	log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60)
-
+func (s *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
 	s.statsMutex.RLock()
 	defer s.statsMutex.RUnlock()
 
+	hits := make([]bool, lc.Size())
 	if s.currentStat != nil {
-		k := s.currentStat.PkFilter.K()
-		if k > uint(len(locs)) {
-			log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
-				zap.Int("locationNum", len(locs)),
-				zap.Uint("hashFuncNum", k),
-				zap.Int64("segmentID", s.segmentID))
-			return true
-		}
-
-		if s.currentStat.TestLocations(pk, locs[:k]) {
-			return true
-		}
+		s.currentStat.BatchPkExist(lc, hits)
 	}
 
-	// for sealed, if one of the stats shows it exist, then we have to check it
-	for _, historyStat := range s.historyStats {
-		k := historyStat.PkFilter.K()
-		if k > uint(len(locs)) {
-			log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
-				zap.Int("locationNum", len(locs)),
-				zap.Uint("hashFuncNum", k),
-				zap.Int64("segmentID", s.segmentID))
-			return true
-		}
-		if historyStat.TestLocations(pk, locs[:k]) {
-			return true
-		}
+	for _, bf := range s.historyStats {
+		bf.BatchPkExist(lc, hits)
 	}
 
-	return false
-}
-
-func (s *BloomFilterSet) GetHashFuncNum() uint {
-	return s.kHashFunc
+	return hits
 }
 
 // ID implement candidate.
@@ -124,13 +97,11 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
 	defer s.statsMutex.Unlock()
 
 	if s.currentStat == nil {
-		m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
-			paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
-		if k > s.kHashFunc {
-			s.kHashFunc = k
-		}
 		s.currentStat = &storage.PkStatistics{
-			PkFilter: bloom.New(m, k),
+			PkFilter: bloom.NewWithEstimates(
+				paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
+				paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
+			),
 		}
 	}
 
diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go
index 0384d3faa7..a3fe58ba28 100644
--- a/internal/querynodev2/pkoracle/bloom_filter_set_test.go
+++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go
@@ -41,10 +41,9 @@ func TestInt64Pk(t *testing.T) {
 	bfs.UpdateBloomFilter(pks)
 
 	for i := 0; i < batchSize; i++ {
-		locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
-		ret1 := bfs.TestLocations(pks[i], locations)
-		ret2 := bfs.MayPkExist(pks[i])
-		assert.Equal(t, ret1, ret2)
+		lc := storage.NewLocationsCache(pks[i])
+		ret1 := bfs.currentStat.TestLocationCache(lc)
+		assert.True(t, ret1, true)
 	}
 
 	assert.Equal(t, int64(1), bfs.ID())
@@ -66,10 +65,9 @@ func TestVarCharPk(t *testing.T) {
 	bfs.UpdateBloomFilter(pks)
 
 	for i := 0; i < batchSize; i++ {
-		locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
-		ret1 := bfs.TestLocations(pks[i], locations)
-		ret2 := bfs.MayPkExist(pks[i])
-		assert.Equal(t, ret1, ret2)
+		lc := storage.NewLocationsCache(pks[i])
+		ret1 := bfs.currentStat.TestLocationCache(lc)
+		assert.True(t, ret1, true)
 	}
 }
 
@@ -91,29 +89,14 @@ func TestHistoricalStat(t *testing.T) {
 	bfs.currentStat = nil
 
 	for i := 0; i < batchSize; i++ {
-		locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
-		ret1 := bfs.TestLocations(pks[i], locations)
-		ret2 := bfs.MayPkExist(pks[i])
-		assert.Equal(t, ret1, ret2)
-	}
-}
-
-func TestHashFuncNum(t *testing.T) {
-	paramtable.Init()
-	batchSize := 100
-	pks := make([]storage.PrimaryKey, 0)
-	for i := 0; i < batchSize; i++ {
-		pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10))
-		pks = append(pks, pk)
-	}
-
-	bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
-	bfs.UpdateBloomFilter(pks)
-
-	for i := 0; i < batchSize; i++ {
-		// pass locations more then hash func num in bf
-		locations := storage.Locations(pks[i], bfs.GetHashFuncNum()+3)
-		ret1 := bfs.TestLocations(pks[i], locations)
-		assert.True(t, ret1)
+		lc := storage.NewLocationsCache(pks[i])
+		ret := bfs.MayPkExist(lc)
+		assert.True(t, ret)
+	}
+
+	lc := storage.NewBatchLocationsCache(pks)
+	ret := bfs.BatchPkExist(lc)
+	for i := range ret {
+		assert.True(t, ret[i])
 	}
 }
diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go
index e5f051e5f1..bb2479702b 100644
--- a/internal/querynodev2/pkoracle/candidate.go
+++ b/internal/querynodev2/pkoracle/candidate.go
@@ -26,9 +26,8 @@ import (
 // Candidate is the interface for pk oracle candidate.
 type Candidate interface {
 	// MayPkExist checks whether primary key could exists in this candidate.
-	MayPkExist(pk storage.PrimaryKey) bool
-	TestLocations(pk storage.PrimaryKey, locs []uint64) bool
-	GetHashFuncNum() uint
+	MayPkExist(lc *storage.LocationsCache) bool
+	BatchPkExist(lc *storage.BatchLocationsCache) []bool
 
 	ID() int64
 	Partition() int64
diff --git a/internal/querynodev2/pkoracle/key.go b/internal/querynodev2/pkoracle/key.go
index 9845b5e065..511d93ff95 100644
--- a/internal/querynodev2/pkoracle/key.go
+++ b/internal/querynodev2/pkoracle/key.go
@@ -28,18 +28,17 @@ type candidateKey struct {
 }
 
 // MayPkExist checks whether primary key could exists in this candidate.
-func (k candidateKey) MayPkExist(pk storage.PrimaryKey) bool {
+func (k candidateKey) MayPkExist(pk *storage.LocationsCache) bool {
 	// always return true to prevent miuse
 	return true
 }
 
-func (k candidateKey) TestLocations(pk storage.PrimaryKey, locs []uint64) bool {
-	// always return true to prevent miuse
-	return true
-}
-
-func (k candidateKey) GetHashFuncNum() uint {
-	return 0
+func (k candidateKey) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
+	ret := make([]bool, 0)
+	for i := 0; i < lc.Size(); i++ {
+		ret = append(ret, true)
+	}
+	return ret
 }
 
 // ID implements Candidate.
diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go
index 4d686503ec..c3d9fc1094 100644
--- a/internal/querynodev2/pkoracle/pk_oracle.go
+++ b/internal/querynodev2/pkoracle/pk_oracle.go
@@ -19,10 +19,8 @@ package pkoracle
 import (
 	"fmt"
-	"sync"
 
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -30,6 +28,7 @@ import (
 type PkOracle interface {
 	// GetCandidates returns segment candidates of which pk might belongs to.
 	Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error)
+	BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64
 	// RegisterCandidate adds candidate into pkOracle.
 	Register(candidate Candidate, workerID int64) error
 	// RemoveCandidate removes candidate
@@ -43,30 +42,12 @@ var _ PkOracle = (*pkOracle)(nil)
 // pkOracle implementation.
 type pkOracle struct {
 	candidates *typeutil.ConcurrentMap[string, candidateWithWorker]
-
-	hashFuncNumMutex sync.RWMutex
-	maxHashFuncNum   uint
-}
-
-func (pko *pkOracle) GetMaxHashFuncNum() uint {
-	pko.hashFuncNumMutex.RLock()
-	defer pko.hashFuncNumMutex.RUnlock()
-	return pko.maxHashFuncNum
-}
-
-func (pko *pkOracle) TryUpdateHashFuncNum(newValue uint) {
-	pko.hashFuncNumMutex.Lock()
-	defer pko.hashFuncNumMutex.Unlock()
-	if newValue > pko.maxHashFuncNum {
-		pko.maxHashFuncNum = newValue
-	}
 }
 
 // Get implements PkOracle.
 func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) {
 	var result []int64
-	var locations []uint64
-
+	lc := storage.NewLocationsCache(pk)
 	pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
 		for _, filter := range filters {
 			if !filter(candidate) {
@@ -74,15 +55,7 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
 			}
 		}
 
-		if locations == nil {
-			locations = storage.Locations(pk, pko.GetMaxHashFuncNum())
-			if len(locations) == 0 {
-				log.Warn("pkOracle: no location found for pk")
-				return true
-			}
-		}
-
-		if candidate.TestLocations(pk, locations) {
+		if candidate.MayPkExist(lc) {
 			result = append(result, candidate.ID())
 		}
 		return true
@@ -91,13 +64,35 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
 	return result, nil
 }
 
+func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 {
+	result := make([][]int64, len(pks))
+
+	lc := storage.NewBatchLocationsCache(pks)
+	pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
+		for _, filter := range filters {
+			if !filter(candidate) {
+				return true
+			}
+		}
+
+		hits := candidate.BatchPkExist(lc)
+		for i, hit := range hits {
+			if hit {
+				result[i] = append(result[i], candidate.ID())
+			}
+		}
+		return true
+	})
+
+	return result
+}
+
 func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string {
 	return fmt.Sprintf("%s-%d-%d", candidate.Type().String(), workerID, candidate.ID())
 }
 
 // Register register candidate
 func (pko *pkOracle) Register(candidate Candidate, workerID int64) error {
-	pko.TryUpdateHashFuncNum(candidate.GetHashFuncNum())
 	pko.candidates.Insert(pko.candidateKey(candidate, workerID), candidateWithWorker{
 		Candidate: candidate,
 		workerID:  workerID,
@@ -108,7 +103,6 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error {
 
 // Remove removes candidate from pko.
 func (pko *pkOracle) Remove(filters ...CandidateFilter) error {
-	max := uint(0)
 	pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
 		for _, filter := range filters {
 			if !filter(candidate) {
@@ -116,14 +110,9 @@ func (pko *pkOracle) Remove(filters ...CandidateFilter) error {
 			}
 		}
 		pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID))
-		if candidate.GetHashFuncNum() > max {
-			max = candidate.GetHashFuncNum()
-		}
-
 		return true
 	})
-	pko.TryUpdateHashFuncNum(max)
 
 	return nil
 }
diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go
index 3121d0ca45..ff3b08935a 100644
--- a/internal/querynodev2/segments/mock_segment.go
+++ b/internal/querynodev2/segments/mock_segment.go
@@ -35,6 +35,50 @@ func (_m *MockSegment) EXPECT() *MockSegment_Expecter {
 	return &MockSegment_Expecter{mock: &_m.Mock}
 }
 
+// BatchPkExist provides a mock function with given fields: lc
+func (_m *MockSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
+	ret := _m.Called(lc)
+
+	var r0 []bool
+	if rf, ok := ret.Get(0).(func(*storage.BatchLocationsCache) []bool); ok {
+		r0 = rf(lc)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]bool)
+		}
+	}
+
+	return r0
+}
+
+// MockSegment_BatchPkExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist'
+type MockSegment_BatchPkExist_Call struct {
+	*mock.Call
+}
+
+// BatchPkExist is a helper method to define mock.On call
+// - lc *storage.BatchLocationsCache
+func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchPkExist_Call {
+	return &MockSegment_BatchPkExist_Call{Call: _e.mock.On("BatchPkExist", lc)}
+}
+
+func (_c *MockSegment_BatchPkExist_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchPkExist_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(*storage.BatchLocationsCache))
+	})
+	return _c
+}
+
+func (_c *MockSegment_BatchPkExist_Call) Return(_a0 []bool) *MockSegment_BatchPkExist_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockSegment_BatchPkExist_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchPkExist_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // CASVersion provides a mock function with given fields: _a0, _a1
 func (_m *MockSegment) CASVersion(_a0 int64, _a1 int64) bool {
 	ret := _m.Called(_a0, _a1)
@@ -246,47 +290,6 @@ func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockS
 	return _c
 }
 
-// GetHashFuncNum provides a mock function with given fields:
-func (_m *MockSegment) GetHashFuncNum() uint {
-	ret := _m.Called()
-
-	var r0 uint
-	if rf, ok := ret.Get(0).(func() uint); ok {
-		r0 = rf()
-	} else {
-		r0 = ret.Get(0).(uint)
-	}
-
-	return r0
-}
-
-// MockSegment_GetHashFuncNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHashFuncNum'
-type MockSegment_GetHashFuncNum_Call struct {
-	*mock.Call
-}
-
-// GetHashFuncNum is a helper method to define mock.On call
-func (_e *MockSegment_Expecter) GetHashFuncNum() *MockSegment_GetHashFuncNum_Call {
-	return &MockSegment_GetHashFuncNum_Call{Call: _e.mock.On("GetHashFuncNum")}
-}
-
-func (_c *MockSegment_GetHashFuncNum_Call) Run(run func()) *MockSegment_GetHashFuncNum_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockSegment_GetHashFuncNum_Call) Return(_a0 uint) *MockSegment_GetHashFuncNum_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockSegment_GetHashFuncNum_Call) RunAndReturn(run func() uint) *MockSegment_GetHashFuncNum_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // GetIndex provides a mock function with given fields: fieldID
 func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo {
 	ret := _m.Called(fieldID)
@@ -752,13 +755,13 @@ func (_c *MockSegment_LoadInfo_Call) RunAndReturn(run func() *querypb.SegmentLoa
 	return _c
 }
 
-// MayPkExist provides a mock function with given fields: pk
-func (_m *MockSegment) MayPkExist(pk storage.PrimaryKey) bool {
-	ret := _m.Called(pk)
+// MayPkExist provides a mock function with given fields: lc
+func (_m *MockSegment) MayPkExist(lc *storage.LocationsCache) bool {
+	ret := _m.Called(lc)
 
 	var r0 bool
-	if rf, ok := ret.Get(0).(func(storage.PrimaryKey) bool); ok {
-		r0 = rf(pk)
+	if rf, ok := ret.Get(0).(func(*storage.LocationsCache) bool); ok {
+		r0 = rf(lc)
 	} else {
 		r0 = ret.Get(0).(bool)
 	}
@@ -772,14 +775,14 @@ type MockSegment_MayPkExist_Call struct {
 }
 
 // MayPkExist is a helper method to define mock.On call
-// - pk storage.PrimaryKey
-func (_e *MockSegment_Expecter) MayPkExist(pk interface{}) *MockSegment_MayPkExist_Call {
-	return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", pk)}
+// - lc *storage.LocationsCache
+func (_e *MockSegment_Expecter) MayPkExist(lc interface{}) *MockSegment_MayPkExist_Call {
+	return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", lc)}
 }
 
-func (_c *MockSegment_MayPkExist_Call) Run(run func(pk storage.PrimaryKey)) *MockSegment_MayPkExist_Call {
+func (_c *MockSegment_MayPkExist_Call) Run(run func(lc *storage.LocationsCache)) *MockSegment_MayPkExist_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(storage.PrimaryKey))
+		run(args[0].(*storage.LocationsCache))
 	})
 	return _c
 }
@@ -789,7 +792,7 @@ func (_c *MockSegment_MayPkExist_Call) Return(_a0 bool) *MockSegment_MayPkExist_
 	return _c
 }
 
-func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(storage.PrimaryKey) bool) *MockSegment_MayPkExist_Call {
+func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(*storage.LocationsCache) bool) *MockSegment_MayPkExist_Call {
 	_c.Call.Return(run)
 	return _c
 }
@@ -1453,49 +1456,6 @@ func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosi
 	return _c
 }
 
-// TestLocations provides a mock function with given fields: pk, loc
-func (_m *MockSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool {
-	ret := _m.Called(pk, loc)
-
-	var r0 bool
-	if rf, ok := ret.Get(0).(func(storage.PrimaryKey, []uint64) bool); ok {
-		r0 = rf(pk, loc)
-	} else {
-		r0 = ret.Get(0).(bool)
-	}
-
-	return r0
-}
-
-// MockSegment_TestLocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TestLocations'
-type MockSegment_TestLocations_Call struct {
-	*mock.Call
-}
-
-// TestLocations is a helper method to define mock.On call
-// - pk storage.PrimaryKey
-// - loc []uint64
-func (_e *MockSegment_Expecter) TestLocations(pk interface{}, loc interface{}) *MockSegment_TestLocations_Call {
-	return &MockSegment_TestLocations_Call{Call: _e.mock.On("TestLocations", pk, loc)}
-}
-
-func (_c *MockSegment_TestLocations_Call) Run(run func(pk storage.PrimaryKey, loc []uint64)) *MockSegment_TestLocations_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(storage.PrimaryKey), args[1].([]uint64))
-	})
-	return _c
-}
-
-func (_c *MockSegment_TestLocations_Call) Return(_a0 bool) *MockSegment_TestLocations_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockSegment_TestLocations_Call) RunAndReturn(run func(storage.PrimaryKey, []uint64) bool) *MockSegment_TestLocations_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Type provides a mock function with given fields:
 func (_m *MockSegment) Type() commonpb.SegmentState {
 	ret := _m.Called()
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 5de6ae1027..ea864607e0 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -182,16 +182,12 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
 // MayPkExist returns true if the given PK exists in the PK range and being positive through the bloom filter,
 // false otherwise,
 // may returns true even the PK doesn't exist actually
-func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool {
+func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
 	return s.bloomFilterSet.MayPkExist(pk)
 }
 
-func (s *baseSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool {
-	return s.bloomFilterSet.TestLocations(pk, loc)
-}
-
-func (s *baseSegment) GetHashFuncNum() uint {
-	return s.bloomFilterSet.GetHashFuncNum()
+func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
+	return s.bloomFilterSet.BatchPkExist(lc)
 }
 
 // ResourceUsageEstimate returns the estimated resource usage of the segment.
diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go
index 9ed9d4df90..164395b206 100644
--- a/internal/querynodev2/segments/segment_interface.go
+++ b/internal/querynodev2/segments/segment_interface.go
@@ -83,9 +83,8 @@ type Segment interface {
 
 	// Bloom filter related
 	UpdateBloomFilter(pks []storage.PrimaryKey)
-	MayPkExist(pk storage.PrimaryKey) bool
-	TestLocations(pk storage.PrimaryKey, loc []uint64) bool
-	GetHashFuncNum() uint
+	MayPkExist(lc *storage.LocationsCache) bool
+	BatchPkExist(lc *storage.BatchLocationsCache) []bool
 
 	// Read operations
 	Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error)
diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go
index 138fed79b7..a1930159d4 100644
--- a/internal/querynodev2/segments/segment_loader_test.go
+++ b/internal/querynodev2/segments/segment_loader_test.go
@@ -226,7 +226,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
 	// Won't load bloom filter with sealed segments
 	for _, segment := range segments {
 		for pk := 0; pk < 100; pk++ {
-			exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+			lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
+			exist := segment.MayPkExist(lc)
 			suite.Require().False(exist)
 		}
 	}
@@ -260,7 +261,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
 	// Should load bloom filter with growing segments
 	for _, segment := range segments {
 		for pk := 0; pk < 100; pk++ {
-			exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+			lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
+			exist := segment.MayPkExist(lc)
 			suite.True(exist)
 		}
 	}
@@ -351,7 +353,8 @@ func (suite *SegmentLoaderSuite) TestLoadBloomFilter() {
 
 	for _, bf := range bfs {
 		for pk := 0; pk < 100; pk++ {
-			exist := bf.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+			lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
+			exist := bf.MayPkExist(lc)
 			suite.Require().True(exist)
 		}
 	}
@@ -404,7 +407,8 @@ func (suite *SegmentLoaderSuite) TestLoadDeltaLogs() {
 			if pk == 1 || pk == 2 {
 				continue
 			}
-			exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+			lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
+			exist := segment.MayPkExist(lc)
 			suite.Require().True(exist)
 		}
 	}
@@ -457,7 +461,8 @@ func (suite *SegmentLoaderSuite) TestLoadDupDeltaLogs() {
 			if pk == 1 || pk == 2 {
 				continue
 			}
-			exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+			lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
+			exist := segment.MayPkExist(lc)
 			suite.Require().True(exist)
 		}
diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go
index d4f1855ab4..464df07e7a 100644
--- a/internal/querynodev2/segments/segment_test.go
+++ b/internal/querynodev2/segments/segment_test.go
@@ -188,14 +188,6 @@ func (suite *SegmentSuite) TestHasRawData() {
 	suite.True(has)
 }
 
-func (suite *SegmentSuite) TestLocation() {
-	pk := storage.NewInt64PrimaryKey(100)
-	locations := storage.Locations(pk, suite.sealed.GetHashFuncNum())
-	ret1 := suite.sealed.TestLocations(pk, locations)
-	ret2 := suite.sealed.MayPkExist(pk)
-	suite.Equal(ret1, ret2)
-}
-
 func (suite *SegmentSuite) TestCASVersion() {
 	segment := suite.sealed
 
diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go
index c42a8c8792..f1e15f0d3d 100644
--- a/internal/storage/pk_statistics.go
+++ b/internal/storage/pk_statistics.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/pkg/common"
@@ -125,22 +126,7 @@ func Locations(pk PrimaryKey, k uint) []uint64 {
 	return nil
 }
 
-func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool {
-	// empty pkStatics
-	if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
-		return false
-	}
-
-	// check bf first, TestLocation just do some bitset compute, cost is cheaper
-	if !st.PkFilter.TestLocations(locs) {
-		return false
-	}
-
-	// check pk range first, ugly but key it for now
-	return st.MinPK.LE(pk) && st.MaxPK.GE(pk)
-}
-
-func (st *PkStatistics) TestLocationCache(lc LocationsCache) bool {
+func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool {
 	// empty pkStatics
 	if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
 		return false
@@ -155,26 +141,78 @@ func (st *PkStatistics) TestLocationCache(lc LocationsCache) bool {
 	return st.MinPK.LE(lc.pk) && st.MaxPK.GE(lc.pk)
 }
 
+func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []bool {
+	// empty pkStatics
+	if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
+		return hits
+	}
+
+	// check bf first, TestLocations just does some bitset computation, which is cheaper
+	locations := lc.Locations(st.PkFilter.K())
+	pks := lc.PKs()
+	for i := range pks {
+		// todo: a bit ugly, hits[i]'s value will depend on multiple bloom filters in a single segment;
+		// the hits array will be removed after we merge the bloom filters in a segment
+		if !hits[i] {
+			hits[i] = st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
+		}
+	}
+
+	return hits
+}
+
 // LocationsCache is a helper struct caching pk bloom filter locations.
 // Note that this helper is not concurrent safe and shall be used in same goroutine.
 type LocationsCache struct {
 	pk        PrimaryKey
-	locations map[uint][]uint64
+	locations []uint64
 }
 
-func (lc LocationsCache) Locations(k uint) []uint64 {
-	locs, ok := lc.locations[k]
-	if ok {
-		return locs
-	}
-	locs = Locations(lc.pk, k)
-	lc.locations[k] = locs
-	return locs
+func (lc *LocationsCache) GetPk() PrimaryKey {
+	return lc.pk
 }
 
-func NewLocationsCache(pk PrimaryKey) LocationsCache {
-	return LocationsCache{
-		pk:        pk,
-		locations: make(map[uint][]uint64),
+func (lc *LocationsCache) Locations(k uint) []uint64 {
+	if int(k) > len(lc.locations) {
+		lc.locations = Locations(lc.pk, k)
+	}
+	return lc.locations[:k]
+}
+
+func NewLocationsCache(pk PrimaryKey) *LocationsCache {
+	return &LocationsCache{
+		pk: pk,
+	}
+}
+
+type BatchLocationsCache struct {
+	pks []PrimaryKey
+	k   uint
+
+	locations [][]uint64
+}
+
+func (lc *BatchLocationsCache) PKs() []PrimaryKey {
+	return lc.pks
+}
+
+func (lc *BatchLocationsCache) Size() int {
+	return len(lc.pks)
+}
+
+func (lc *BatchLocationsCache) Locations(k uint) [][]uint64 {
+	if k > lc.k {
+		lc.k = k
+		lc.locations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
+			return Locations(pk, lc.k)
+		})
+	}
+
+	return lc.locations
+}
+
+func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache {
+	return &BatchLocationsCache{
+		pks: pks,
 	}
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index e82d943c17..3a716af0a9 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -237,14 +237,15 @@ type commonConfig struct {
 	LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
 	LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
 
-	StorageScheme         ParamItem `refreshable:"false"`
-	EnableStorageV2       ParamItem `refreshable:"false"`
-	StoragePathPrefix     ParamItem `refreshable:"false"`
-	TTMsgEnabled          ParamItem `refreshable:"true"`
-	TraceLogMode          ParamItem `refreshable:"true"`
-	BloomFilterSize       ParamItem `refreshable:"true"`
-	MaxBloomFalsePositive ParamItem `refreshable:"true"`
-	PanicWhenPluginFail   ParamItem `refreshable:"false"`
+	StorageScheme             ParamItem `refreshable:"false"`
+	EnableStorageV2           ParamItem `refreshable:"false"`
+	StoragePathPrefix         ParamItem `refreshable:"false"`
+	TTMsgEnabled              ParamItem `refreshable:"true"`
+	TraceLogMode              ParamItem `refreshable:"true"`
+	BloomFilterSize           ParamItem `refreshable:"true"`
+	MaxBloomFalsePositive     ParamItem `refreshable:"true"`
+	BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
+	PanicWhenPluginFail       ParamItem `refreshable:"false"`
 }
 
 func (p *commonConfig) init(base *BaseTable) {
@@ -734,6 +735,15 @@ like the old password verification when updating the credential`,
 	}
 	p.MaxBloomFalsePositive.Init(base.mgr)
 
+	p.BloomFilterApplyBatchSize = ParamItem{
+		Key:          "common.bloomFilterApplyBatchSize",
+		Version:      "2.4.4",
+		DefaultValue: "1000",
+		Doc:          "batch size when applying primary keys to the bloom filter",
+		Export:       true,
+	}
+	p.BloomFilterApplyBatchSize.Init(base.mgr)
+
 	p.PanicWhenPluginFail = ParamItem{
 		Key:     "common.panicWhenPluginFail",
 		Version: "2.4.2",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 0a316064a4..d92f6c3a4a 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -108,6 +108,8 @@ func TestComponentParam(t *testing.T) {
 
 		params.Save("common.preCreatedTopic.timeticker", "timeticker")
 		assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
+
+		assert.Equal(t, uint(1000), params.CommonCfg.BloomFilterApplyBatchSize.GetAsUint())
 	})
 
 	t.Run("test rootCoordConfig", func(t *testing.T) {
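
For reference, below is a small standalone sketch (not part of the patch) of the batching pattern the changes above apply at every call site: primary keys are cut into BloomFilterApplyBatchSize-sized slices, each key is hashed once per batch, and the cached locations are then tested against the bloom filter. The concrete batch size, key encoding, and direct use of github.com/bits-and-blooms/bloom/v3 are assumptions for illustration; the real code goes through the internal storage.BatchLocationsCache and paramtable wrappers instead.

package main

import (
	"fmt"
	"strconv"

	bloom "github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// Build a filter and insert the first 1000 keys.
	filter := bloom.NewWithEstimates(1_000_000, 0.001)
	for i := 0; i < 1000; i++ {
		filter.Add([]byte(strconv.Itoa(i)))
	}

	// 5000 lookups, processed in batches (a stand-in for common.bloomFilterApplyBatchSize).
	pks := make([]string, 0, 5000)
	for i := 0; i < 5000; i++ {
		pks = append(pks, strconv.Itoa(i))
	}
	batchSize := 1000

	hitCount := 0
	for idx := 0; idx < len(pks); idx += batchSize {
		endIdx := idx + batchSize
		if endIdx > len(pks) {
			endIdx = len(pks)
		}
		batch := pks[idx:endIdx]

		// Hash every pk in the batch once; BatchLocationsCache caches these
		// locations so they can be reused against many segment filters.
		locations := make([][]uint64, len(batch))
		for i, pk := range batch {
			locations[i] = bloom.Locations([]byte(pk), filter.K())
		}
		for i := range batch {
			if filter.TestLocations(locations[i]) {
				hitCount++
			}
		}
	}
	fmt.Println("bloom filter hits:", hitCount)
}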