mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
enhance: add segment bm25 stats local cache (#41775)
relate: https://github.com/milvus-io/milvus/issues/41424 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
b6723296b2
commit
0fafb706ba
@ -521,6 +521,10 @@ queryNode:
|
||||
bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM
|
||||
workerPooling:
|
||||
size: 10 # the size for worker querynode client pool
|
||||
idfOracle:
|
||||
enableDisk: true
|
||||
localPath: /var/lib/milvus/bm25_logs
|
||||
writeConcurrency: 4
|
||||
ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address
|
||||
port: 21123 # TCP port of queryNode
|
||||
grpc:
|
||||
|
||||
@ -946,6 +946,11 @@ func (sd *shardDelegator) Close() {
|
||||
sd.tsCond.Broadcast()
|
||||
sd.lifetime.Wait()
|
||||
|
||||
// clean idf oracle
|
||||
if sd.idfOracle != nil {
|
||||
sd.idfOracle.Close()
|
||||
}
|
||||
|
||||
// clean up l0 segment in delete buffer
|
||||
start := time.Now()
|
||||
sd.deleteBuffer.Clear()
|
||||
@ -1066,8 +1071,9 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
||||
}
|
||||
|
||||
if len(sd.isBM25Field) > 0 {
|
||||
sd.idfOracle = NewIDFOracle(collection.Schema().GetFunctions())
|
||||
sd.idfOracle = NewIDFOracle(sd.collectionID, collection.Schema().GetFunctions())
|
||||
sd.distribution.SetIDFOracle(sd.idfOracle)
|
||||
sd.idfOracle.Start()
|
||||
}
|
||||
|
||||
m := sync.Mutex{}
|
||||
|
||||
@ -933,11 +933,6 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
||||
pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
|
||||
pkoracle.WithSegmentType(commonpb.SegmentState_Growing),
|
||||
)
|
||||
if sd.idfOracle != nil {
|
||||
for _, segment := range growing {
|
||||
sd.idfOracle.RemoveGrowing(segment.SegmentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var releaseErr error
|
||||
|
||||
@ -945,6 +945,15 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DelegatorDataSuite) waitTargetVersion(targetVersion int64) {
|
||||
for {
|
||||
if s.delegator.idfOracle.TargetVersion() >= targetVersion {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelegatorDataSuite) TestBuildBM25IDF() {
|
||||
s.genCollectionWithFunction()
|
||||
|
||||
@ -1004,7 +1013,8 @@ func (s *DelegatorDataSuite) TestBuildBM25IDF() {
|
||||
}
|
||||
snapshot := genSnapShot([]int64{1, 2, 3, 4}, []int64{}, 100)
|
||||
|
||||
s.delegator.idfOracle.SyncDistribution(snapshot)
|
||||
s.delegator.idfOracle.SetNext(snapshot)
|
||||
s.waitTargetVersion(snapshot.targetVersion)
|
||||
placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(genStringFieldData("test bm25 data"))
|
||||
s.NoError(err)
|
||||
|
||||
@ -1160,7 +1170,8 @@ func (s *DelegatorDataSuite) TestBuildBM25IDF() {
|
||||
}
|
||||
snapshot := genSnapShot([]int64{1, 2, 3, 4}, []int64{}, 100)
|
||||
|
||||
s.delegator.idfOracle.SyncDistribution(snapshot)
|
||||
s.delegator.idfOracle.SetNext(snapshot)
|
||||
s.waitTargetVersion(snapshot.targetVersion)
|
||||
placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(genStringFieldData("test bm25 data"))
|
||||
s.NoError(err)
|
||||
|
||||
|
||||
@ -350,6 +350,11 @@ func (d *distribution) SyncTargetVersion(newVersion int64, partitions []int64, g
|
||||
|
||||
// update working partition list
|
||||
d.genSnapshot()
|
||||
|
||||
if d.idfOracle != nil {
|
||||
d.idfOracle.SetNext(d.current.Load())
|
||||
d.idfOracle.LazyRemoveGrowings(newVersion, redundantGrowings...)
|
||||
}
|
||||
// if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable
|
||||
d.updateServiceable("SyncTargetVersion")
|
||||
|
||||
@ -450,9 +455,6 @@ func (d *distribution) genSnapshot() chan struct{} {
|
||||
d.current.Store(newSnapShot)
|
||||
// shall be a new one
|
||||
d.snapshots.GetOrInsert(d.snapshotVersion, newSnapShot)
|
||||
if d.idfOracle != nil {
|
||||
d.idfOracle.SyncDistribution(newSnapShot)
|
||||
}
|
||||
|
||||
// first snapshot, return closed chan
|
||||
if last == nil {
|
||||
|
||||
@ -18,9 +18,15 @@ package delegator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -29,120 +35,284 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
|
||||
type IDFOracle interface {
|
||||
// Activate(segmentID int64, state commonpb.SegmentState) error
|
||||
// Deactivate(segmentID int64, state commonpb.SegmentState) error
|
||||
SetNext(snapshot *snapshot)
|
||||
TargetVersion() int64
|
||||
|
||||
SyncDistribution(snapshot *snapshot)
|
||||
UpdateGrowing(segmentID int64, stats bm25Stats)
|
||||
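// LazyRemoveGrowings marks the given growing segments as dropped at targetVersion; their stats are removed once the oracle syncs to a target version at or beyond it.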
|
||||
LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64)
|
||||
|
||||
UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25Stats)
|
||||
|
||||
Register(segmentID int64, stats map[int64]*storage.BM25Stats, state commonpb.SegmentState)
|
||||
RemoveGrowing(segmentID int64)
|
||||
Register(segmentID int64, stats bm25Stats, state commonpb.SegmentState)
|
||||
|
||||
BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error)
|
||||
|
||||
Start()
|
||||
Close()
|
||||
}
|
||||
|
||||
type bm25Stats struct {
|
||||
stats map[int64]*storage.BM25Stats
|
||||
activate bool
|
||||
targetVersion int64
|
||||
snapVersion int64
|
||||
}
|
||||
type bm25Stats map[int64]*storage.BM25Stats
|
||||
|
||||
func (s *bm25Stats) Merge(stats map[int64]*storage.BM25Stats) {
|
||||
func (s bm25Stats) Merge(stats bm25Stats) {
|
||||
for fieldID, newstats := range stats {
|
||||
if stats, ok := s.stats[fieldID]; ok {
|
||||
if stats, ok := s[fieldID]; ok {
|
||||
stats.Merge(newstats)
|
||||
} else {
|
||||
s.stats[fieldID] = storage.NewBM25Stats()
|
||||
s.stats[fieldID].Merge(newstats)
|
||||
s[fieldID] = storage.NewBM25Stats()
|
||||
s[fieldID].Merge(newstats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *bm25Stats) Minus(stats map[int64]*storage.BM25Stats) {
|
||||
func (s bm25Stats) Minus(stats bm25Stats) {
|
||||
for fieldID, newstats := range stats {
|
||||
if stats, ok := s.stats[fieldID]; ok {
|
||||
if stats, ok := s[fieldID]; ok {
|
||||
stats.Minus(newstats)
|
||||
} else {
|
||||
log.Panic("minus failed, BM25 stats not exist", zap.Int64("fieldID", fieldID))
|
||||
s[fieldID] = storage.NewBM25Stats()
|
||||
s[fieldID].Minus(newstats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *bm25Stats) GetStats(fieldID int64) (*storage.BM25Stats, error) {
|
||||
stats, ok := s.stats[fieldID]
|
||||
func (s bm25Stats) GetStats(fieldID int64) (*storage.BM25Stats, error) {
|
||||
stats, ok := s[fieldID]
|
||||
if !ok {
|
||||
return nil, errors.New("field not found in idf oracle BM25 stats")
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (s *bm25Stats) NumRow() int64 {
|
||||
for _, stats := range s.stats {
|
||||
func (s bm25Stats) NumRow() int64 {
|
||||
for _, stats := range s {
|
||||
return stats.NumRow()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func newBm25Stats(functions []*schemapb.FunctionSchema) *bm25Stats {
|
||||
stats := &bm25Stats{
|
||||
stats: make(map[int64]*storage.BM25Stats),
|
||||
type sealedBm25Stats struct {
|
||||
sync.RWMutex // Protect all data in struct except activate
|
||||
bm25Stats
|
||||
|
||||
activate *atomic.Bool
|
||||
|
||||
inmemory bool
|
||||
removed bool
|
||||
segmentID int64
|
||||
ts time.Time // registration time of the segment; a segment registered after the target snapshot was generated is not removed when syncing to that target
|
||||
localPath string
|
||||
}
|
||||
|
||||
func (s *sealedBm25Stats) writeFile(localPath string) (error, bool) {
|
||||
s.RLock()
|
||||
|
||||
if s.removed {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
m := make(map[int64][]byte, len(s.bm25Stats))
|
||||
stats := s.bm25Stats
|
||||
s.RUnlock()
|
||||
|
||||
// RUnlock when stats serialize and write to file
|
||||
// to avoid block remove stats too long when sync distribution
|
||||
for fieldID, stats := range stats {
|
||||
bytes, err := stats.Serialize()
|
||||
if err != nil {
|
||||
return err, false
|
||||
}
|
||||
|
||||
m[fieldID] = bytes
|
||||
}
|
||||
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err, false
|
||||
}
|
||||
|
||||
err = os.WriteFile(localPath, b, 0o600)
|
||||
if err != nil {
|
||||
return err, false
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (s *sealedBm25Stats) ShouldOffLoadToDisk() bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
return s.inmemory && s.activate.Load() && !s.removed
|
||||
}
|
||||
|
||||
// After merging a segment's stats into the overall stats, the delegator still needs to keep the per-segment stats,
|
||||
// so that when the segment is later removed from the target, its stats can be subtracted (Minus). To reduce memory usage,
|
||||
// idfOracle stores these per-segment stats on disk and loads them back when the segment is removed.
|
||||
func (s *sealedBm25Stats) ToLocal(dirPath string) error {
|
||||
localpath := path.Join(dirPath, fmt.Sprintf("%d.data", s.segmentID))
|
||||
|
||||
if err, skip := s.writeFile(localpath); err != nil || skip {
|
||||
return err
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.inmemory = false
|
||||
s.bm25Stats = nil
|
||||
s.localPath = localpath
|
||||
|
||||
if s.removed {
|
||||
err := os.Remove(s.localPath)
|
||||
if err != nil {
|
||||
log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localPath))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sealedBm25Stats) Remove() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.removed = true
|
||||
|
||||
if !s.inmemory {
|
||||
err := os.Remove(s.localPath)
|
||||
if err != nil {
|
||||
log.Warn("remove local bm25 stats failed", zap.Error(err), zap.String("path", s.localPath))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch sealed bm25 stats
|
||||
// load local file and return it when stats not in memeory
|
||||
func (s *sealedBm25Stats) FetchStats() (map[int64]*storage.BM25Stats, error) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
if s.inmemory {
|
||||
return s.bm25Stats, nil
|
||||
}
|
||||
|
||||
b, err := os.ReadFile(s.localPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := make(map[int64][]byte)
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats := make(map[int64]*storage.BM25Stats)
|
||||
for fieldID, bytes := range m {
|
||||
stats[fieldID] = storage.NewBM25Stats()
|
||||
err = stats[fieldID].Deserialize(bytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
type growingBm25Stats struct {
|
||||
bm25Stats
|
||||
|
||||
activate bool
|
||||
droppedVersion int64
|
||||
}
|
||||
|
||||
func newBm25Stats(functions []*schemapb.FunctionSchema) bm25Stats {
|
||||
stats := make(map[int64]*storage.BM25Stats)
|
||||
|
||||
for _, function := range functions {
|
||||
if function.GetType() == schemapb.FunctionType_BM25 {
|
||||
stats.stats[function.GetOutputFieldIds()[0]] = storage.NewBM25Stats()
|
||||
stats[function.GetOutputFieldIds()[0]] = storage.NewBM25Stats()
|
||||
}
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
type idfOracle struct {
|
||||
type idfTarget struct {
|
||||
sync.RWMutex
|
||||
|
||||
current *bm25Stats
|
||||
|
||||
growing map[int64]*bm25Stats
|
||||
sealed map[int64]*bm25Stats
|
||||
|
||||
targetVersion int64
|
||||
snapshot *snapshot
|
||||
ts time.Time // time of target generate
|
||||
}
|
||||
|
||||
func (o *idfOracle) Register(segmentID int64, stats map[int64]*storage.BM25Stats, state commonpb.SegmentState) {
|
||||
o.Lock()
|
||||
defer o.Unlock()
|
||||
func (t *idfTarget) SetSnapshot(snapshot *snapshot) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.snapshot = snapshot
|
||||
t.ts = time.Now()
|
||||
}
|
||||
|
||||
func (t *idfTarget) GetSnapshot() (*snapshot, time.Time) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
return t.snapshot, t.ts
|
||||
}
|
||||
|
||||
type idfOracle struct {
|
||||
sync.RWMutex // protect current and growing segment stats
|
||||
current bm25Stats
|
||||
growing map[int64]*growingBm25Stats
|
||||
|
||||
sealed typeutil.ConcurrentMap[int64, *sealedBm25Stats]
|
||||
|
||||
collectionID int64
|
||||
|
||||
// for sync distribution
|
||||
next idfTarget
|
||||
targetVersion *atomic.Int64
|
||||
syncNotify chan struct{}
|
||||
|
||||
// for disk cache
|
||||
localNotify chan struct{}
|
||||
dirPath string
|
||||
|
||||
closeCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (o *idfOracle) TargetVersion() int64 {
|
||||
return o.targetVersion.Load()
|
||||
}
|
||||
|
||||
func (o *idfOracle) Register(segmentID int64, stats bm25Stats, state commonpb.SegmentState) {
|
||||
switch state {
|
||||
case segments.SegmentTypeGrowing:
|
||||
o.Lock()
|
||||
defer o.Unlock()
|
||||
|
||||
if _, ok := o.growing[segmentID]; ok {
|
||||
return
|
||||
}
|
||||
o.growing[segmentID] = &bm25Stats{
|
||||
stats: stats,
|
||||
activate: true,
|
||||
targetVersion: initialTargetVersion,
|
||||
o.growing[segmentID] = &growingBm25Stats{
|
||||
bm25Stats: stats,
|
||||
activate: true,
|
||||
}
|
||||
o.current.Merge(stats)
|
||||
case segments.SegmentTypeSealed:
|
||||
if _, ok := o.sealed[segmentID]; ok {
|
||||
if ok := o.sealed.Contain(segmentID); ok {
|
||||
return
|
||||
}
|
||||
o.sealed[segmentID] = &bm25Stats{
|
||||
stats: stats,
|
||||
activate: false,
|
||||
targetVersion: unreadableTargetVersion,
|
||||
}
|
||||
o.sealed.Insert(segmentID, &sealedBm25Stats{
|
||||
bm25Stats: stats,
|
||||
ts: time.Now(),
|
||||
activate: atomic.NewBool(false),
|
||||
inmemory: true,
|
||||
segmentID: segmentID,
|
||||
})
|
||||
default:
|
||||
log.Warn("register segment with unknown state", zap.String("stats", state.String()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25Stats) {
|
||||
func (o *idfOracle) UpdateGrowing(segmentID int64, stats bm25Stats) {
|
||||
if len(stats) == 0 {
|
||||
return
|
||||
}
|
||||
@ -161,33 +331,129 @@ func (o *idfOracle) UpdateGrowing(segmentID int64, stats map[int64]*storage.BM25
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) RemoveGrowing(segmentID int64) {
|
||||
func (o *idfOracle) LazyRemoveGrowings(targetVersion int64, segmentIDs ...int64) {
|
||||
o.Lock()
|
||||
defer o.Unlock()
|
||||
|
||||
if stats, ok := o.growing[segmentID]; ok {
|
||||
if stats.activate {
|
||||
o.current.Minus(stats.stats)
|
||||
for _, segmentID := range segmentIDs {
|
||||
if stats, ok := o.growing[segmentID]; ok && stats.droppedVersion == 0 {
|
||||
stats.droppedVersion = targetVersion
|
||||
}
|
||||
delete(o.growing, segmentID)
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) activate(stats *bm25Stats) {
|
||||
stats.activate = true
|
||||
o.current.Merge(stats.stats)
|
||||
func (o *idfOracle) Start() {
|
||||
o.wg.Add(1)
|
||||
go o.syncloop()
|
||||
|
||||
if paramtable.Get().QueryNodeCfg.IDFEnableDisk.GetAsBool() {
|
||||
o.wg.Add(1)
|
||||
go o.localloop()
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) deactivate(stats *bm25Stats) {
|
||||
stats.activate = false
|
||||
o.current.Minus(stats.stats)
|
||||
func (o *idfOracle) Close() {
|
||||
close(o.closeCh)
|
||||
o.wg.Wait()
|
||||
|
||||
os.RemoveAll(o.dirPath)
|
||||
}
|
||||
|
||||
func (o *idfOracle) SyncDistribution(snapshot *snapshot) {
|
||||
o.Lock()
|
||||
defer o.Unlock()
|
||||
func (o *idfOracle) SetNext(snapshot *snapshot) {
|
||||
o.next.SetSnapshot(snapshot)
|
||||
|
||||
sealed, growing := snapshot.Peek()
|
||||
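// First target (target version still 0): run SyncDistribution synchronously (its error is not checked here); later targets are handed to the sync loop.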
|
||||
if o.targetVersion.Load() == 0 {
|
||||
o.SyncDistribution()
|
||||
} else {
|
||||
o.NotifySync()
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) NotifySync() {
|
||||
select {
|
||||
case o.syncNotify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) NotifyLocal() {
|
||||
select {
|
||||
case o.localNotify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) syncloop() {
|
||||
defer o.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-o.syncNotify:
|
||||
err := o.SyncDistribution()
|
||||
if err != nil {
|
||||
log.Warn("idf oracle sync distribution failed", zap.Error(err))
|
||||
time.Sleep(time.Second * 10)
|
||||
o.NotifySync()
|
||||
}
|
||||
case <-o.closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *idfOracle) localloop() {
|
||||
pool := conc.NewPool[struct{}](paramtable.Get().QueryNodeCfg.IDFWriteConcurrenct.GetAsInt())
|
||||
o.dirPath = path.Join(paramtable.Get().QueryNodeCfg.IDFLocalPath.GetValue(), fmt.Sprintf("%d", o.collectionID))
|
||||
|
||||
defer o.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-o.localNotify:
|
||||
statsList := []*sealedBm25Stats{}
|
||||
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
||||
if stats.ShouldOffLoadToDisk() {
|
||||
statsList = append(statsList, stats)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if _, err := os.Stat(o.dirPath); os.IsNotExist(err) {
|
||||
err = os.MkdirAll(o.dirPath, 0o755)
|
||||
if err != nil {
|
||||
log.Warn("create idf local path failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
features := []*conc.Future[struct{}]{}
|
||||
for _, stats := range statsList {
|
||||
features = append(features, pool.Submit(func() (struct{}, error) {
|
||||
err := stats.ToLocal(o.dirPath)
|
||||
if err != nil {
|
||||
log.Warn("idf oracle to local failed", zap.Error(err))
|
||||
return struct{}{}, nil
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}))
|
||||
}
|
||||
|
||||
conc.AwaitAll(features...)
|
||||
case <-o.closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WARN: SyncDistribution is not safe for concurrent use.
|
||||
func (o *idfOracle) SyncDistribution() error {
|
||||
snapshot, snapshotTs := o.next.GetSnapshot()
|
||||
if snapshot.targetVersion <= o.targetVersion.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
sealed, _ := snapshot.Peek()
|
||||
|
||||
sealedMap := map[int64]bool{} // sealed diff map, activate segment stats if true, and remove if not in map
|
||||
|
||||
for _, item := range sealed {
|
||||
for _, segment := range item.Segments {
|
||||
@ -195,47 +461,75 @@ func (o *idfOracle) SyncDistribution(snapshot *snapshot) {
|
||||
continue
|
||||
}
|
||||
|
||||
if stats, ok := o.sealed[segment.SegmentID]; ok {
|
||||
if stats.targetVersion < segment.TargetVersion {
|
||||
stats.targetVersion = segment.TargetVersion
|
||||
}
|
||||
stats.snapVersion = snapshot.version
|
||||
} else {
|
||||
log.Warn("idf oracle lack some sealed segment", zap.Int64("segmentID", segment.SegmentID))
|
||||
if segment.TargetVersion == snapshot.targetVersion {
|
||||
sealedMap[segment.SegmentID] = true
|
||||
} else if segment.TargetVersion == unreadableTargetVersion {
|
||||
sealedMap[segment.SegmentID] = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, segment := range growing {
|
||||
if stats, ok := o.growing[segment.SegmentID]; ok {
|
||||
stats.targetVersion = segment.TargetVersion
|
||||
} else {
|
||||
log.Warn("idf oracle lack some growing segment", zap.Int64("segmentID", segment.SegmentID))
|
||||
diff := bm25Stats{}
|
||||
|
||||
var rangeErr error
|
||||
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
||||
activate, ok := sealedMap[segmentID]
|
||||
statsActivate := stats.activate.Load()
|
||||
if ok && activate && !statsActivate {
|
||||
stats, err := stats.FetchStats()
|
||||
if err != nil {
|
||||
rangeErr = err
|
||||
return false
|
||||
}
|
||||
diff.Merge(stats)
|
||||
} else if !ok && statsActivate {
|
||||
stats, err := stats.FetchStats()
|
||||
if err != nil {
|
||||
rangeErr = err
|
||||
return false
|
||||
}
|
||||
diff.Minus(stats)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if rangeErr != nil {
|
||||
return rangeErr
|
||||
}
|
||||
|
||||
o.targetVersion = snapshot.targetVersion
|
||||
o.Lock()
|
||||
defer o.Unlock()
|
||||
|
||||
for segmentID, stats := range o.sealed {
|
||||
if !stats.activate && stats.targetVersion == o.targetVersion {
|
||||
o.activate(stats)
|
||||
} else if (stats.targetVersion < o.targetVersion && stats.targetVersion != unreadableTargetVersion) || stats.snapVersion != snapshot.version {
|
||||
for segmentID, stats := range o.growing {
|
||||
// drop growing segment bm25 stats
|
||||
if stats.droppedVersion != 0 && stats.droppedVersion <= snapshot.targetVersion {
|
||||
if stats.activate {
|
||||
o.current.Minus(stats.stats)
|
||||
o.current.Minus(stats.bm25Stats)
|
||||
}
|
||||
delete(o.sealed, segmentID)
|
||||
delete(o.growing, segmentID)
|
||||
}
|
||||
}
|
||||
o.current.Merge(diff)
|
||||
|
||||
for _, stats := range o.growing {
|
||||
if !stats.activate && (stats.targetVersion == o.targetVersion || stats.targetVersion == initialTargetVersion) {
|
||||
o.activate(stats)
|
||||
} else if stats.activate && (stats.targetVersion != o.targetVersion && stats.targetVersion != initialTargetVersion) {
|
||||
o.deactivate(stats)
|
||||
// remove sealed segment not in target
|
||||
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
||||
activate, ok := sealedMap[segmentID]
|
||||
statsActivate := stats.activate.Load()
|
||||
if !ok && stats.ts.Before(snapshotTs) {
|
||||
stats.Remove()
|
||||
o.sealed.Remove(segmentID)
|
||||
}
|
||||
}
|
||||
|
||||
log.Ctx(context.TODO()).Debug("sync distribution finished", zap.Int64("version", o.targetVersion), zap.Int64("numrow", o.current.NumRow()))
|
||||
if ok && activate && !statsActivate {
|
||||
stats.activate.Store(true)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
o.targetVersion.Store(snapshot.targetVersion)
|
||||
o.NotifyLocal()
|
||||
log.Ctx(context.TODO()).Info("sync distribution finished", zap.Int64("version", snapshot.targetVersion), zap.Int64("numrow", o.current.NumRow()), zap.Int("growing", len(o.growing)), zap.Int("sealed", o.sealed.Len()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *idfOracle) BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][]byte, float64, error) {
|
||||
@ -255,10 +549,15 @@ func (o *idfOracle) BuildIDF(fieldID int64, tfs *schemapb.SparseFloatArray) ([][
|
||||
return idfBytes, stats.GetAvgdl(), nil
|
||||
}
|
||||
|
||||
func NewIDFOracle(functions []*schemapb.FunctionSchema) IDFOracle {
|
||||
func NewIDFOracle(collID int64, functions []*schemapb.FunctionSchema) IDFOracle {
|
||||
return &idfOracle{
|
||||
current: newBm25Stats(functions),
|
||||
growing: make(map[int64]*bm25Stats),
|
||||
sealed: make(map[int64]*bm25Stats),
|
||||
collectionID: collID,
|
||||
targetVersion: atomic.NewInt64(0),
|
||||
current: newBm25Stats(functions),
|
||||
growing: make(map[int64]*growingBm25Stats),
|
||||
sealed: typeutil.ConcurrentMap[int64, *sealedBm25Stats]{},
|
||||
syncNotify: make(chan struct{}, 1),
|
||||
closeCh: make(chan struct{}),
|
||||
localNotify: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package delegator
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
@ -49,13 +50,27 @@ func (suite *IDFOracleSuite) SetupSuite() {
|
||||
}
|
||||
|
||||
func (suite *IDFOracleSuite) SetupTest() {
|
||||
suite.idfOracle = NewIDFOracle(suite.collectionSchema.GetFunctions()).(*idfOracle)
|
||||
suite.idfOracle = NewIDFOracle(suite.collectionID, suite.collectionSchema.GetFunctions()).(*idfOracle)
|
||||
suite.idfOracle.Start()
|
||||
suite.snapshot = &snapshot{
|
||||
dist: []SnapshotItem{{1, make([]SegmentEntry, 0)}},
|
||||
}
|
||||
suite.targetVersion = 0
|
||||
}
|
||||
|
||||
func (suite *IDFOracleSuite) TearDownTest() {
|
||||
suite.idfOracle.Close()
|
||||
}
|
||||
|
||||
func (s *IDFOracleSuite) waitTargetVersion(targetVersion int64) {
|
||||
for {
|
||||
if s.idfOracle.TargetVersion() >= targetVersion {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *IDFOracleSuite) genStats(start uint32, end uint32) map[int64]*storage.BM25Stats {
|
||||
result := make(map[int64]*storage.BM25Stats)
|
||||
result[102] = storage.NewBM25Stats()
|
||||
@ -124,12 +139,14 @@ func (suite *IDFOracleSuite) TestSealed() {
|
||||
|
||||
// update and sync snapshot make all sealed activate
|
||||
suite.updateSnapshot(sealedSegs, []int64{}, []int64{})
|
||||
suite.idfOracle.SyncDistribution(suite.snapshot)
|
||||
suite.idfOracle.SetNext(suite.snapshot)
|
||||
suite.waitTargetVersion(suite.targetVersion)
|
||||
suite.Equal(int64(4), suite.idfOracle.current.NumRow())
|
||||
|
||||
releasedSeg := []int64{1, 2, 3}
|
||||
suite.updateSnapshot([]int64{}, []int64{}, releasedSeg)
|
||||
suite.idfOracle.SyncDistribution(suite.snapshot)
|
||||
suite.idfOracle.SetNext(suite.snapshot)
|
||||
suite.waitTargetVersion(suite.targetVersion)
|
||||
suite.Equal(int64(1), suite.idfOracle.current.NumRow())
|
||||
|
||||
sparse := typeutil.CreateAndSortSparseFloatRow(map[uint32]float32{4: 1})
|
||||
@ -156,7 +173,9 @@ func (suite *IDFOracleSuite) TestGrow() {
|
||||
|
||||
releasedSeg := []int64{1, 2, 3}
|
||||
suite.updateSnapshot([]int64{}, []int64{}, releasedSeg)
|
||||
suite.idfOracle.SyncDistribution(suite.snapshot)
|
||||
suite.idfOracle.LazyRemoveGrowings(suite.snapshot.targetVersion, releasedSeg...)
|
||||
suite.idfOracle.SetNext(suite.snapshot)
|
||||
suite.waitTargetVersion(suite.targetVersion)
|
||||
suite.Equal(int64(1), suite.idfOracle.current.NumRow())
|
||||
|
||||
suite.idfOracle.UpdateGrowing(4, suite.genStats(5, 6))
|
||||
@ -170,14 +189,6 @@ func (suite *IDFOracleSuite) TestStats() {
|
||||
OutputFieldIds: []int64{102},
|
||||
}})
|
||||
|
||||
suite.NotPanics(func() {
|
||||
stats.Merge(map[int64]*storage.BM25Stats{103: storage.NewBM25Stats()})
|
||||
})
|
||||
|
||||
suite.Panics(func() {
|
||||
stats.Minus(map[int64]*storage.BM25Stats{104: storage.NewBM25Stats()})
|
||||
})
|
||||
|
||||
_, err := stats.GetStats(104)
|
||||
suite.Error(err)
|
||||
|
||||
@ -185,6 +196,42 @@ func (suite *IDFOracleSuite) TestStats() {
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *IDFOracleSuite) TestLocalCache() {
|
||||
// register sealed
|
||||
sealedSegs := []int64{1, 2, 3, 4}
|
||||
for _, segID := range sealedSegs {
|
||||
suite.idfOracle.Register(segID, suite.genStats(uint32(segID), uint32(segID)+1), commonpb.SegmentState_Sealed)
|
||||
}
|
||||
|
||||
// update and sync snapshot make all sealed activate
|
||||
suite.updateSnapshot(sealedSegs, []int64{}, []int64{})
|
||||
suite.idfOracle.SetNext(suite.snapshot)
|
||||
suite.waitTargetVersion(suite.targetVersion)
|
||||
suite.Equal(int64(4), suite.idfOracle.current.NumRow())
|
||||
|
||||
suite.Require().Eventually(func() bool {
|
||||
allInLocal := true
|
||||
suite.idfOracle.sealed.Range(func(id int64, stats *sealedBm25Stats) bool {
|
||||
stats.RLock()
|
||||
defer stats.RUnlock()
|
||||
if stats.inmemory == true {
|
||||
allInLocal = false
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
return allInLocal
|
||||
}, time.Minute, time.Millisecond*100)
|
||||
|
||||
// release some segments
|
||||
releasedSeg := []int64{1, 2, 3}
|
||||
suite.updateSnapshot([]int64{}, []int64{}, releasedSeg)
|
||||
suite.idfOracle.SetNext(suite.snapshot)
|
||||
suite.waitTargetVersion(suite.targetVersion)
|
||||
suite.Equal(int64(1), suite.idfOracle.current.NumRow())
|
||||
}
|
||||
|
||||
func TestIDFOracle(t *testing.T) {
|
||||
suite.Run(t, new(IDFOracleSuite))
|
||||
}
|
||||
|
||||
@ -2851,9 +2851,38 @@ type queryNodeConfig struct {
|
||||
// Json Key Stats
|
||||
JSONKeyStatsCommitInterval ParamItem `refreshable:"false"`
|
||||
EnabledGrowingSegmentJSONKeyStats ParamItem `refreshable:"false"`
|
||||
|
||||
// Idf Oracle
|
||||
IDFEnableDisk ParamItem `refreshable:"true"`
|
||||
IDFLocalPath ParamItem `refreshable:"true"`
|
||||
IDFWriteConcurrenct ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *queryNodeConfig) init(base *BaseTable) {
|
||||
p.IDFEnableDisk = ParamItem{
|
||||
Key: "queryNode.idfOracle.enableDisk",
|
||||
Version: "2.6.0",
|
||||
Export: true,
|
||||
DefaultValue: "true",
|
||||
}
|
||||
p.IDFEnableDisk.Init(base.mgr)
|
||||
|
||||
p.IDFLocalPath = ParamItem{
|
||||
Key: "queryNode.idfOracle.localPath",
|
||||
Version: "2.6.0",
|
||||
Export: true,
|
||||
DefaultValue: "/var/lib/milvus/bm25_logs",
|
||||
}
|
||||
p.IDFLocalPath.Init(base.mgr)
|
||||
|
||||
p.IDFWriteConcurrenct = ParamItem{
|
||||
Key: "queryNode.idfOracle.writeConcurrency",
|
||||
Version: "2.6.0",
|
||||
Export: true,
|
||||
DefaultValue: "4",
|
||||
}
|
||||
p.IDFWriteConcurrenct.Init(base.mgr)
|
||||
|
||||
p.SoPath = ParamItem{
|
||||
Key: "queryNode.soPath",
|
||||
Version: "2.3.0",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user