enhance: [10kcp] Reduce memory usage of BF in DataNode and QueryNode (#38133)
1. DataNode: Skip generating BF during the insert phase (BF will be regenerated during the sync phase).
2. QueryNode: Skip generating or maintaining BF for growing segments; deletion checks will be handled in the segcore.

issue: https://github.com/milvus-io/milvus/issues/37630
pr: https://github.com/milvus-io/milvus/pull/38129

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
commit 338ccc9ff9 (parent 0930430a68)
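The DataNode side of this change is gated by the existing DataNodeCfg.SkipBFStatsLoad flag, which is the flag checked in the l0WriteBuffer hunk below. As a minimal sketch of how one might flip it in a test, assuming the usual paramtable Save-based override idiom and import path (neither is part of this patch), and referencing the flag through its ParamItem because its key string is not shown in this diff:

package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// enableSkipBFOnDataNode turns on skip-BF mode for the DataNode write buffer.
// Sketch only: the import path and the Save-based override are assumptions;
// the flag itself (DataNodeCfg.SkipBFStatsLoad) is the one used in this patch.
func enableSkipBFOnDataNode() {
	params := paramtable.Get()
	params.Save(params.DataNodeCfg.SkipBFStatsLoad.Key, "true")
}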
@@ -420,7 +420,7 @@ queryNode:
     # Enable memory mapping (mmap) to optimize the handling of growing raw data.
     # By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
     # However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments.
-    growingMmapEnabled: false
+    growingMmapEnabled: true
     fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
     maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
   lazyload:
@@ -165,23 +165,24 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
 		// In Skip BF mode, datanode no longer maintains bloom filters.
-		// So, here we skip filtering delete entries.
+		// So, here we skip generating BF (growing segment's BF will be regenerated during the sync phase)
+		// and also skip filtering delete entries by bf.
 		wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
 	} else {
 		// distribute delete msg
 		// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
 		wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
-	}

-	// update pk oracle
-	for _, inData := range groups {
-		// segment shall always exists after buffer insert
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
-		for _, segment := range segments {
-			for _, fieldData := range inData.pkField {
-				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
-				if err != nil {
-					return err
+		// update pk oracle
+		for _, inData := range groups {
+			// segment shall always exists after buffer insert
+			segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
+			for _, segment := range segments {
+				for _, fieldData := range inData.pkField {
+					err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
+					if err != nil {
+						return err
+					}
 				}
 			}
 		}
 	}
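Dropping the filter here is safe because the Bloom filter only narrows which segments a delete has to touch; without it, deletes are dispatched unfiltered and the exact primary-key check happens later, in segcore on the query side. A toy sketch, with hypothetical types not taken from the Milvus codebase, of the two dispatch strategies:

package example

// Toy stand-ins, not Milvus types: a "filter" that may report false positives
// but never false negatives, approximated here with a plain map.
type segment struct {
	id int64
	bf map[int64]bool
}

// mayContain mimics a Bloom-filter probe: when no filter is kept (skip-BF mode),
// the only safe answer is "maybe", so the exact check is deferred to a later stage.
func (s *segment) mayContain(pk int64) bool {
	if s.bf == nil {
		return true
	}
	return s.bf[pk]
}

// dispatchDelete returns the segments a delete for pk must be written to.
// With per-segment filters the candidate set shrinks; without them every
// segment is a candidate.
func dispatchDelete(segments []*segment, pk int64) []*segment {
	var targets []*segment
	for _, s := range segments {
		if s.mayContain(pk) {
			targets = append(targets, s)
		}
	}
	return targets
}

The trade is some extra delete traffic to segments that never held the key, in exchange for not building or holding any per-segment filter memory during insert.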
@@ -181,8 +181,6 @@ func (s *L0WriteBufferSuite) TestBufferData() {
 		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
 		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

 		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
 		s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
 		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
 		s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
@@ -94,6 +94,7 @@ type baseSegment struct {
 	bloomFilterSet *pkoracle.BloomFilterSet
 	loadInfo       *atomic.Pointer[querypb.SegmentLoadInfo]
 	isLazyLoad     bool
+	skipGrowingBF  bool // Skip generating or maintaining BF for growing segments; deletion checks will be handled in segcore.
 	channel        metautil.Channel

 	resourceUsageCache *atomic.Pointer[ResourceUsage]
@@ -114,6 +115,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int
 		bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType),
 		channel:        channel,
 		isLazyLoad:     isLazyLoad(collection, segmentType),
+		skipGrowingBF:  segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(),

 		resourceUsageCache: atomic.NewPointer[ResourceUsage](nil),
 		needUpdatedVersion: atomic.NewInt64(0),
@@ -183,6 +185,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo {
 }

 func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
+	if s.skipGrowingBF {
+		return
+	}
 	s.bloomFilterSet.UpdateBloomFilter(pks)
 }
@@ -190,10 +195,20 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
 // false otherwise,
 // may returns true even the PK doesn't exist actually
 func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
+	if s.skipGrowingBF {
+		return true
+	}
 	return s.bloomFilterSet.MayPkExist(pk)
 }

 func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
+	if s.skipGrowingBF {
+		allPositive := make([]bool, lc.Size())
+		for i := 0; i < lc.Size(); i++ {
+			allPositive[i] = true
+		}
+		return allPositive
+	}
 	return s.bloomFilterSet.BatchPkExist(lc)
 }
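How much memory this avoids depends on per-segment capacity and the target false-positive rate: a Bloom filter needs roughly n·ln(1/p)/ln²2 bits for n keys at false-positive rate p, so per-segment filters add up on a node hosting many growing segments. A back-of-the-envelope estimate with illustrative numbers (not Milvus defaults), using the bits-and-blooms Go library purely for the arithmetic:

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// Illustrative sizing only; the real per-segment capacity and
	// false-positive rate are configuration-dependent and not taken from this patch.
	const entriesPerSegment = 1_000_000
	const falsePositiveRate = 0.001

	m, k := bloom.EstimateParameters(entriesPerSegment, falsePositiveRate)
	fmt.Printf("bits m = %d, hash functions k = %d\n", m, k)
	fmt.Printf("≈ %.1f MiB of filter memory per growing segment\n",
		float64(m)/8/(1<<20))
}

At one million keys and p = 0.001 this works out to roughly 1.7 MiB per growing segment, which is the per-segment saving bought by skipGrowingBF at the price of conservative (always "maybe") membership answers.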
@@ -2427,6 +2427,8 @@ type queryNodeConfig struct {
 	DefaultSegmentFilterRatio ParamItem `refreshable:"false"`
 	UseStreamComputing        ParamItem `refreshable:"false"`

+	// BF
+	SkipGrowingSegmentBF           ParamItem `refreshable:"true"`
 	BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`

 	QueryStreamBatchSize ParamItem `refreshable:"false"`
@@ -2698,7 +2700,7 @@ This defaults to true, indicating that Milvus creates temporary index for growing
 	p.GrowingMmapEnabled = ParamItem{
 		Key:          "queryNode.mmap.growingMmapEnabled",
 		Version:      "2.4.6",
-		DefaultValue: "false",
+		DefaultValue: "true",
 		FallbackKeys: []string{"queryNode.growingMmapEnabled"},
 		Doc: `Enable memory mapping (mmap) to optimize the handling of growing raw data.
 By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
@@ -3114,6 +3116,14 @@ user-task-polling:
 	}
 	p.BloomFilterApplyParallelFactor.Init(base.mgr)

+	p.SkipGrowingSegmentBF = ParamItem{
+		Key:          "queryNode.skipGrowingSegmentBF",
+		Version:      "2.5",
+		DefaultValue: "true",
+		Doc:          "indicates whether skipping the creation, maintenance, or checking of Bloom Filters for growing segments",
+	}
+	p.SkipGrowingSegmentBF.Init(base.mgr)
+
 	p.QueryStreamBatchSize = ParamItem{
 		Key:     "queryNode.queryStreamBatchSize",
 		Version: "2.4.1",
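Because the new item is tagged refreshable:"true", its value is re-read on access rather than fixed at startup, so it can be changed on a running QueryNode (newly created growing segments pick up the current value in newBaseSegment). A minimal sketch, assuming the Save-based override idiom and import path; the key string and the GetAsBool accessor are the ones introduced in this patch:

package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// disableSkipGrowingBF turns the behavior off at runtime and reads the value back.
// Sketch only: Save-based overrides are an assumption; in a deployment the same
// key would be set under queryNode.skipGrowingSegmentBF in the config source.
func disableSkipGrowingBF() bool {
	params := paramtable.Get()
	params.Save("queryNode.skipGrowingSegmentBF", "false")
	return params.QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool() // expected: false
}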
@@ -451,6 +451,8 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond))

 		assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
+		assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool())

 		assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())

 		assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())