enhance: Use BatchPkExist to reduce bloom filter func call cost (#33752)

issue: #33610
pr: #33611

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-06-12 17:45:58 +08:00 committed by GitHub
parent eeba85118e
commit 54feef30e7
21 changed files with 419 additions and 355 deletions

View File

@ -242,37 +242,48 @@ func (t *levelZeroCompactionTask) splitDelta(
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()
allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
return segment.GetSegmentID(), segment
})
// segments shall be safe to read outside
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
split := func(pk storage.PrimaryKey) []int64 {
lc := storage.NewLocationsCache(pk)
return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(lc)
})
}
targetSeg := lo.Associate(segments, func(info *metacache.SegmentInfo) (int64, *metacache.SegmentInfo) {
return info.SegmentID(), info
})
// split all delete data to segments
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
for _, delta := range allDelta {
for i, pk := range delta.Pks {
predicted := split(pk)
for _, gotSeg := range predicted {
writer, ok := targetSegBuffer[gotSeg]
if !ok {
segment := targetSeg[gotSeg]
writer = NewSegmentDeltaWriter(gotSeg, segment.PartitionID(), t.getCollection())
targetSegBuffer[gotSeg] = writer
split := func(pks []storage.PrimaryKey, pkTss []uint64) {
lc := storage.NewBatchLocationsCache(pks)
for _, s := range segments {
segmentID := s.SegmentID()
hits := s.GetBloomFilterSet().BatchPkExist(lc)
for i, hit := range hits {
if hit {
writer, ok := targetSegBuffer[segmentID]
if !ok {
segment := allSeg[segmentID]
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), t.getCollection())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[i], pkTss[i])
}
writer.Write(pk, delta.Tss[i])
}
}
}
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
// split all delete data to segments
for _, deleteBuffer := range allDelta {
pks := deleteBuffer.Pks
pkTss := deleteBuffer.Tss
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx])
}
}
return targetSegBuffer
}
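
The same chunking pattern recurs at every call site touched by this PR: the pk slice is walked in BloomFilterApplyBatchSize-sized chunks and each chunk is handed to a batch-aware closure. A minimal standalone sketch of that loop (not part of the commit; applyBatch is a hypothetical stand-in for the per-file split closures):

```go
// applyInBatches walks pks/tss in fixed-size chunks and hands each chunk to
// applyBatch, clamping the final, possibly shorter, batch.
func applyInBatches(pks []storage.PrimaryKey, tss []uint64, batchSize int,
	applyBatch func(batchPks []storage.PrimaryKey, batchTss []uint64),
) {
	for idx := 0; idx < len(pks); idx += batchSize {
		endIdx := idx + batchSize
		if endIdx > len(pks) {
			endIdx = len(pks)
		}
		applyBatch(pks[idx:endIdx], tss[idx:endIdx])
	}
}
```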

View File

@ -55,7 +55,7 @@ func NewBloomFilterSetWithBatchSize(batchSize uint, historyEntries ...*storage.P
}
}
func (bfs *BloomFilterSet) PkExists(lc storage.LocationsCache) bool {
func (bfs *BloomFilterSet) PkExists(lc *storage.LocationsCache) bool {
bfs.mut.RLock()
defer bfs.mut.RUnlock()
if bfs.current != nil && bfs.current.TestLocationCache(lc) {
@ -70,6 +70,21 @@ func (bfs *BloomFilterSet) PkExists(lc storage.LocationsCache) bool {
return false
}
func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
bfs.mut.RLock()
defer bfs.mut.RUnlock()
hits := make([]bool, lc.Size())
if bfs.current != nil {
bfs.current.BatchPkExist(lc, hits)
}
for _, bf := range bfs.history {
bf.BatchPkExist(lc, hits)
}
return hits
}
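
Callers use the new method per batch: one BatchLocationsCache is built for a chunk of pks, each segment's filter set is probed once, and the returned hits slice is positionally aligned with the input pks. A caller-side sketch, assuming pks/tss hold one batch of delete records and segments came from metacache.GetSegmentsBy (writeDelete is a hypothetical sink standing in for the delta writers above):

```go
// Probe every segment once per batch instead of once per pk.
lc := storage.NewBatchLocationsCache(pks)
for _, seg := range segments {
	hits := seg.GetBloomFilterSet().BatchPkExist(lc)
	for i, hit := range hits {
		if hit {
			writeDelete(seg.SegmentID(), pks[i], tss[i])
		}
	}
}
```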
func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
bfs.mut.Lock()
defer bfs.mut.Unlock()

View File

@ -19,6 +19,7 @@ package metacache
import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -68,6 +69,37 @@ func (s *BloomFilterSetSuite) TestWriteRead() {
for _, id := range ids {
s.True(s.bfs.PkExists(storage.NewLocationsCache(storage.NewInt64PrimaryKey(id))), "pk shall return exist after update")
}
lc := storage.NewBatchLocationsCache(lo.Map(ids, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
hits := s.bfs.BatchPkExist(lc)
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
}
func (s *BloomFilterSetSuite) TestBatchPkExist() {
capacity := 100000
ids := make([]int64, 0)
for id := 0; id < capacity; id++ {
ids = append(ids, int64(id))
}
bfs := NewBloomFilterSetWithBatchSize(uint(capacity))
err := bfs.UpdatePKRange(s.GetFieldData(ids))
s.NoError(err)
batchSize := 1000
for i := 0; i < capacity; i += batchSize {
endIdx := i + batchSize
if endIdx > capacity {
endIdx = capacity
}
lc := storage.NewBatchLocationsCache(lo.Map(ids[i:endIdx], func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
hits := bfs.BatchPkExist(lc)
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
}
}
func (s *BloomFilterSetSuite) TestRoll() {

View File

@ -1,8 +1,6 @@
package writebuffer
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
@ -10,6 +8,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -32,28 +31,45 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}
func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) storage.LocationsCache { return storage.NewLocationsCache(pk) })
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
if segment.CompactTo() != 0 {
continue
}
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, lc := range lcs {
if segment.GetBloomFilterSet().PkExists(lc) {
deletePks = append(deletePks, pks[idx])
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
}
}
}
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {

View File

@ -3,7 +3,6 @@ package writebuffer
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -16,6 +15,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -48,28 +48,43 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}
func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) storage.LocationsCache { return storage.NewLocationsCache(pk) })
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo, l0SegmentID int64) {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
if segment.CompactTo() != 0 {
continue
}
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, lc := range lcs {
if segment.GetBloomFilterSet().PkExists(lc) {
deletePks = append(deletePks, pks[idx])
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
}
}
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments, l0SegmentID)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {

View File

@ -202,18 +202,23 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
// segment => delete data
delRecords := make(map[int64]DeleteData)
for _, data := range deleteData {
for i, pk := range data.PrimaryKeys {
segmentIDs, err := sd.pkOracle.Get(pk, pkoracle.WithPartitionID(data.PartitionID))
if err != nil {
log.Warn("failed to get delete candidates for pk", zap.Any("pk", pk.GetValue()))
continue
pks := data.PrimaryKeys
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
for _, segmentID := range segmentIDs {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pk)
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
pk2SegmentIDs := sd.pkOracle.BatchGet(pks[idx:endIdx], pkoracle.WithPartitionID(data.PartitionID))
for i, segmentIDs := range pk2SegmentIDs {
for _, segmentID := range segmentIDs {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[idx+i])
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[idx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
}
@ -525,10 +530,20 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
segment := segment.(*segments.L0Segment)
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
for i, pk := range segmentPks {
if candidate.MayPkExist(pk) {
pks = append(pks, pk)
tss = append(tss, segmentTss[i])
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(segmentPks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(segmentPks) {
endIdx = len(segmentPks)
}
lc := storage.NewBatchLocationsCache(segmentPks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
pks = append(pks, segmentPks[idx+i])
tss = append(tss, segmentTss[idx+i])
}
}
}
}
@ -636,9 +651,20 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
continue
}
for i, pk := range record.DeleteData.Pks {
if candidate.MayPkExist(pk) {
deleteData.Append(pk, record.DeleteData.Tss[i])
pks := record.DeleteData.Pks
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
deleteData.Append(pks[idx+i], record.DeleteData.Tss[idx+i])
}
}
}
}
@ -732,10 +758,21 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
continue
}
for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) {
if candidate.MayPkExist(pk) {
result.Pks = append(result.Pks, pk)
result.Tss = append(result.Tss, dmsg.Timestamps[idx])
pks := storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys())
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
result.Pks = append(result.Pks, pks[idx+i])
result.Tss = append(result.Tss, dmsg.Timestamps[idx+i])
}
}
}
}

View File

@ -261,9 +261,12 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
ms.EXPECT().BatchPkExist(mock.Anything).RunAndReturn(func(lc *storage.BatchLocationsCache) []bool {
hits := make([]bool, lc.Size())
for i, pk := range lc.PKs() {
hits[i] = pk.EQ(storage.NewInt64PrimaryKey(10))
}
return hits
})
return ms
})
@ -880,10 +883,6 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
return ms
})
}, nil)

View File

@ -99,10 +99,6 @@ func (s *DelegatorSuite) SetupTest() {
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
return ms
})
}, nil)

View File

@ -17,7 +17,6 @@
package pkoracle
import (
"context"
"sync"
bloom "github.com/bits-and-blooms/bloom/v3"
@ -46,61 +45,35 @@ type BloomFilterSet struct {
}
// MayPkExist returns whether any of the bloom filters returns positive.
func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool {
func (s *BloomFilterSet) MayPkExist(pk *storage.LocationsCache) bool {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
if s.currentStat != nil && s.currentStat.PkExist(pk) {
if s.currentStat != nil && s.currentStat.TestLocationCache(pk) {
return true
}
// for sealed, if one of the stats shows it exists, then we have to check it
for _, historyStat := range s.historyStats {
if historyStat.PkExist(pk) {
if historyStat.TestLocationCache(pk) {
return true
}
}
return false
}
func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool {
log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60)
func (s *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
hits := make([]bool, lc.Size())
if s.currentStat != nil {
k := s.currentStat.PkFilter.K()
if k > uint(len(locs)) {
log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
zap.Int("locationNum", len(locs)),
zap.Uint("hashFuncNum", k),
zap.Int64("segmentID", s.segmentID))
return true
}
if s.currentStat.TestLocations(pk, locs[:k]) {
return true
}
s.currentStat.BatchPkExist(lc, hits)
}
// for sealed, if one of the stats shows it exists, then we have to check it
for _, historyStat := range s.historyStats {
k := historyStat.PkFilter.K()
if k > uint(len(locs)) {
log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
zap.Int("locationNum", len(locs)),
zap.Uint("hashFuncNum", k),
zap.Int64("segmentID", s.segmentID))
return true
}
if historyStat.TestLocations(pk, locs[:k]) {
return true
}
for _, bf := range s.historyStats {
bf.BatchPkExist(lc, hits)
}
return false
}
func (s *BloomFilterSet) GetHashFuncNum() uint {
return s.kHashFunc
return hits
}
// ID implement candidate.
@ -124,13 +97,11 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
defer s.statsMutex.Unlock()
if s.currentStat == nil {
m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
if k > s.kHashFunc {
s.kHashFunc = k
}
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.New(m, k),
PkFilter: bloom.NewWithEstimates(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
),
}
}

View File

@ -41,10 +41,9 @@ func TestInt64Pk(t *testing.T) {
bfs.UpdateBloomFilter(pks)
for i := 0; i < batchSize; i++ {
locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
ret1 := bfs.TestLocations(pks[i], locations)
ret2 := bfs.MayPkExist(pks[i])
assert.Equal(t, ret1, ret2)
lc := storage.NewLocationsCache(pks[i])
ret1 := bfs.currentStat.TestLocationCache(lc)
assert.True(t, ret1, true)
}
assert.Equal(t, int64(1), bfs.ID())
@ -66,10 +65,9 @@ func TestVarCharPk(t *testing.T) {
bfs.UpdateBloomFilter(pks)
for i := 0; i < batchSize; i++ {
locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
ret1 := bfs.TestLocations(pks[i], locations)
ret2 := bfs.MayPkExist(pks[i])
assert.Equal(t, ret1, ret2)
lc := storage.NewLocationsCache(pks[i])
ret1 := bfs.currentStat.TestLocationCache(lc)
assert.True(t, ret1, true)
}
}
@ -91,29 +89,14 @@ func TestHistoricalStat(t *testing.T) {
bfs.currentStat = nil
for i := 0; i < batchSize; i++ {
locations := storage.Locations(pks[i], bfs.GetHashFuncNum())
ret1 := bfs.TestLocations(pks[i], locations)
ret2 := bfs.MayPkExist(pks[i])
assert.Equal(t, ret1, ret2)
}
}
func TestHashFuncNum(t *testing.T) {
paramtable.Init()
batchSize := 100
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
for i := 0; i < batchSize; i++ {
// pass locations more then hash func num in bf
locations := storage.Locations(pks[i], bfs.GetHashFuncNum()+3)
ret1 := bfs.TestLocations(pks[i], locations)
assert.True(t, ret1)
lc := storage.NewLocationsCache(pks[i])
ret := bfs.MayPkExist(lc)
assert.True(t, ret)
}
lc := storage.NewBatchLocationsCache(pks)
ret := bfs.BatchPkExist(lc)
for i := range ret {
assert.True(t, ret[i])
}
}

View File

@ -26,9 +26,8 @@ import (
// Candidate is the interface for pk oracle candidate.
type Candidate interface {
// MayPkExist checks whether primary key could exists in this candidate.
MayPkExist(pk storage.PrimaryKey) bool
TestLocations(pk storage.PrimaryKey, locs []uint64) bool
GetHashFuncNum() uint
MayPkExist(lc *storage.LocationsCache) bool
BatchPkExist(lc *storage.BatchLocationsCache) []bool
ID() int64
Partition() int64

View File

@ -28,18 +28,17 @@ type candidateKey struct {
}
// MayPkExist checks whether primary key could exists in this candidate.
func (k candidateKey) MayPkExist(pk storage.PrimaryKey) bool {
func (k candidateKey) MayPkExist(pk *storage.LocationsCache) bool {
// always return true to prevent misuse
return true
}
func (k candidateKey) TestLocations(pk storage.PrimaryKey, locs []uint64) bool {
// always return true to prevent misuse
return true
}
func (k candidateKey) GetHashFuncNum() uint {
return 0
func (k candidateKey) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
ret := make([]bool, 0)
for i := 0; i < lc.Size(); i++ {
ret = append(ret, true)
}
return ret
}
// ID implements Candidate.

View File

@ -19,10 +19,8 @@ package pkoracle
import (
"fmt"
"sync"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -30,6 +28,7 @@ import (
type PkOracle interface {
// GetCandidates returns segment candidates of which pk might belongs to.
Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error)
BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64
// RegisterCandidate adds candidate into pkOracle.
Register(candidate Candidate, workerID int64) error
// RemoveCandidate removes candidate
@ -43,30 +42,12 @@ var _ PkOracle = (*pkOracle)(nil)
// pkOracle implementation.
type pkOracle struct {
candidates *typeutil.ConcurrentMap[string, candidateWithWorker]
hashFuncNumMutex sync.RWMutex
maxHashFuncNum uint
}
func (pko *pkOracle) GetMaxHashFuncNum() uint {
pko.hashFuncNumMutex.RLock()
defer pko.hashFuncNumMutex.RUnlock()
return pko.maxHashFuncNum
}
func (pko *pkOracle) TryUpdateHashFuncNum(newValue uint) {
pko.hashFuncNumMutex.Lock()
defer pko.hashFuncNumMutex.Unlock()
if newValue > pko.maxHashFuncNum {
pko.maxHashFuncNum = newValue
}
}
// Get implements PkOracle.
func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) {
var result []int64
var locations []uint64
lc := storage.NewLocationsCache(pk)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
@ -74,15 +55,7 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
}
}
if locations == nil {
locations = storage.Locations(pk, pko.GetMaxHashFuncNum())
if len(locations) == 0 {
log.Warn("pkOracle: no location found for pk")
return true
}
}
if candidate.TestLocations(pk, locations) {
if candidate.MayPkExist(lc) {
result = append(result, candidate.ID())
}
return true
@ -91,13 +64,35 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
return result, nil
}
func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 {
result := make([][]int64, len(pks))
lc := storage.NewBatchLocationsCache(pks)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
return true
}
}
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
result[i] = append(result[i], candidate.ID())
}
}
return true
})
return result
}
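
The delegator's ProcessDelete hunk above shows the intended consumption of BatchGet; here is a condensed sketch of that call pattern (recordDelete is a hypothetical sink, and pks/tss hold one batch of delete records):

```go
// result[i] lists the candidate segment IDs that may contain pks[i].
pk2SegmentIDs := pkOracle.BatchGet(pks, pkoracle.WithPartitionID(partitionID))
for i, segmentIDs := range pk2SegmentIDs {
	for _, segmentID := range segmentIDs {
		recordDelete(segmentID, pks[i], tss[i])
	}
}
```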
func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string {
return fmt.Sprintf("%s-%d-%d", candidate.Type().String(), workerID, candidate.ID())
}
// Register register candidate
func (pko *pkOracle) Register(candidate Candidate, workerID int64) error {
pko.TryUpdateHashFuncNum(candidate.GetHashFuncNum())
pko.candidates.Insert(pko.candidateKey(candidate, workerID), candidateWithWorker{
Candidate: candidate,
workerID: workerID,
@ -108,7 +103,6 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error {
// Remove removes candidate from pko.
func (pko *pkOracle) Remove(filters ...CandidateFilter) error {
max := uint(0)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
@ -116,14 +110,9 @@ func (pko *pkOracle) Remove(filters ...CandidateFilter) error {
}
}
pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID))
if candidate.GetHashFuncNum() > max {
max = candidate.GetHashFuncNum()
}
return true
})
pko.TryUpdateHashFuncNum(max)
return nil
}

View File

@ -35,6 +35,50 @@ func (_m *MockSegment) EXPECT() *MockSegment_Expecter {
return &MockSegment_Expecter{mock: &_m.Mock}
}
// BatchPkExist provides a mock function with given fields: lc
func (_m *MockSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
ret := _m.Called(lc)
var r0 []bool
if rf, ok := ret.Get(0).(func(*storage.BatchLocationsCache) []bool); ok {
r0 = rf(lc)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]bool)
}
}
return r0
}
// MockSegment_BatchPkExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist'
type MockSegment_BatchPkExist_Call struct {
*mock.Call
}
// BatchPkExist is a helper method to define mock.On call
// - lc *storage.BatchLocationsCache
func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchPkExist_Call {
return &MockSegment_BatchPkExist_Call{Call: _e.mock.On("BatchPkExist", lc)}
}
func (_c *MockSegment_BatchPkExist_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchPkExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*storage.BatchLocationsCache))
})
return _c
}
func (_c *MockSegment_BatchPkExist_Call) Return(_a0 []bool) *MockSegment_BatchPkExist_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_BatchPkExist_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchPkExist_Call {
_c.Call.Return(run)
return _c
}
// CASVersion provides a mock function with given fields: _a0, _a1
func (_m *MockSegment) CASVersion(_a0 int64, _a1 int64) bool {
ret := _m.Called(_a0, _a1)
@ -246,47 +290,6 @@ func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockS
return _c
}
// GetHashFuncNum provides a mock function with given fields:
func (_m *MockSegment) GetHashFuncNum() uint {
ret := _m.Called()
var r0 uint
if rf, ok := ret.Get(0).(func() uint); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint)
}
return r0
}
// MockSegment_GetHashFuncNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHashFuncNum'
type MockSegment_GetHashFuncNum_Call struct {
*mock.Call
}
// GetHashFuncNum is a helper method to define mock.On call
func (_e *MockSegment_Expecter) GetHashFuncNum() *MockSegment_GetHashFuncNum_Call {
return &MockSegment_GetHashFuncNum_Call{Call: _e.mock.On("GetHashFuncNum")}
}
func (_c *MockSegment_GetHashFuncNum_Call) Run(run func()) *MockSegment_GetHashFuncNum_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSegment_GetHashFuncNum_Call) Return(_a0 uint) *MockSegment_GetHashFuncNum_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_GetHashFuncNum_Call) RunAndReturn(run func() uint) *MockSegment_GetHashFuncNum_Call {
_c.Call.Return(run)
return _c
}
// GetIndex provides a mock function with given fields: fieldID
func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo {
ret := _m.Called(fieldID)
@ -752,13 +755,13 @@ func (_c *MockSegment_LoadInfo_Call) RunAndReturn(run func() *querypb.SegmentLoa
return _c
}
// MayPkExist provides a mock function with given fields: pk
func (_m *MockSegment) MayPkExist(pk storage.PrimaryKey) bool {
ret := _m.Called(pk)
// MayPkExist provides a mock function with given fields: lc
func (_m *MockSegment) MayPkExist(lc *storage.LocationsCache) bool {
ret := _m.Called(lc)
var r0 bool
if rf, ok := ret.Get(0).(func(storage.PrimaryKey) bool); ok {
r0 = rf(pk)
if rf, ok := ret.Get(0).(func(*storage.LocationsCache) bool); ok {
r0 = rf(lc)
} else {
r0 = ret.Get(0).(bool)
}
@ -772,14 +775,14 @@ type MockSegment_MayPkExist_Call struct {
}
// MayPkExist is a helper method to define mock.On call
// - pk storage.PrimaryKey
func (_e *MockSegment_Expecter) MayPkExist(pk interface{}) *MockSegment_MayPkExist_Call {
return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", pk)}
// - lc *storage.LocationsCache
func (_e *MockSegment_Expecter) MayPkExist(lc interface{}) *MockSegment_MayPkExist_Call {
return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", lc)}
}
func (_c *MockSegment_MayPkExist_Call) Run(run func(pk storage.PrimaryKey)) *MockSegment_MayPkExist_Call {
func (_c *MockSegment_MayPkExist_Call) Run(run func(lc *storage.LocationsCache)) *MockSegment_MayPkExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(storage.PrimaryKey))
run(args[0].(*storage.LocationsCache))
})
return _c
}
@ -789,7 +792,7 @@ func (_c *MockSegment_MayPkExist_Call) Return(_a0 bool) *MockSegment_MayPkExist_
return _c
}
func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(storage.PrimaryKey) bool) *MockSegment_MayPkExist_Call {
func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(*storage.LocationsCache) bool) *MockSegment_MayPkExist_Call {
_c.Call.Return(run)
return _c
}
@ -1453,49 +1456,6 @@ func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosi
return _c
}
// TestLocations provides a mock function with given fields: pk, loc
func (_m *MockSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool {
ret := _m.Called(pk, loc)
var r0 bool
if rf, ok := ret.Get(0).(func(storage.PrimaryKey, []uint64) bool); ok {
r0 = rf(pk, loc)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockSegment_TestLocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TestLocations'
type MockSegment_TestLocations_Call struct {
*mock.Call
}
// TestLocations is a helper method to define mock.On call
// - pk storage.PrimaryKey
// - loc []uint64
func (_e *MockSegment_Expecter) TestLocations(pk interface{}, loc interface{}) *MockSegment_TestLocations_Call {
return &MockSegment_TestLocations_Call{Call: _e.mock.On("TestLocations", pk, loc)}
}
func (_c *MockSegment_TestLocations_Call) Run(run func(pk storage.PrimaryKey, loc []uint64)) *MockSegment_TestLocations_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(storage.PrimaryKey), args[1].([]uint64))
})
return _c
}
func (_c *MockSegment_TestLocations_Call) Return(_a0 bool) *MockSegment_TestLocations_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_TestLocations_Call) RunAndReturn(run func(storage.PrimaryKey, []uint64) bool) *MockSegment_TestLocations_Call {
_c.Call.Return(run)
return _c
}
// Type provides a mock function with given fields:
func (_m *MockSegment) Type() commonpb.SegmentState {
ret := _m.Called()

View File

@ -182,16 +182,12 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
// MayPkExist returns true if the given PK falls within the PK range and tests positive against the bloom filter,
// false otherwise;
// it may return true even though the PK does not actually exist
func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool {
func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
return s.bloomFilterSet.MayPkExist(pk)
}
func (s *baseSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool {
return s.bloomFilterSet.TestLocations(pk, loc)
}
func (s *baseSegment) GetHashFuncNum() uint {
return s.bloomFilterSet.GetHashFuncNum()
func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
return s.bloomFilterSet.BatchPkExist(lc)
}
// ResourceUsageEstimate returns the estimated resource usage of the segment.

View File

@ -83,9 +83,8 @@ type Segment interface {
// Bloom filter related
UpdateBloomFilter(pks []storage.PrimaryKey)
MayPkExist(pk storage.PrimaryKey) bool
TestLocations(pk storage.PrimaryKey, loc []uint64) bool
GetHashFuncNum() uint
MayPkExist(lc *storage.LocationsCache) bool
BatchPkExist(lc *storage.BatchLocationsCache) []bool
// Read operations
Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error)

View File

@ -226,7 +226,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
// Won't load bloom filter with sealed segments
for _, segment := range segments {
for pk := 0; pk < 100; pk++ {
exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
exist := segment.MayPkExist(lc)
suite.Require().False(exist)
}
}
@ -260,7 +261,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
// Should load bloom filter with growing segments
for _, segment := range segments {
for pk := 0; pk < 100; pk++ {
exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
exist := segment.MayPkExist(lc)
suite.True(exist)
}
}
@ -351,7 +353,8 @@ func (suite *SegmentLoaderSuite) TestLoadBloomFilter() {
for _, bf := range bfs {
for pk := 0; pk < 100; pk++ {
exist := bf.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
exist := bf.MayPkExist(lc)
suite.Require().True(exist)
}
}
@ -404,7 +407,8 @@ func (suite *SegmentLoaderSuite) TestLoadDeltaLogs() {
if pk == 1 || pk == 2 {
continue
}
exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
exist := segment.MayPkExist(lc)
suite.Require().True(exist)
}
}
@ -457,7 +461,8 @@ func (suite *SegmentLoaderSuite) TestLoadDupDeltaLogs() {
if pk == 1 || pk == 2 {
continue
}
exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk)))
exist := segment.MayPkExist(lc)
suite.Require().True(exist)
}

View File

@ -188,14 +188,6 @@ func (suite *SegmentSuite) TestHasRawData() {
suite.True(has)
}
func (suite *SegmentSuite) TestLocation() {
pk := storage.NewInt64PrimaryKey(100)
locations := storage.Locations(pk, suite.sealed.GetHashFuncNum())
ret1 := suite.sealed.TestLocations(pk, locations)
ret2 := suite.sealed.MayPkExist(pk)
suite.Equal(ret1, ret2)
}
func (suite *SegmentSuite) TestCASVersion() {
segment := suite.sealed

View File

@ -21,6 +21,7 @@ import (
"github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
@ -125,22 +126,7 @@ func Locations(pk PrimaryKey, k uint) []uint64 {
return nil
}
func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return false
}
// check bf first, TestLocation just do some bitset compute, cost is cheaper
if !st.PkFilter.TestLocations(locs) {
return false
}
// check pk range first, ugly but key it for now
return st.MinPK.LE(pk) && st.MaxPK.GE(pk)
}
func (st *PkStatistics) TestLocationCache(lc LocationsCache) bool {
func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return false
@ -155,26 +141,78 @@ func (st *PkStatistics) TestLocationCache(lc LocationsCache) bool {
return st.MinPK.LE(lc.pk) && st.MaxPK.GE(lc.pk)
}
func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return hits
}
// check bf first; TestLocations only does some cheap bitset lookups
locations := lc.Locations(st.PkFilter.K())
pks := lc.PKs()
for i := range pks {
// todo: a bit ugly, hits[i]'s value depends on multiple bloom filters within a single segment,
// the hits array will be removed after we merge the bloom filters in a segment
if !hits[i] {
hits[i] = st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
}
}
return hits
}
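
Note the contract of the hits argument: BloomFilterSet (both the datanode and pkoracle variants above) threads one hits slice through every PkStatistics it owns, and a slot is only probed while it is still false, which is how the per-filter results are OR-ed together. A minimal sketch, assuming current and history are a segment's *PkStatistics values:

```go
// One shared hits slice accumulates positives across all filters of a segment.
hits := make([]bool, lc.Size())
if current != nil {
	current.BatchPkExist(lc, hits) // may flip some slots to true
}
for _, st := range history {
	st.BatchPkExist(lc, hits) // only probes slots that are still false
}
```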
// LocationsCache is a helper struct caching pk bloom filter locations.
// Note that this helper is not concurrent safe and shall be used in same goroutine.
type LocationsCache struct {
pk PrimaryKey
locations map[uint][]uint64
locations []uint64
}
func (lc LocationsCache) Locations(k uint) []uint64 {
locs, ok := lc.locations[k]
if ok {
return locs
}
locs = Locations(lc.pk, k)
lc.locations[k] = locs
return locs
func (lc *LocationsCache) GetPk() PrimaryKey {
return lc.pk
}
func NewLocationsCache(pk PrimaryKey) LocationsCache {
return LocationsCache{
pk: pk,
locations: make(map[uint][]uint64),
func (lc *LocationsCache) Locations(k uint) []uint64 {
if int(k) > len(lc.locations) {
lc.locations = Locations(lc.pk, k)
}
return lc.locations[:k]
}
func NewLocationsCache(pk PrimaryKey) *LocationsCache {
return &LocationsCache{
pk: pk,
}
}
type BatchLocationsCache struct {
pks []PrimaryKey
k uint
locations [][]uint64
}
func (lc *BatchLocationsCache) PKs() []PrimaryKey {
return lc.pks
}
func (lc *BatchLocationsCache) Size() int {
return len(lc.pks)
}
func (lc *BatchLocationsCache) Locations(k uint) [][]uint64 {
if k > lc.k {
lc.k = k
lc.locations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
return Locations(pk, lc.k)
})
}
return lc.locations
}
func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache {
return &BatchLocationsCache{
pks: pks,
}
}
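
The cache is lazy from the caller's point of view: the pks are only hashed on the first Locations call, and they are re-hashed only when a later caller asks for a larger k, so one batch is hashed at most a handful of times no matter how many bloom filters it is probed against. A small usage sketch from outside the storage package (the k values are arbitrary):

```go
// NewInt64PrimaryKey values stand in for real delete pks.
pks := []storage.PrimaryKey{
	storage.NewInt64PrimaryKey(1),
	storage.NewInt64PrimaryKey(2),
}
blc := storage.NewBatchLocationsCache(pks)
_ = blc.Locations(7) // hashes both pks with k = 7
_ = blc.Locations(5) // k did not grow, served from the cache
_ = blc.Locations(9) // k grew, so both pks are re-hashed with k = 9
```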

View File

@ -237,14 +237,15 @@ type commonConfig struct {
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
StoragePathPrefix ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
PanicWhenPluginFail ParamItem `refreshable:"false"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
StoragePathPrefix ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
PanicWhenPluginFail ParamItem `refreshable:"false"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -734,6 +735,15 @@ like the old password verification when updating the credential`,
}
p.MaxBloomFalsePositive.Init(base.mgr)
p.BloomFilterApplyBatchSize = ParamItem{
Key: "common.bloomFilterApplyBatchSize",
Version: "2.4.4",
DefaultValue: "1000",
Doc: "batch size when applying pks to the bloom filter",
Export: true,
}
p.BloomFilterApplyBatchSize.Init(base.mgr)
p.PanicWhenPluginFail = ParamItem{
Key: "common.panicWhenPluginFail",
Version: "2.4.2",
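
Back to the new BloomFilterApplyBatchSize item above: it defaults to 1000 and is declared refreshable, so the call sites re-read it on every dispatch rather than caching it at startup. A small sketch of reading and overriding it, following the Save/GetAsUint pattern already used in the component_param test below (assuming paramtable has been initialized):

```go
// The item is refreshable, so a runtime override takes effect on the next
// batch dispatch without restarting the node.
params := paramtable.Get()
_ = params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt() // 1000 by default
params.Save(params.CommonCfg.BloomFilterApplyBatchSize.Key, "4096")
_ = params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt() // now 4096
```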

View File

@ -108,6 +108,8 @@ func TestComponentParam(t *testing.T) {
params.Save("common.preCreatedTopic.timeticker", "timeticker")
assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
assert.Equal(t, uint(1000), params.CommonCfg.BloomFilterApplyBatchSize.GetAsUint())
})
t.Run("test rootCoordConfig", func(t *testing.T) {