mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: optimize idf oracle sync logic (#44628)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
a3a28a4b99
commit
ac82bad0b3
@ -489,9 +489,12 @@ func (o *idfOracle) SyncDistribution() error {
|
|||||||
|
|
||||||
sealed, _ := snapshot.Peek()
|
sealed, _ := snapshot.Peek()
|
||||||
|
|
||||||
sealedMap := map[int64]bool{} // sealed diff map, activate segment stats if true, and remove if not in map
|
// intarget segment map
|
||||||
|
targetMap := typeutil.NewSet[UniqueID]()
|
||||||
|
// segment with unreadable target version was not been used,
|
||||||
|
// not remove them till it update version or remove from snapshot(released)
|
||||||
|
reserveMap := typeutil.NewSet[UniqueID]()
|
||||||
|
|
||||||
// only remain current target segment and unknown version segment in snapshot.
|
|
||||||
for _, item := range sealed {
|
for _, item := range sealed {
|
||||||
for _, segment := range item.Segments {
|
for _, segment := range item.Segments {
|
||||||
if segment.Level == datapb.SegmentLevel_L0 {
|
if segment.Level == datapb.SegmentLevel_L0 {
|
||||||
@ -500,12 +503,12 @@ func (o *idfOracle) SyncDistribution() error {
|
|||||||
|
|
||||||
switch segment.TargetVersion {
|
switch segment.TargetVersion {
|
||||||
case snapshot.targetVersion:
|
case snapshot.targetVersion:
|
||||||
sealedMap[segment.SegmentID] = true
|
targetMap.Insert(segment.SegmentID)
|
||||||
if !o.sealed.Contain(segment.SegmentID) {
|
if !o.sealed.Contain(segment.SegmentID) {
|
||||||
log.Warn("idf oracle lack some sealed segment", zap.Int64("segment", segment.SegmentID))
|
log.Warn("idf oracle lack some sealed segment", zap.Int64("segment", segment.SegmentID))
|
||||||
}
|
}
|
||||||
case unreadableTargetVersion:
|
case unreadableTargetVersion:
|
||||||
sealedMap[segment.SegmentID] = false
|
reserveMap.Insert(segment.SegmentID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -514,11 +517,11 @@ func (o *idfOracle) SyncDistribution() error {
|
|||||||
|
|
||||||
var rangeErr error
|
var rangeErr error
|
||||||
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
||||||
// segment was unreadable if in snapshot but not in target.
|
intarget := targetMap.Contain(segmentID)
|
||||||
intarget, insnap := sealedMap[segmentID]
|
|
||||||
activate := stats.activate.Load()
|
activate := stats.activate.Load()
|
||||||
// activate segment if segment in target
|
// activate segment if segment in target
|
||||||
if insnap && intarget && !activate {
|
if intarget && !activate {
|
||||||
stats, err := stats.FetchStats()
|
stats, err := stats.FetchStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
|
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
|
||||||
@ -526,9 +529,8 @@ func (o *idfOracle) SyncDistribution() error {
|
|||||||
}
|
}
|
||||||
diff.Merge(stats)
|
diff.Merge(stats)
|
||||||
} else
|
} else
|
||||||
// deactivate segment if segment not in snapshot
|
// deactivate segment if segment not in target.
|
||||||
// or deactivate segment if segment unreadable (only exist at preload segment)
|
if !intarget && activate {
|
||||||
if (!insnap || (insnap && !intarget)) && activate {
|
|
||||||
stats, err := stats.FetchStats()
|
stats, err := stats.FetchStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
|
rangeErr = fmt.Errorf("fetch stats failed with error: %v", err)
|
||||||
@ -559,24 +561,29 @@ func (o *idfOracle) SyncDistribution() error {
|
|||||||
|
|
||||||
// remove sealed segment not in target
|
// remove sealed segment not in target
|
||||||
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
o.sealed.Range(func(segmentID int64, stats *sealedBm25Stats) bool {
|
||||||
intarget, insnap := sealedMap[segmentID]
|
reserve := reserveMap.Contain(segmentID)
|
||||||
activate := stats.activate.Load()
|
intarget := targetMap.Contain(segmentID)
|
||||||
// remove if segment not in snapshot
|
|
||||||
// and add before snapshot
|
|
||||||
if !insnap && stats.ts.Before(snapshotTs) {
|
|
||||||
stats.Remove()
|
|
||||||
o.sealed.Remove(segmentID)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
activate := stats.activate.Load()
|
||||||
// save activate if segment in target.
|
// save activate if segment in target.
|
||||||
if insnap && intarget && !activate {
|
if intarget && !activate {
|
||||||
stats.activate.Store(true)
|
stats.activate.Store(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deactivate if segment unreadable.
|
// deactivate if segment not in target.
|
||||||
if insnap && !intarget && activate {
|
if !intarget && activate {
|
||||||
stats.activate.Store(false)
|
stats.activate.Store(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove
|
||||||
|
// if segment not in target and not in reserve list
|
||||||
|
// (means segment target version was old version or segment not in snapshot)
|
||||||
|
// and add before snapshot Ts
|
||||||
|
// (forbid remove some new segment register after current snapshot)
|
||||||
|
if !intarget && !reserve && stats.ts.Before(snapshotTs) {
|
||||||
|
stats.Remove()
|
||||||
|
o.sealed.Remove(segmentID)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user