aoiasd 88e761adc9
enhance: save BM25 stats to the disk while loading the segment (#46468)
Add a thread pool that loads BM25 stats and saves them to local disk during
segment loading, reducing peak memory usage.
relate: https://github.com/milvus-io/milvus/issues/41424

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: sealed-segment BM25 statistics are immutable and may
be persisted to disk independently of growing-segment stats; IDF state
is reconstructed at runtime by combining persisted sealed stats
(preloaded from disk) with in-memory growing stats (preloadSealed +
RegisterGrowing).
- Capability added: asynchronous BM25 loading via a dedicated
BM25LoadPool (config key common.threadCoreCoefficient.bm25Load and param
BM25LoadThreadCoreCoefficient). delegator.loadBM25Stats is executed on
the pool to load sealed BM25 stats and call idfOracle.RegisterSealed,
reducing peak memory during segment load.
- Logic removed/simplified and why: the previous single Register(segID,
stats, state) plus the background local-writer/spool loop was split into
RegisterGrowing (in-memory) and RegisterSealed (sealed + on-disk
persistence), and the localloop was removed; RegisterSealed writes sealed
stats directly (ToLocal) and uses singleflight to deduplicate,
eliminating redundant spooling and lifecycle complexity while clarifying
the sealed vs. growing flows (see the sketch below).
- Why this does NOT introduce data loss or behavior regression: sealed
stats are still written and reloaded (RegisterSealed persists to
dirPath; preloadSealed merges disk and memory on first load); growing
segments continue to register in memory via RegisterGrowing;
loadSegments now defers sealed BM25 handling to loadBM25Stats but still
registers sealed candidates after load; and tests were updated to
reflect RegisterSealed/RegisterGrowing usage. Serialization and
deserialization, preload semantics, and test coverage therefore
preserve existing persisted state and runtime behavior.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
2026-01-09 19:29:26 +08:00
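
For context, here is a minimal sketch (not code from this PR) of how a caller inside the delegator package could drive the split registration API defined in the file below. The segment IDs, field ID, channel name, and helper name are placeholders, and the real change submits the sealed path to the dedicated BM25LoadPool rather than a plain goroutine:

```go
package delegator

import (
	"sync"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

// exampleRegisterStats is a hypothetical caller showing the sealed vs. growing flows.
func exampleRegisterStats(functions []*schemapb.FunctionSchema) error {
	oracle := NewIDFOracle("by-dev-rootcoord-dml_0", functions)
	oracle.Start()
	defer oracle.Close()

	// Sealed stats: RegisterSealed may spill the per-segment stats to local disk
	// (ToLocal) when disk offload is enabled, so running it off the caller
	// goroutine keeps segment loading responsive; the PR uses BM25LoadPool here.
	sealedStats := bm25Stats{101: storage.NewBM25Stats()}
	var wg sync.WaitGroup
	var sealedErr error
	wg.Add(1)
	go func() {
		defer wg.Done()
		sealedErr = oracle.RegisterSealed(1001, sealedStats)
	}()

	// Growing stats stay in memory and are merged into the current IDF view.
	growingStats := bm25Stats{101: storage.NewBM25Stats()}
	oracle.RegisterGrowing(2001, growingStats)

	wg.Wait()
	return sealedErr
}
```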


// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"bufio"
"context"
"fmt"
"io/fs"
"os"
"path"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/pathutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
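// IDFOracle maintains the BM25 statistics of a delegator channel and builds
// IDF vectors against the currently synced distribution.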
type IDFOracle interface {
SetNext(snapshot *snapshot)
TargetVersion() int64
UpdateGrowing(segmentID int64, stats bm25Stats)
// mark the target version at which growing segments should be removed
LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64)
RegisterGrowing(segmentID int64, stats bm25Stats)
RegisterSealed(segmentID int64, stats bm25Stats) error
BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error)
Start()
Close()
}
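// bm25Stats maps BM25 output field IDs to their accumulated statistics.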
type bm25Stats map[int64]*storage.BM25Stats
func (s bm25Stats) Merge(stats bm25Stats) {
for fieldID, newstats := range stats {
if stats, ok := s[fieldID]; ok {
stats.Merge(newstats)
} else {
s[fieldID] = storage.NewBM25Stats()
s[fieldID].Merge(newstats)
}
}
}
func (s bm25Stats) Minus(stats bm25Stats) {
for fieldID, newstats := range stats {
if stats, ok := s[fieldID]; ok {
stats.Minus(newstats)
} else {
s[fieldID] = storage.NewBM25Stats()
s[fieldID].Minus(newstats)
}
}
}
func (s bm25Stats) GetStats(fieldID int64) (*storage.BM25Stats, error) {
stats, ok := s[fieldID]
if !ok {
return nil, errors.New("field not found in idf oracle BM25 stats")
}
return stats, nil
}
func (s bm25Stats) NumRow() int64 {
for _, stats := range s {
return stats.NumRow()
}
return 0
}
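// sealedBm25Stats holds the BM25 stats of one sealed segment, either in memory
// or spilled to a per-segment directory on local disk.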
type sealedBm25Stats struct {
sync.RWMutex // Protect all data in struct except activate
bm25Stats
activate *atomic.Bool
inmemory bool
removed bool
segmentID int64
ts time.Time // time of segment registration; segments registered after the target was generated will not be removed
localDir string
fieldList []int64 // bm25 field list
}
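// writeFile serializes the in-memory stats into one <fieldID>.data file per BM25
// field under localDir; the bool result reports whether writing was skipped
// because the stats were already removed or are no longer in memory.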
func (s *sealedBm25Stats) writeFile(localDir string) (error, bool) {
s.RLock()
if s.removed || !s.inmemory {
s.RUnlock()
return nil, true
}
stats := s.bm25Stats
s.RUnlock()
err := os.MkdirAll(localDir, fs.ModePerm)
if err != nil {
return err, false
}
// RUnlock before serializing the stats and writing them to file,
// to avoid blocking stats removal for too long during sync distribution
for fieldID, stats := range stats {
file, err := os.Create(path.Join(localDir, fmt.Sprintf("%d.data", fieldID)))
if err != nil {
return err, false
}
defer file.Close()
writer := bufio.NewWriter(file)
err = stats.SerializeToWriter(writer)
if err != nil {
return err, false
}
err = writer.Flush()
if err != nil {
return err, false
}
}
return nil, false
}
// After merging the stats of a segment into the overall stats, the delegator still needs to keep the per-segment stats,
// so that when the segment is later removed from the target we can Minus its stats. To reduce memory usage,
// idfOracle stores such per-segment stats on disk and loads them back when removing the segment.
func (s *sealedBm25Stats) ToLocal(dirPath string) error {
dir := path.Join(dirPath, fmt.Sprint(s.segmentID))
if err, skip := s.writeFile(dir); err != nil {
os.RemoveAll(dir)
return err
} else if skip {
return nil
}
s.Lock()
defer s.Unlock()
s.fieldList = lo.Keys(s.bm25Stats)
s.inmemory = false
s.bm25Stats = nil
s.localDir = dir
if s.removed {
err := os.RemoveAll(s.localDir)
if err != nil {
log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localDir))
}
}
return nil
}
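// Remove marks the stats as removed and deletes the local directory if the
// stats have already been spilled to disk.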
func (s *sealedBm25Stats) Remove() {
s.Lock()
defer s.Unlock()
s.removed = true
if !s.inmemory {
err := os.RemoveAll(s.localDir)
if err != nil {
log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localDir))
}
}
}
// FetchStats returns the sealed bm25 stats,
// loading them from the local files when the stats are not in memory.
func (s *sealedBm25Stats) FetchStats() (map[int64]*storage.BM25Stats, error) {
s.RLock()
defer s.RUnlock()
if s.inmemory {
return s.bm25Stats, nil
}
stats := make(map[int64]*storage.BM25Stats)
for _, fieldID := range s.fieldList {
path := path.Join(s.localDir, fmt.Sprintf("%d.data", fieldID))
b, err := os.ReadFile(path)
if err != nil {
return nil, errors.Newf("read local file %s: failed: %v", path, err)
}
stats[fieldID] = storage.NewBM25Stats()
err = stats[fieldID].Deserialize(b)
if err != nil {
return nil, errors.Newf("deserialize local file : %s failed: %v", path, err)
}
}
return stats, nil
}
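// growingBm25Stats holds the in-memory BM25 stats of one growing segment together
// with its activation state and the target version at which it was dropped.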
type growingBm25Stats struct {
bm25Stats
activate bool
droppedVersion int64
}
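// newBm25Stats creates empty stats for every BM25 output field declared in the schema functions.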
func newBm25Stats(functions []*schemapb.FunctionSchema) bm25Stats {
stats := make(map[int64]*storage.BM25Stats)
for _, function := range functions {
if function.GetType() == schemapb.FunctionType_BM25 {
stats[function.GetOutputFieldIds()[0]] = storage.NewBM25Stats()
}
}
return stats
}
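// idfTarget stores the next distribution snapshot to be synced and the time it was set.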
type idfTarget struct {
sync.RWMutex
snapshot *snapshot
ts time.Time // time of target generate
}
func (t *idfTarget) SetSnapshot(snapshot *snapshot) {
t.Lock()
defer t.Unlock()
t.snapshot = snapshot
t.ts = time.Now()
}
func (t *idfTarget) GetSnapshot() (*snapshot, time.Time) {
t.RLock()
defer t.RUnlock()
return t.snapshot, t.ts
}
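// idfOracle is the default IDFOracle implementation; it aggregates growing and
// sealed BM25 stats into a single current view and keeps that view in step with
// the distribution snapshots pushed via SetNext.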
type idfOracle struct {
sync.RWMutex // protect current and growing segment stats
current bm25Stats
growing map[int64]*growingBm25Stats
sealed typeutil.ConcurrentMap[int64, *sealedBm25Stats]
channel string
// for sync distribution
next idfTarget
targetVersion *atomic.Int64
syncNotify chan struct{}
// for disk cache
localNotify chan struct{}
dirPath string
closeCh chan struct{}
sf conc.Singleflight[any]
wg sync.WaitGroup
toDisk bool
}
// TargetVersion is now only used in tests.
func (o *idfOracle) TargetVersion() int64 {
return o.targetVersion.Load()
}
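// preloadSealed merges a sealed segment's stats into the current view before the
// first target has been loaded, so BM25 search works during initial loading.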
func (o *idfOracle) preloadSealed(segmentID int64, stats *sealedBm25Stats, memoryStats bm25Stats) {
o.Lock()
defer o.Unlock()
// skip preload if first target was loaded.
if o.targetVersion.Load() != 0 {
return
}
o.sealed.Insert(segmentID, stats)
o.current.Merge(memoryStats)
stats.activate.Store(true)
}
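// RegisterGrowing registers the in-memory stats of a growing segment and merges
// them into the current view; duplicate registrations are ignored.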
func (o *idfOracle) RegisterGrowing(segmentID int64, stats bm25Stats) {
o.Lock()
defer o.Unlock()
if _, ok := o.growing[segmentID]; ok {
return
}
o.growing[segmentID] = &growingBm25Stats{
bm25Stats: stats,
activate: true,
}
o.current.Merge(stats)
}
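// RegisterSealed registers a sealed segment's stats, persisting them to local disk
// when disk offload is enabled, and preloads them into the current view if the
// first target has not been loaded yet; singleflight deduplicates concurrent calls.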
func (o *idfOracle) RegisterSealed(segmentID int64, stats bm25Stats) error {
// use singleflight to avoid registering the same sealed segment more than once
_, err, _ := o.sf.Do(fmt.Sprintf("register_sealed_%d", segmentID), func() (any, error) {
if ok := o.sealed.Contain(segmentID); ok {
return nil, nil
}
segStats := &sealedBm25Stats{
bm25Stats: stats,
ts: time.Now(),
activate: atomic.NewBool(false),
inmemory: true,
segmentID: segmentID,
}
// make sure the sealed segment stats are on disk after registration
if o.toDisk {
err := segStats.ToLocal(o.dirPath)
if err != nil {
log.Warn("idf oracle to local failed, remain in memory", zap.Error(err))
return nil, err
}
}
// preload sealed segment to channel before first target
if o.targetVersion.Load() == 0 {
// segStats.ToLocal has finished, but the stats passed into this function are still in memory,
// so we can preload using the in-memory stats
o.preloadSealed(segmentID, segStats, stats)
} else {
o.sealed.Insert(segmentID, segStats)
}
return nil, nil
})
if err != nil {
return err
}
return nil
}
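// UpdateGrowing merges incremental stats into a registered growing segment and,
// if that segment is activated, into the current view.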
func (o *idfOracle) UpdateGrowing(segmentID int64, stats bm25Stats) {
if len(stats) == 0 {
return
}
o.Lock()
defer o.Unlock()
old, ok := o.growing[segmentID]
if !ok {
return
}
old.Merge(stats)
if old.activate {
o.current.Merge(stats)
}
}
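// LazyRemoveGrowings marks growing segments so their stats are subtracted and
// dropped once the given target version has been synced.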
func (o *idfOracle) LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64) {
o.Lock()
defer o.Unlock()
for _, segmentID := range segmentIDs {
if stats, ok := o.growing[segmentID]; ok && stats.droppedVersion == 0 {
stats.droppedVersion = targetVersion
}
}
}
func (o *idfOracle) Start() {
o.wg.Add(1)
go o.syncloop()
}
func (o *idfOracle) Close() {
close(o.closeCh)
o.wg.Wait()
os.RemoveAll(o.dirPath)
}
func (o *idfOracle) SetNext(snapshot *snapshot) {
o.next.SetSnapshot(snapshot)
// run SyncDistribution synchronously when the first target is loaded
if o.targetVersion.Load() == 0 {
o.SyncDistribution()
} else {
o.NotifySync()
}
}
func (o *idfOracle) NotifySync() {
select {
case o.syncNotify <- struct{}{}:
default:
}
}
func (o *idfOracle) NotifyLocal() {
select {
case o.localNotify <- struct{}{}:
default:
}
}
func (o *idfOracle) syncloop() {
defer o.wg.Done()
for {
select {
case <-o.syncNotify:
err := o.SyncDistribution()
if err != nil {
log.Warn("idf oracle sync distribution failed", zap.Error(err))
time.Sleep(time.Second * 10)
o.NotifySync()
}
case <-o.closeCh:
return
}
}
}
// WARN: SyncDistribution is not concurrency safe.
// SyncDistribution syncs the current target to the idf oracle.
func (o *idfOracle) SyncDistribution() error {
snapshot, snapshotTs := o.next.GetSnapshot()
if snapshot.targetVersion <= o.targetVersion.Load() {
return nil
}
sealed, _ := snapshot.Peek()
// segments in the current target
targetMap := typeutil.NewSet[UniqueID]()
// segments with an unreadable target version have not been used yet;
// do not remove them until their version is updated or they are removed from the snapshot (released)
reserveMap := typeutil.NewSet[UniqueID]()
for _, item := range sealed {
for _, segment := range item.Segments {
if segment.Level == datapb.SegmentLevel_L0 {
continue
}
switch segment.TargetVersion {
case snapshot.targetVersion:
targetMap.Insert(segment.SegmentID)
if !o.sealed.Contain(segment.SegmentID) {
log.Warn("idf oracle lack some sealed segment", zap.Int64("segment", segment.SegmentID))
}
case unreadableTargetVersion:
reserveMap.Insert(segment.SegmentID)
}
}
}
diff := bm25Stats{}
var rangeErr error
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
intarget := targetMap.Contain(segmentID)
activate := stats.activate.Load()
// activate segment if segment in target
if intarget && !activate {
stats, err := stats.FetchStats()
if err != nil {
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
return false
}
diff.Merge(stats)
} else
// deactivate segment if segment not in target.
if !intarget && activate {
stats, err := stats.FetchStats()
if err != nil {
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
return false
}
diff.Minus(stats)
}
return true
})
if rangeErr != nil {
return rangeErr
}
o.Lock()
defer o.Unlock()
for segmentID, stats := range o.growing {
// drop growing segment bm25 stats
if stats.droppedVersion != 0 && stats.droppedVersion <= snapshot.targetVersion {
if stats.activate {
o.current.Minus(stats.bm25Stats)
}
delete(o.growing, segmentID)
}
}
o.current.Merge(diff)
// remove sealed segment not in target
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
reserve := reserveMap.Contain(segmentID)
intarget := targetMap.Contain(segmentID)
activate := stats.activate.Load()
// set activate if the segment is in the target.
if intarget && !activate {
stats.activate.Store(true)
}
// deactivate if segment not in target.
if !intarget && activate {
stats.activate.Store(false)
}
// remove the segment if it is not in the target and not in the reserve list
// (meaning its target version is stale or it is no longer in the snapshot)
// and it was registered before the snapshot ts
// (to avoid removing new segments registered after the current snapshot)
if !intarget && !reserve && stats.ts.Before(snapshotTs) {
stats.Remove()
o.sealed.Remove(segmentID)
}
return true
})
o.targetVersion.Store(snapshot.targetVersion)
o.NotifyLocal()
log.Ctx(context.TODO()).Info("sync idf distribution finished", zap.Int64("version", snapshot.targetVersion), zap.Int64("numrow", o.current.NumRow()), zap.Int("growing", len(o.growing)), zap.Int("sealed", o.sealed.Len()))
return nil
}
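// BuildIDF converts the term frequencies of each sparse row into IDF-weighted
// vectors using the current stats and returns them with the average document length.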
func (o *idfOracle) BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error) {
o.RLock()
defer o.RUnlock()
stats, err := o.current.GetStats(fieldID)
if err != nil {
return nil, 0, err
}
idfBytes := make([][]byte, len(tfs.GetContents()))
for i, tf := range tfs.GetContents() {
idf := stats.BuildIDF(tf)
idfBytes[i] = idf
}
return idfBytes, stats.GetAvgdl(), nil
}
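// NewIDFOracle creates an idfOracle for the given channel, with disk offload and
// the local stats directory configured from paramtable.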
func NewIDFOracle(channel string, functions []*schemapb.FunctionSchema) IDFOracle {
return &idfOracle{
channel: channel,
targetVersion: atomic.NewInt64(0),
current: newBm25Stats(functions),
growing: make(map[int64]*growingBm25Stats),
sealed: typeutil.ConcurrentMap[int64, *sealedBm25Stats]{},
toDisk: paramtable.Get().QueryNodeCfg.IDFEnableDisk.GetAsBool(),
dirPath: path.Join(pathutil.GetPath(pathutil.BM25Path, paramtable.GetNodeID()), channel),
syncNotify: make(chan struct{}, 1),
closeCh: make(chan struct{}),
localNotify: make(chan struct{}, 1),
sf: conc.Singleflight[any]{},
}
}