diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 1d890c35aa..224384f8b2 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -521,6 +521,10 @@ queryNode:
   bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM
   workerPooling:
     size: 10 # the size for worker querynode client pool
+  idfOracle:
+    enableDisk: true
+    localPath: /var/lib/milvus/bm25_logs
+    writeConcurrency: 4
   ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address
   port: 21123 # TCP port of queryNode
   grpc:
diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go
index 972a473da2..92e853983f 100644
--- a/internal/querynodev2/delegator/delegator.go
+++ b/internal/querynodev2/delegator/delegator.go
@@ -946,6 +946,11 @@ func (sd *shardDelegator) Close() {
 	sd.tsCond.Broadcast()
 	sd.lifetime.Wait()
 
+	// clean idf oracle
+	if sd.idfOracle != nil {
+		sd.idfOracle.Close()
+	}
+
 	// clean up l0 segment in delete buffer
 	start := time.Now()
 	sd.deleteBuffer.Clear()
@@ -1066,8 +1071,9 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
 	}
 
 	if len(sd.isBM25Field) > 0 {
-		sd.idfOracle = NewIDFOracle(collection.Schema().GetFunctions())
+		sd.idfOracle = NewIDFOracle(sd.collectionID, collection.Schema().GetFunctions())
 		sd.distribution.SetIDFOracle(sd.idfOracle)
+		sd.idfOracle.Start()
 	}
 
 	m := sync.Mutex{}
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 6d875acbeb..1f46764b69 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -933,11 +933,6 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 			pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
 			pkoracle.WithSegmentType(commonpb.SegmentState_Growing),
 		)
-		if sd.idfOracle != nil {
-			for _, segment := range growing {
-				sd.idfOracle.RemoveGrowing(segment.SegmentID)
-			}
-		}
 	}
 
 	var releaseErr error
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 9baf57cf44..c9224ae71f 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -945,6 +945,15 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
 	})
 }
 
+func (s *DelegatorDataSuite) waitTargetVersion(targetVersion int64) {
+	for {
+		if s.delegator.idfOracle.TargetVersion() >= targetVersion {
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+}
+
 func (s *DelegatorDataSuite) TestBuildBM25IDF() {
 	s.genCollectionWithFunction()
 
@@ -1004,7 +1013,8 @@
 	}
 
 	snapshot := genSnapShot([]int64{1, 2, 3, 4}, []int64{}, 100)
-	s.delegator.idfOracle.SyncDistribution(snapshot)
+	s.delegator.idfOracle.SetNext(snapshot)
+	s.waitTargetVersion(snapshot.targetVersion)
 
 	placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(genStringFieldData("test bm25 data"))
 	s.NoError(err)
@@ -1160,7 +1170,8 @@
 	}
 
 	snapshot := genSnapShot([]int64{1, 2, 3, 4}, []int64{}, 100)
-	s.delegator.idfOracle.SyncDistribution(snapshot)
+	s.delegator.idfOracle.SetNext(snapshot)
+	s.waitTargetVersion(snapshot.targetVersion)
 
 	placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(genStringFieldData("test bm25 data"))
 	s.NoError(err)
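Taken together, the delegator hunks above change the oracle's lifecycle. A minimal sketch of the call flow, distilled from this diff rather than copied from it (here `functions`, `snapshot`, `newVersion` and `redundantGrowings` stand in for values the delegator already holds):

    // Construction and startup (NewShardDelegator):
    oracle := NewIDFOracle(collectionID, functions) // constructor now takes the collection ID
    distribution.SetIDFOracle(oracle)
    oracle.Start() // spawns syncloop, plus localloop when queryNode.idfOracle.enableDisk is true

    // On every distribution.SyncTargetVersion:
    oracle.SetNext(snapshot)                                    // applied asynchronously; the first target is applied synchronously
    oracle.LazyRemoveGrowings(newVersion, redundantGrowings...) // growings are only dropped once that version is applied

    // Shutdown (shardDelegator.Close):
    oracle.Close() // stops both loops and removes the on-disk cache directory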
diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go
index dc4fd16b90..5fa8eb7c09 100644
--- a/internal/querynodev2/delegator/distribution.go
+++ b/internal/querynodev2/delegator/distribution.go
@@ -350,6 +350,11 @@ func (d *distribution) SyncTargetVersion(newVersion int64, partitions []int64, g
 	// update working partition list
 	d.genSnapshot()
+
+	if d.idfOracle != nil {
+		d.idfOracle.SetNext(d.current.Load())
+		d.idfOracle.LazyRemoveGrowings(newVersion, redundantGrowings...)
+	}
 
 	// if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable
 	d.updateServiceable("SyncTargetVersion")
@@ -450,9 +455,6 @@ func (d *distribution) genSnapshot() chan struct{} {
 	d.current.Store(newSnapShot) // shall be a new one
 	d.snapshots.GetOrInsert(d.snapshotVersion, newSnapShot)
 
-	if d.idfOracle != nil {
-		d.idfOracle.SyncDistribution(newSnapShot)
-	}
 
 	// first snapshot, return closed chan
 	if last == nil {
diff --git a/internal/querynodev2/delegator/idf_oracle.go b/internal/querynodev2/delegator/idf_oracle.go
index 7a92c5b555..3469a81b70 100644
--- a/internal/querynodev2/delegator/idf_oracle.go
+++ b/internal/querynodev2/delegator/idf_oracle.go
@@ -18,9 +18,15 @@ package delegator
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path"
 	"sync"
+	"time"
 
 	"github.com/cockroachdb/errors"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -29,120 +35,284 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/v2/util/conc"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
 type IDFOracle interface {
-	// Activate(segmentID int64, state commonpb.SegmentState) error
-	// Deactivate(segmentID int64, state commonpb.SegmentState) error
+	SetNext(snapshot *snapshot)
+	TargetVersion() int64
 
-	SyncDistribution(snapshot *snapshot)
+	UpdateGrowing(segmentID int64, stats bm25Stats)
+	// mark the target version at which growing segments will be removed
+	LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64)
 
-	UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25Stats)
-
-	Register(segmentID int64, stats map[int64]*storage.BM25Stats, state commonpb.SegmentState)
-	RemoveGrowing(segmentID int64)
+	Register(segmentID int64, stats bm25Stats, state commonpb.SegmentState)
 
 	BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error)
+
+	Start()
+	Close()
 }
 
-type bm25Stats struct {
-	stats         map[int64]*storage.BM25Stats
-	activate      bool
-	targetVersion int64
-	snapVersion   int64
-}
+type bm25Stats map[int64]*storage.BM25Stats
 
-func (s *bm25Stats) Merge(stats map[int64]*storage.BM25Stats) {
+func (s bm25Stats) Merge(stats bm25Stats) {
 	for fieldID, newstats := range stats {
-		if stats, ok := s.stats[fieldID]; ok {
+		if stats, ok := s[fieldID]; ok {
 			stats.Merge(newstats)
 		} else {
-			s.stats[fieldID] = storage.NewBM25Stats()
-			s.stats[fieldID].Merge(newstats)
+			s[fieldID] = storage.NewBM25Stats()
+			s[fieldID].Merge(newstats)
 		}
 	}
 }
 
-func (s *bm25Stats) Minus(stats map[int64]*storage.BM25Stats) {
+func (s bm25Stats) Minus(stats bm25Stats) {
 	for fieldID, newstats := range stats {
-		if stats, ok := s.stats[fieldID]; ok {
+		if stats, ok := s[fieldID]; ok {
 			stats.Minus(newstats)
 		} else {
-			log.Panic("minus failed, BM25 stats not exist", zap.Int64("fieldID", fieldID))
+			s[fieldID] = storage.NewBM25Stats()
+			s[fieldID].Minus(newstats)
 		}
 	}
 }
 
-func (s *bm25Stats) GetStats(fieldID int64) (*storage.BM25Stats, error) {
-	stats, ok := s.stats[fieldID]
+func (s bm25Stats) GetStats(fieldID int64) (*storage.BM25Stats, error) {
+	stats, ok := s[fieldID]
 	if !ok {
 		return nil, errors.New("field not found in idf oracle BM25 stats")
 	}
 	return stats, nil
 }
 
-func (s *bm25Stats) NumRow() int64 {
-	for _, stats := range s.stats {
+func (s bm25Stats) NumRow() int64 {
+	for _, stats := range s {
 		return stats.NumRow()
 	}
 	return 0
 }
 
-func newBm25Stats(functions []*schemapb.FunctionSchema) *bm25Stats {
-	stats := &bm25Stats{
-		stats: make(map[int64]*storage.BM25Stats),
+type sealedBm25Stats struct {
+	sync.RWMutex // Protect all data in struct except activate
+	bm25Stats
+
+	activate *atomic.Bool
+
+	inmemory  bool
+	removed   bool
+	segmentID int64
+	ts        time.Time // Time of segment registration; segments registered after the target was generated will not be removed
+	localPath string
+}
+
+func (s *sealedBm25Stats) writeFile(localPath string) (error, bool) {
+	s.RLock()
+
+	if s.removed {
+		return nil, true
 	}
+	m := make(map[int64][]byte, len(s.bm25Stats))
+	stats := s.bm25Stats
+	s.RUnlock()
+
+	// RUnlock while the stats are serialized and written to file,
+	// to avoid blocking stats removal for too long during sync distribution
+	for fieldID, stats := range stats {
+		bytes, err := stats.Serialize()
+		if err != nil {
+			return err, false
+		}
+
+		m[fieldID] = bytes
+	}
+
+	b, err := json.Marshal(m)
+	if err != nil {
+		return err, false
+	}
+
+	err = os.WriteFile(localPath, b, 0o600)
+	if err != nil {
+		return err, false
+	}
+	return nil, false
+}
+
+func (s *sealedBm25Stats) ShouldOffLoadToDisk() bool {
+	s.RLock()
+	defer s.RUnlock()
+	return s.inmemory && s.activate.Load() && !s.removed
+}
+
+// After merging a segment's stats into the overall stats, the delegator still needs to keep the per-segment stats,
+// so that they can be subtracted (Minus) later, once the segment is removed from the target. To reduce memory usage,
+// idfOracle offloads such per-segment stats to disk and loads them back when removing the segment.
+func (s *sealedBm25Stats) ToLocal(dirPath string) error {
+	localpath := path.Join(dirPath, fmt.Sprintf("%d.data", s.segmentID))
+
+	if err, skip := s.writeFile(localpath); err != nil || skip {
+		return err
+	}
+
+	s.Lock()
+	defer s.Unlock()
+	s.inmemory = false
+	s.bm25Stats = nil
+	s.localPath = localpath
+
+	if s.removed {
+		err := os.Remove(s.localPath)
+		if err != nil {
+			log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localPath))
+		}
+	}
+	return nil
+}
+
+func (s *sealedBm25Stats) Remove() {
+	s.Lock()
+	defer s.Unlock()
+	s.removed = true
+
+	if !s.inmemory {
+		err := os.Remove(s.localPath)
+		if err != nil {
+			log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localPath))
+		}
+	}
+}
+
+// FetchStats fetches the sealed segment's bm25 stats;
+// it loads the local file and returns it when the stats are not in memory.
+func (s *sealedBm25Stats) FetchStats() (map[int64]*storage.BM25Stats, error) {
+	s.RLock()
+	defer s.RUnlock()
+
+	if s.inmemory {
+		return s.bm25Stats, nil
+	}
+
+	b, err := os.ReadFile(s.localPath)
+	if err != nil {
+		return nil, err
+	}
+
+	m := make(map[int64][]byte)
+	err = json.Unmarshal(b, &m)
+	if err != nil {
+		return nil, err
+	}
+
+	stats := make(map[int64]*storage.BM25Stats)
+	for fieldID, bytes := range m {
+		stats[fieldID] = storage.NewBM25Stats()
+		err = stats[fieldID].Deserialize(bytes)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return stats, nil
+}
+
+type growingBm25Stats struct {
+	bm25Stats
+
+	activate       bool
+	droppedVersion int64
+}
+
+func newBm25Stats(functions []*schemapb.FunctionSchema) bm25Stats {
+	stats := make(map[int64]*storage.BM25Stats)
+	for _, function := range functions {
 		if function.GetType() == schemapb.FunctionType_BM25 {
-			stats.stats[function.GetOutputFieldIds()[0]] = storage.NewBM25Stats()
+			stats[function.GetOutputFieldIds()[0]] = storage.NewBM25Stats()
 		}
 	}
 	return stats
 }
 
-type idfOracle struct {
+type idfTarget struct {
 	sync.RWMutex
-
-	current *bm25Stats
-
-	growing map[int64]*bm25Stats
-	sealed  map[int64]*bm25Stats
-
-	targetVersion int64
+	snapshot *snapshot
+	ts       time.Time // time when the target was generated
 }
 
-func (o *idfOracle) Register(segmentID int64, stats map[int64]*storage.BM25Stats, state commonpb.SegmentState) {
-	o.Lock()
-	defer o.Unlock()
+func (t *idfTarget) SetSnapshot(snapshot *snapshot) {
+	t.Lock()
+	defer t.Unlock()
+	t.snapshot = snapshot
+	t.ts = time.Now()
+}
+func (t *idfTarget) GetSnapshot() (*snapshot, time.Time) {
+	t.RLock()
+	defer t.RUnlock()
+	return t.snapshot, t.ts
+}
+
+type idfOracle struct {
+	sync.RWMutex // protect current and growing segment stats
+	current bm25Stats
+	growing map[int64]*growingBm25Stats
+
+	sealed typeutil.ConcurrentMap[int64, *sealedBm25Stats]
+
+	collectionID int64
+
+	// for sync distribution
+	next          idfTarget
+	targetVersion *atomic.Int64
+	syncNotify    chan struct{}
+
+	// for disk cache
+	localNotify chan struct{}
+	dirPath     string
+
+	closeCh chan struct{}
+	wg      sync.WaitGroup
+}
+
+func (o *idfOracle) TargetVersion() int64 {
+	return o.targetVersion.Load()
+}
+
+func (o *idfOracle) Register(segmentID int64, stats bm25Stats, state commonpb.SegmentState) {
 	switch state {
 	case segments.SegmentTypeGrowing:
+		o.Lock()
+		defer o.Unlock()
+
 		if _, ok := o.growing[segmentID]; ok {
 			return
 		}
-		o.growing[segmentID] = &bm25Stats{
-			stats:         stats,
-			activate:      true,
-			targetVersion: initialTargetVersion,
+		o.growing[segmentID] = &growingBm25Stats{
+			bm25Stats: stats,
+			activate:  true,
 		}
 		o.current.Merge(stats)
 	case segments.SegmentTypeSealed:
-		if _, ok := o.sealed[segmentID]; ok {
+		if ok := o.sealed.Contain(segmentID); ok {
 			return
 		}
-		o.sealed[segmentID] = &bm25Stats{
-			stats:         stats,
-			activate:      false,
-			targetVersion: unreadableTargetVersion,
-		}
+		o.sealed.Insert(segmentID, &sealedBm25Stats{
+			bm25Stats: stats,
+			ts:        time.Now(),
+			activate:  atomic.NewBool(false),
+			inmemory:  true,
+			segmentID: segmentID,
+		})
 	default:
 		log.Warn("register segment with unknown state", zap.String("stats", state.String()))
 		return
 	}
 }
 
-func (o *idfOracle) UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25Stats) {
+func (o *idfOracle) UpdateGrowing(segmentID int64, stats bm25Stats) {
 	if len(stats) == 0 {
 		return
 	}
@@ -161,33 +331,129 @@ func (o *idfOracle) UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25
 	}
 }
 
-func (o *idfOracle) RemoveGrowing(segmentID int64) {
+func (o *idfOracle) LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64) {
 	o.Lock()
 	defer o.Unlock()
-	if stats, ok := o.growing[segmentID]; ok {
-		if stats.activate {
-			o.current.Minus(stats.stats)
+	for _, segmentID := range segmentIDs {
+		if stats, ok := o.growing[segmentID]; ok && stats.droppedVersion == 0 {
+			stats.droppedVersion = targetVersion
 		}
-		delete(o.growing, segmentID)
 	}
 }
 
-func (o *idfOracle) activate(stats *bm25Stats) {
-	stats.activate = true
-	o.current.Merge(stats.stats)
+func (o *idfOracle) Start() {
+	o.wg.Add(1)
+	go o.syncloop()
+
+	if paramtable.Get().QueryNodeCfg.IDFEnableDisk.GetAsBool() {
+		o.wg.Add(1)
+		go o.localloop()
+	}
 }
 
-func (o *idfOracle) deactivate(stats *bm25Stats) {
-	stats.activate = false
-	o.current.Minus(stats.stats)
+func (o *idfOracle) Close() {
+	close(o.closeCh)
+	o.wg.Wait()
+
+	os.RemoveAll(o.dirPath)
 }
 
-func (o *idfOracle) SyncDistribution(snapshot *snapshot) {
-	o.Lock()
-	defer o.Unlock()
+func (o *idfOracle) SetNext(snapshot *snapshot) {
+	o.next.SetSnapshot(snapshot)
 
-	sealed, growing := snapshot.Peek()
+	// run SyncDistribution synchronously when the first target is loaded
+	if o.targetVersion.Load() == 0 {
+		o.SyncDistribution()
+	} else {
+		o.NotifySync()
+	}
+}
+
+func (o *idfOracle) NotifySync() {
+	select {
+	case o.syncNotify <- struct{}{}:
+	default:
+	}
+}
+
+func (o *idfOracle) NotifyLocal() {
+	select {
+	case o.localNotify <- struct{}{}:
+	default:
+	}
+}
+
+func (o *idfOracle) syncloop() {
+	defer o.wg.Done()
+
+	for {
+		select {
+		case <-o.syncNotify:
+			err := o.SyncDistribution()
+			if err != nil {
+				log.Warn("idf oracle sync distribution failed", zap.Error(err))
+				time.Sleep(time.Second * 10)
+				o.NotifySync()
+			}
+		case <-o.closeCh:
+			return
+		}
+	}
+}
+
+func (o *idfOracle) localloop() {
+	pool := conc.NewPool[struct{}](paramtable.Get().QueryNodeCfg.IDFWriteConcurrenct.GetAsInt())
+	o.dirPath = path.Join(paramtable.Get().QueryNodeCfg.IDFLocalPath.GetValue(), fmt.Sprintf("%d", o.collectionID))
+
+	defer o.wg.Done()
+	for {
+		select {
+		case <-o.localNotify:
+			statsList := []*sealedBm25Stats{}
+			o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
+				if stats.ShouldOffLoadToDisk() {
+					statsList = append(statsList, stats)
+				}
+				return true
+			})
+
+			if _, err := os.Stat(o.dirPath); os.IsNotExist(err) {
+				err = os.MkdirAll(o.dirPath, 0o755)
+				if err != nil {
+					log.Warn("create idf local path failed", zap.Error(err))
+				}
+			}
+
+			features := []*conc.Future[struct{}]{}
+			for _, stats := range statsList {
+				features = append(features, pool.Submit(func() (struct{}, error) {
+					err := stats.ToLocal(o.dirPath)
+					if err != nil {
+						log.Warn("idf oracle to local failed", zap.Error(err))
+						return struct{}{}, nil
+					}
+					return struct{}{}, nil
+				}))
+			}
+
+			conc.AwaitAll(features...)
+		case <-o.closeCh:
+			return
+		}
+	}
+}
+
+// WARN: SyncDistribution is not concurrency-safe.
+func (o *idfOracle) SyncDistribution() error {
+	snapshot, snapshotTs := o.next.GetSnapshot()
+	if snapshot.targetVersion <= o.targetVersion.Load() {
+		return nil
+	}
+
+	sealed, _ := snapshot.Peek()
+
+	sealedMap := map[int64]bool{} // sealed diff map: activate the segment stats if true, remove them if the segment is not in the map
 	for _, item := range sealed {
 		for _, segment := range item.Segments {
@@ -195,47 +461,75 @@ func (o *idfOracle) SyncDistribution(snapshot *snapshot) {
 				continue
 			}
 
-			if stats, ok := o.sealed[segment.SegmentID]; ok {
-				if stats.targetVersion < segment.TargetVersion {
-					stats.targetVersion = segment.TargetVersion
-				}
-				stats.snapVersion = snapshot.version
-			} else {
-				log.Warn("idf oracle lack some sealed segment", zap.Int64("segmentID", segment.SegmentID))
+			if segment.TargetVersion == snapshot.targetVersion {
+				sealedMap[segment.SegmentID] = true
+			} else if segment.TargetVersion == unreadableTargetVersion {
+				sealedMap[segment.SegmentID] = false
 			}
 		}
 	}
 
-	for _, segment := range growing {
-		if stats, ok := o.growing[segment.SegmentID]; ok {
-			stats.targetVersion = segment.TargetVersion
-		} else {
-			log.Warn("idf oracle lack some growing segment", zap.Int64("segmentID", segment.SegmentID))
+	diff := bm25Stats{}
+
+	var rangeErr error
+	o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
+		activate, ok := sealedMap[segmentID]
+		statsActivate := stats.activate.Load()
+		if ok && activate && !statsActivate {
+			stats, err := stats.FetchStats()
+			if err != nil {
+				rangeErr = err
+				return false
+			}
+			diff.Merge(stats)
+		} else if !ok && statsActivate {
+			stats, err := stats.FetchStats()
+			if err != nil {
+				rangeErr = err
+				return false
+			}
+			diff.Minus(stats)
 		}
+		return true
+	})
+
+	if rangeErr != nil {
+		return rangeErr
 	}
 
-	o.targetVersion = snapshot.targetVersion
+	o.Lock()
+	defer o.Unlock()
 
-	for segmentID, stats := range o.sealed {
-		if !stats.activate && stats.targetVersion == o.targetVersion {
-			o.activate(stats)
-		} else if (stats.targetVersion < o.targetVersion && stats.targetVersion != unreadableTargetVersion) || stats.snapVersion != snapshot.version {
+	for segmentID, stats := range o.growing {
+		// drop growing segment bm25 stats
+		if stats.droppedVersion != 0 && stats.droppedVersion <= snapshot.targetVersion {
 			if stats.activate {
-				o.current.Minus(stats.stats)
+				o.current.Minus(stats.bm25Stats)
 			}
-			delete(o.sealed, segmentID)
+			delete(o.growing, segmentID)
 		}
 	}
+	o.current.Merge(diff)
 
-	for _, stats := range o.growing {
-		if !stats.activate && (stats.targetVersion == o.targetVersion || stats.targetVersion == initialTargetVersion) {
-			o.activate(stats)
-		} else if stats.activate && (stats.targetVersion != o.targetVersion && stats.targetVersion != initialTargetVersion) {
-			o.deactivate(stats)
+	// remove sealed segments not in target
+	o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
+		activate, ok := sealedMap[segmentID]
+		statsActivate := stats.activate.Load()
+		if !ok && stats.ts.Before(snapshotTs) {
+			stats.Remove()
+			o.sealed.Remove(segmentID)
 		}
-	}
 
-	log.Ctx(context.TODO()).Debug("sync distribution finished", zap.Int64("version", o.targetVersion), zap.Int64("numrow", o.current.NumRow()))
+		if ok && activate && !statsActivate {
+			stats.activate.Store(true)
+		}
+		return true
+	})
+
+	o.targetVersion.Store(snapshot.targetVersion)
+	o.NotifyLocal()
+	log.Ctx(context.TODO()).Info("sync distribution finished", zap.Int64("version", snapshot.targetVersion), zap.Int64("numrow", o.current.NumRow()), zap.Int("growing", len(o.growing)), zap.Int("sealed", o.sealed.Len()))
finished", zap.Int64("version", snapshot.targetVersion), zap.Int64("numrow", o.current.NumRow()), zap.Int("growing", len(o.growing)), zap.Int("sealed", o.sealed.Len())) + return nil } func (o *idfOracle) BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error) { @@ -255,10 +549,15 @@ func (o *idfOracle) BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][ return idfBytes, stats.GetAvgdl(), nil } -func NewIDFOracle(functions []*schemapb.FunctionSchema) IDFOracle { +func NewIDFOracle(collID int64, functions []*schemapb.FunctionSchema) IDFOracle { return &idfOracle{ - current: newBm25Stats(functions), - growing: make(map[int64]*bm25Stats), - sealed: make(map[int64]*bm25Stats), + collectionID: collID, + targetVersion: atomic.NewInt64(0), + current: newBm25Stats(functions), + growing: make(map[int64]*growingBm25Stats), + sealed: typeutil.ConcurrentMap[int64, *sealedBm25Stats]{}, + syncNotify: make(chan struct{}, 1), + closeCh: make(chan struct{}), + localNotify: make(chan struct{}, 1), } } diff --git a/internal/querynodev2/delegator/idf_oracle_test.go b/internal/querynodev2/delegator/idf_oracle_test.go index 390d3af9a9..c9b93bb52c 100644 --- a/internal/querynodev2/delegator/idf_oracle_test.go +++ b/internal/querynodev2/delegator/idf_oracle_test.go @@ -18,6 +18,7 @@ package delegator import ( "testing" + "time" "github.com/stretchr/testify/suite" @@ -49,13 +50,27 @@ func (suite *IDFOracleSuite) SetupSuite() { } func (suite *IDFOracleSuite) SetupTest() { - suite.idfOracle = NewIDFOracle(suite.collectionSchema.GetFunctions()).(*idfOracle) + suite.idfOracle = NewIDFOracle(suite.collectionID, suite.collectionSchema.GetFunctions()).(*idfOracle) + suite.idfOracle.Start() suite.snapshot = &snapshot{ dist: []SnapshotItem{{1, make([]SegmentEntry, 0)}}, } suite.targetVersion = 0 } +func (suite *IDFOracleSuite) TearDownTest() { + suite.idfOracle.Close() +} + +func (s *IDFOracleSuite) waitTargetVersion(targetVersion int64) { + for { + if s.idfOracle.TargetVersion() >= targetVersion { + return + } + time.Sleep(time.Millisecond * 100) + } +} + func (suite *IDFOracleSuite) genStats(start uint32, end uint32) map[int64]*storage.BM25Stats { result := make(map[int64]*storage.BM25Stats) result[102] = storage.NewBM25Stats() @@ -124,12 +139,14 @@ func (suite *IDFOracleSuite) TestSealed() { // update and sync snapshot make all sealed activate suite.updateSnapshot(sealedSegs, []int64{}, []int64{}) - suite.idfOracle.SyncDistribution(suite.snapshot) + suite.idfOracle.SetNext(suite.snapshot) + suite.waitTargetVersion(suite.targetVersion) suite.Equal(int64(4), suite.idfOracle.current.NumRow()) releasedSeg := []int64{1, 2, 3} suite.updateSnapshot([]int64{}, []int64{}, releasedSeg) - suite.idfOracle.SyncDistribution(suite.snapshot) + suite.idfOracle.SetNext(suite.snapshot) + suite.waitTargetVersion(suite.targetVersion) suite.Equal(int64(1), suite.idfOracle.current.NumRow()) sparse := typeutil.CreateAndSortSparseFloatRow(map[uint32]float32{4: 1}) @@ -156,7 +173,9 @@ func (suite *IDFOracleSuite) TestGrow() { releasedSeg := []int64{1, 2, 3} suite.updateSnapshot([]int64{}, []int64{}, releasedSeg) - suite.idfOracle.SyncDistribution(suite.snapshot) + suite.idfOracle.LazyRemoveGrowings(suite.snapshot.targetVersion, releasedSeg...) 
+	suite.idfOracle.SetNext(suite.snapshot)
+	suite.waitTargetVersion(suite.targetVersion)
 	suite.Equal(int64(1), suite.idfOracle.current.NumRow())
 
 	suite.idfOracle.UpdateGrowing(4, suite.genStats(5, 6))
@@ -170,14 +189,6 @@
 		OutputFieldIds: []int64{102},
 	}})
 
-	suite.NotPanics(func() {
-		stats.Merge(map[int64]*storage.BM25Stats{103: storage.NewBM25Stats()})
-	})
-
-	suite.Panics(func() {
-		stats.Minus(map[int64]*storage.BM25Stats{104: storage.NewBM25Stats()})
-	})
-
 	_, err := stats.GetStats(104)
 	suite.Error(err)
 
@@ -185,6 +196,42 @@
 	suite.NoError(err)
 }
 
+func (suite *IDFOracleSuite) TestLocalCache() {
+	// register sealed
+	sealedSegs := []int64{1, 2, 3, 4}
+	for _, segID := range sealedSegs {
+		suite.idfOracle.Register(segID, suite.genStats(uint32(segID), uint32(segID)+1), commonpb.SegmentState_Sealed)
+	}
+
+	// update and sync snapshot make all sealed activate
+	suite.updateSnapshot(sealedSegs, []int64{}, []int64{})
+	suite.idfOracle.SetNext(suite.snapshot)
+	suite.waitTargetVersion(suite.targetVersion)
+	suite.Equal(int64(4), suite.idfOracle.current.NumRow())
+
+	suite.Require().Eventually(func() bool {
+		allInLocal := true
+		suite.idfOracle.sealed.Range(func(id int64, stats *sealedBm25Stats) bool {
+			stats.RLock()
+			defer stats.RUnlock()
+			if stats.inmemory == true {
+				allInLocal = false
+				return false
+			}
+			return true
+		})
+
+		return allInLocal
+	}, time.Minute, time.Millisecond*100)
+
+	// release some segments
+	releasedSeg := []int64{1, 2, 3}
+	suite.updateSnapshot([]int64{}, []int64{}, releasedSeg)
+	suite.idfOracle.SetNext(suite.snapshot)
+	suite.waitTargetVersion(suite.targetVersion)
+	suite.Equal(int64(1), suite.idfOracle.current.NumRow())
+}
+
 func TestIDFOracle(t *testing.T) {
 	suite.Run(t, new(IDFOracleSuite))
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 8ff6899857..a9c2d8e9a4 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2851,9 +2851,38 @@ type queryNodeConfig struct {
 	// Json Key Stats
 	JSONKeyStatsCommitInterval        ParamItem `refreshable:"false"`
 	EnabledGrowingSegmentJSONKeyStats ParamItem `refreshable:"false"`
+
+	// Idf Oracle
+	IDFEnableDisk       ParamItem `refreshable:"true"`
+	IDFLocalPath        ParamItem `refreshable:"true"`
+	IDFWriteConcurrenct ParamItem `refreshable:"true"`
 }
 
 func (p *queryNodeConfig) init(base *BaseTable) {
+	p.IDFEnableDisk = ParamItem{
+		Key:          "queryNode.idfOracle.enableDisk",
+		Version:      "2.6.0",
+		Export:       true,
+		DefaultValue: "true",
+	}
+	p.IDFEnableDisk.Init(base.mgr)
+
+	p.IDFLocalPath = ParamItem{
+		Key:          "queryNode.idfOracle.localPath",
+		Version:      "2.6.0",
+		Export:       true,
+		DefaultValue: "/var/lib/milvus/bm25_logs",
+	}
+	p.IDFLocalPath.Init(base.mgr)
+
+	p.IDFWriteConcurrenct = ParamItem{
+		Key:          "queryNode.idfOracle.writeConcurrency",
+		Version:      "2.6.0",
+		Export:       true,
+		DefaultValue: "4",
+	}
+	p.IDFWriteConcurrenct.Init(base.mgr)
+
 	p.SoPath = ParamItem{
 		Key:     "queryNode.soPath",
 		Version: "2.3.0",
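For reference, the three ParamItems above surface in milvus.yaml under queryNode.idfOracle, matching the configs/milvus.yaml hunk at the top of this diff; the values below are simply the defaults declared in this patch:

    queryNode:
      idfOracle:
        enableDisk: true                      # offload per-segment BM25 stats to disk (IDFEnableDisk)
        localPath: /var/lib/milvus/bm25_logs  # root dir; a per-collection subdirectory is created under it (IDFLocalPath)
        writeConcurrency: 4                   # pool size used when serializing stats to disk (IDFWriteConcurrenct)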