mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
issue: #43767

- Enhance serviceable check logic to properly handle full vs partial result requirements
- For full result (requiredLoadRatio >= 1.0): check queryView.Serviceable()
- For partial result (requiredLoadRatio < 1.0): check load ratio satisfaction
- Add comprehensive unit tests covering all serviceable check scenarios

This enhancement ensures the delegator correctly validates serviceability based on the requested result completeness, improving reliability of query operations.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
588 lines
20 KiB
Go
588 lines
20 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package delegator
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/samber/lo"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
const (
	// wildcardNodeID matches any node ID; it is used for force distribution correction.
	wildcardNodeID int64 = -1

	// initialTargetVersion is the target version of a growing segment consumed from the channel.
	initialTargetVersion int64 = 0

	// redundantTargetVersion marks a growing segment which does not exist in the target and
	// whose start position is earlier than the max sealed dml position.
	redundantTargetVersion int64 = -1

	// unreadableTargetVersion marks a sealed segment loaded by a load segment request; it
	// should become readable after the target version is synced.
	unreadableTargetVersion int64 = -2
)
|
|
|
|
// closedCh is a lazily created, already-closed channel handed out by getClosedCh;
// closeOnce guards its one-time creation.
var (
	closedCh  chan struct{}
	closeOnce sync.Once
)

// getClosedCh returns a channel that is already closed, so receiving from it never
// blocks. The channel is created on first use and the same channel is returned by
// every call.
func getClosedCh() chan struct{} {
	closeOnce.Do(func() {
		ch := make(chan struct{})
		close(ch)
		closedCh = ch
	})
	return closedCh
}
|
|
|
|
// channelQueryView maintains the sealed segment list which should be used for search/query.
|
|
// for new delegator, will got a new channelQueryView from WatchChannel, and get the queryView update from querycoord before it becomes serviceable
|
|
// after delegator becomes serviceable, it only update the queryView by SyncTargetVersion
|
|
type channelQueryView struct {
|
|
growingSegments typeutil.UniqueSet // growing segment list which should be used for search/query
|
|
sealedSegmentRowCount map[int64]int64 // sealed segment list which should be used for search/query, segmentID -> row count
|
|
partitions typeutil.UniqueSet // partitions list which sealed segments belong to
|
|
version int64 // version of current query view, same as targetVersion in qc
|
|
|
|
loadedRatio *atomic.Float64 // loaded ratio of current query view, set serviceable to true if loadedRatio == 1.0
|
|
unloadedSealedSegments []SegmentEntry // workerID -> -1
|
|
|
|
syncedByCoord bool // if the query view is synced by coord
|
|
}
|
|
|
|
func NewChannelQueryView(growings []int64, sealedSegmentRowCount map[int64]int64, partitions []int64, version int64) *channelQueryView {
|
|
return &channelQueryView{
|
|
growingSegments: typeutil.NewUniqueSet(growings...),
|
|
sealedSegmentRowCount: sealedSegmentRowCount,
|
|
partitions: typeutil.NewUniqueSet(partitions...),
|
|
version: version,
|
|
loadedRatio: atomic.NewFloat64(0),
|
|
}
|
|
}
|
|
|
|
func (q *channelQueryView) GetVersion() int64 {
|
|
return q.version
|
|
}
|
|
|
|
func (q *channelQueryView) Serviceable() bool {
|
|
dataReady := q.loadedRatio.Load() >= 1.0
|
|
// for now, we only support collection level target(data view), so we need to wait for the query view is synced by coord
|
|
// incase of delegator become serviceable before current target is ready when memory is not enough.
|
|
// if current target is not ready, segment on delegator will be released at any time, serviceable state is not reliable.
|
|
// Note: after we support channel level target(data view), we can remove this flag
|
|
viewReady := q.syncedByCoord
|
|
|
|
// if partial result is enabled, we can skip the viewReady check
|
|
enablePartialResult := paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat() < 1.0
|
|
return dataReady && (viewReady || enablePartialResult)
|
|
}
|
|
|
|
func (q *channelQueryView) GetLoadedRatio() float64 {
|
|
return q.loadedRatio.Load()
|
|
}
|
|
|
|
// distribution is the struct to store segment distribution.
|
|
// it contains both growing and sealed segments.
|
|
type distribution struct {
|
|
// segments information
|
|
// map[SegmentID]=>segmentEntry
|
|
growingSegments map[UniqueID]SegmentEntry
|
|
sealedSegments map[UniqueID]SegmentEntry
|
|
|
|
// snapshotVersion indicator
|
|
snapshotVersion int64
|
|
snapshots *typeutil.ConcurrentMap[int64, *snapshot]
|
|
// current is the snapshot for quick usage for search/query
|
|
// generated for each change of distribution
|
|
current *atomic.Pointer[snapshot]
|
|
|
|
idfOracle IDFOracle
|
|
// protects current & segments
|
|
mut sync.RWMutex
|
|
|
|
// distribution info
|
|
channelName string
|
|
queryView *channelQueryView
|
|
}
|
|
|
|
// SegmentEntry stores the segment meta information.
|
|
type SegmentEntry struct {
|
|
NodeID int64
|
|
SegmentID UniqueID
|
|
PartitionID UniqueID
|
|
Version int64
|
|
TargetVersion int64
|
|
Level datapb.SegmentLevel
|
|
Offline bool // if delegator failed to execute forwardDelete/Query/Search on segment, it will be offline
|
|
}
|
|
|
|
func NewDistribution(channelName string, queryView *channelQueryView) *distribution {
|
|
dist := &distribution{
|
|
channelName: channelName,
|
|
growingSegments: make(map[UniqueID]SegmentEntry),
|
|
sealedSegments: make(map[UniqueID]SegmentEntry),
|
|
snapshots: typeutil.NewConcurrentMap[int64, *snapshot](),
|
|
current: atomic.NewPointer[snapshot](nil),
|
|
queryView: queryView,
|
|
}
|
|
dist.genSnapshot()
|
|
dist.updateServiceable("NewDistribution")
|
|
return dist
|
|
}
|
|
|
|
func (d *distribution) SetIDFOracle(idfOracle IDFOracle) {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
d.idfOracle = idfOracle
|
|
}
|
|
|
|
// return segment distribution in query view
|
|
func (d *distribution) PinReadableSegments(requiredLoadRatio float64, partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, sealedRowCount map[int64]int64, version int64, err error) {
|
|
d.mut.RLock()
|
|
defer d.mut.RUnlock()
|
|
|
|
requireFullResult := requiredLoadRatio >= 1.0
|
|
loadRatioSatisfy := d.queryView.GetLoadedRatio() >= requiredLoadRatio
|
|
var isServiceable bool
|
|
if requireFullResult {
|
|
isServiceable = d.queryView.Serviceable()
|
|
} else {
|
|
isServiceable = loadRatioSatisfy
|
|
}
|
|
|
|
if !isServiceable {
|
|
log.Warn("channel distribution is not serviceable",
|
|
zap.String("channel", d.channelName),
|
|
zap.Float64("requiredLoadRatio", requiredLoadRatio),
|
|
zap.Float64("currentLoadRatio", d.queryView.GetLoadedRatio()),
|
|
zap.Bool("serviceable", d.queryView.Serviceable()),
|
|
)
|
|
return nil, nil, nil, -1, merr.WrapErrChannelNotAvailable(d.channelName, "channel distribution is not serviceable")
|
|
}
|
|
|
|
current := d.current.Load()
|
|
// snapshot sanity check
|
|
// if user specified a partition id which is not serviceable, return err
|
|
for _, partition := range partitions {
|
|
if !current.partitions.Contain(partition) {
|
|
return nil, nil, nil, -1, merr.WrapErrPartitionNotLoaded(partition)
|
|
}
|
|
}
|
|
sealed, growing = current.Get(partitions...)
|
|
version = current.version
|
|
sealedRowCount = d.queryView.sealedSegmentRowCount
|
|
if d.queryView.GetLoadedRatio() == 1.0 {
|
|
// if query view is fully loaded, we can use current target version to filter segments
|
|
targetVersion := current.GetTargetVersion()
|
|
filterReadable := d.readableFilter(targetVersion)
|
|
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
|
|
} else {
|
|
// if query view is not fully loaded, we need to filter segments by query view's segment list to offer partial result
|
|
sealed = lo.Map(sealed, func(item SnapshotItem, _ int) SnapshotItem {
|
|
return SnapshotItem{
|
|
NodeID: item.NodeID,
|
|
Segments: lo.Filter(item.Segments, func(entry SegmentEntry, _ int) bool {
|
|
return d.queryView.sealedSegmentRowCount[entry.SegmentID] > 0
|
|
}),
|
|
}
|
|
})
|
|
growing = lo.Filter(growing, func(entry SegmentEntry, _ int) bool {
|
|
return d.queryView.growingSegments.Contain(entry.SegmentID)
|
|
})
|
|
}
|
|
|
|
if len(d.queryView.unloadedSealedSegments) > 0 {
|
|
// append distribution of unloaded segment
|
|
sealed = append(sealed, SnapshotItem{
|
|
NodeID: -1,
|
|
Segments: d.queryView.unloadedSealedSegments,
|
|
})
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (d *distribution) PinOnlineSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64) {
|
|
d.mut.RLock()
|
|
defer d.mut.RUnlock()
|
|
|
|
current := d.current.Load()
|
|
sealed, growing = current.Get(partitions...)
|
|
filterOnline := func(entry SegmentEntry, _ int) bool {
|
|
return !entry.Offline
|
|
}
|
|
sealed, growing = d.filterSegments(sealed, growing, filterOnline)
|
|
version = current.version
|
|
return
|
|
}
|
|
|
|
func (d *distribution) filterSegments(sealed []SnapshotItem, growing []SegmentEntry, filter func(SegmentEntry, int) bool) ([]SnapshotItem, []SegmentEntry) {
|
|
growing = lo.Filter(growing, filter)
|
|
sealed = lo.Map(sealed, func(item SnapshotItem, _ int) SnapshotItem {
|
|
return SnapshotItem{
|
|
NodeID: item.NodeID,
|
|
Segments: lo.Filter(item.Segments, filter),
|
|
}
|
|
})
|
|
|
|
return sealed, growing
|
|
}
|
|
|
|
// PeekAllSegments returns current snapshot without increasing inuse count
|
|
// show only used by GetDataDistribution.
|
|
func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry) {
|
|
current := d.current.Load()
|
|
sealed, growing = current.Peek(partitions...)
|
|
|
|
if readable {
|
|
targetVersion := current.GetTargetVersion()
|
|
filterReadable := d.readableFilter(targetVersion)
|
|
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Unpin notifies snapshot one reference is released.
|
|
func (d *distribution) Unpin(version int64) {
|
|
snapshot, ok := d.snapshots.Get(version)
|
|
if ok {
|
|
snapshot.Done(d.getCleanup(snapshot.version))
|
|
}
|
|
}
|
|
|
|
func (d *distribution) getTargetVersion() int64 {
|
|
current := d.current.Load()
|
|
return current.GetTargetVersion()
|
|
}
|
|
|
|
// Serviceable returns wether current snapshot is serviceable.
|
|
func (d *distribution) Serviceable() bool {
|
|
return d.queryView.Serviceable()
|
|
}
|
|
|
|
// for now, delegator become serviceable only when watchDmChannel is done
|
|
// so we regard all needed growing is loaded and we compute loadRatio based on sealed segments
|
|
func (d *distribution) updateServiceable(triggerAction string) {
|
|
loadedSealedSegments := int64(0)
|
|
totalSealedRowCount := int64(0)
|
|
unloadedSealedSegments := make([]SegmentEntry, 0)
|
|
for id, rowCount := range d.queryView.sealedSegmentRowCount {
|
|
if entry, ok := d.sealedSegments[id]; ok && !entry.Offline {
|
|
loadedSealedSegments += rowCount
|
|
} else {
|
|
unloadedSealedSegments = append(unloadedSealedSegments, SegmentEntry{SegmentID: id, NodeID: -1})
|
|
}
|
|
totalSealedRowCount += rowCount
|
|
}
|
|
|
|
// unloaded segment entry list for partial result
|
|
d.queryView.unloadedSealedSegments = unloadedSealedSegments
|
|
|
|
loadedRatio := 0.0
|
|
if len(d.queryView.sealedSegmentRowCount) == 0 {
|
|
loadedRatio = 1.0
|
|
} else if loadedSealedSegments == 0 {
|
|
loadedRatio = 0.0
|
|
} else {
|
|
loadedRatio = float64(loadedSealedSegments) / float64(totalSealedRowCount)
|
|
}
|
|
|
|
serviceable := loadedRatio >= 1.0
|
|
if serviceable != d.queryView.Serviceable() {
|
|
log.Info("channel distribution serviceable changed",
|
|
zap.String("channel", d.channelName),
|
|
zap.Bool("serviceable", serviceable),
|
|
zap.Float64("loadedRatio", loadedRatio),
|
|
zap.Int64("loadedSealedRowCount", loadedSealedSegments),
|
|
zap.Int64("totalSealedRowCount", totalSealedRowCount),
|
|
zap.Int("unloadedSealedSegmentNum", len(unloadedSealedSegments)),
|
|
zap.Int("totalSealedSegmentNum", len(d.queryView.sealedSegmentRowCount)),
|
|
zap.String("action", triggerAction))
|
|
}
|
|
|
|
d.queryView.loadedRatio.Store(loadedRatio)
|
|
}
|
|
|
|
// AddDistributions add multiple segment entries.
|
|
func (d *distribution) AddDistributions(entries ...SegmentEntry) {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
|
|
for _, entry := range entries {
|
|
oldEntry, ok := d.sealedSegments[entry.SegmentID]
|
|
if ok && oldEntry.Version >= entry.Version {
|
|
log.Warn("Invalid segment distribution changed, skip it",
|
|
zap.Int64("segmentID", entry.SegmentID),
|
|
zap.Int64("oldVersion", oldEntry.Version),
|
|
zap.Int64("oldNode", oldEntry.NodeID),
|
|
zap.Int64("newVersion", entry.Version),
|
|
zap.Int64("newNode", entry.NodeID),
|
|
)
|
|
continue
|
|
}
|
|
|
|
if ok {
|
|
// remain the target version for already loaded segment to void skipping this segment when executing search
|
|
entry.TargetVersion = oldEntry.TargetVersion
|
|
} else {
|
|
entry.TargetVersion = unreadableTargetVersion
|
|
}
|
|
d.sealedSegments[entry.SegmentID] = entry
|
|
}
|
|
|
|
d.genSnapshot()
|
|
d.updateServiceable("AddDistributions")
|
|
}
|
|
|
|
// AddGrowing adds growing segment distribution.
|
|
func (d *distribution) AddGrowing(entries ...SegmentEntry) {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
|
|
for _, entry := range entries {
|
|
d.growingSegments[entry.SegmentID] = entry
|
|
}
|
|
|
|
d.genSnapshot()
|
|
}
|
|
|
|
// AddOffline set segmentIDs to offlines.
|
|
func (d *distribution) MarkOfflineSegments(segmentIDs ...int64) {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
|
|
updated := false
|
|
for _, segmentID := range segmentIDs {
|
|
entry, ok := d.sealedSegments[segmentID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
updated = true
|
|
entry.Offline = true
|
|
entry.Version = unreadableTargetVersion
|
|
entry.NodeID = -1
|
|
d.sealedSegments[segmentID] = entry
|
|
}
|
|
|
|
if updated {
|
|
log.Info("mark sealed segment offline from distribution",
|
|
zap.String("channelName", d.channelName),
|
|
zap.Int64s("segmentIDs", segmentIDs))
|
|
d.genSnapshot()
|
|
d.updateServiceable("MarkOfflineSegments")
|
|
}
|
|
}
|
|
|
|
// update readable channel view
|
|
// 1. update readable channel view to support partial result before distribution is serviceable
|
|
// 2. update readable channel view to support full result after new distribution is serviceable
|
|
// Notice: if we don't need to be compatible with 2.5.x, we can just update new query view to support query,
|
|
// and new query view will become serviceable automatically, a sync action after distribution is serviceable is unnecessary
|
|
func (d *distribution) SyncTargetVersion(action *querypb.SyncAction, partitions []int64) {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
|
|
oldValue := d.queryView.version
|
|
d.queryView = &channelQueryView{
|
|
growingSegments: typeutil.NewUniqueSet(action.GetGrowingInTarget()...),
|
|
sealedSegmentRowCount: action.GetSealedSegmentRowCount(),
|
|
partitions: typeutil.NewUniqueSet(partitions...),
|
|
version: action.GetTargetVersion(),
|
|
loadedRatio: atomic.NewFloat64(0),
|
|
syncedByCoord: true,
|
|
}
|
|
|
|
sealedSet := typeutil.NewUniqueSet(action.GetSealedInTarget()...)
|
|
droppedSet := typeutil.NewUniqueSet(action.GetDroppedInTarget()...)
|
|
redundantGrowings := make([]int64, 0)
|
|
for _, s := range d.growingSegments {
|
|
// sealed segment already exists or dropped, make growing segment redundant
|
|
if sealedSet.Contain(s.SegmentID) || droppedSet.Contain(s.SegmentID) {
|
|
s.TargetVersion = redundantTargetVersion
|
|
log.Info("set growing segment redundant, wait for release",
|
|
zap.Int64("segmentID", s.SegmentID),
|
|
zap.Int64("targetVersion", s.TargetVersion),
|
|
)
|
|
d.growingSegments[s.SegmentID] = s
|
|
redundantGrowings = append(redundantGrowings, s.SegmentID)
|
|
}
|
|
}
|
|
|
|
d.queryView.growingSegments.Range(func(s UniqueID) bool {
|
|
entry, ok := d.growingSegments[s]
|
|
if !ok {
|
|
log.Warn("readable growing segment lost, consume from dml seems too slow",
|
|
zap.Int64("segmentID", s))
|
|
return true
|
|
}
|
|
entry.TargetVersion = action.GetTargetVersion()
|
|
d.growingSegments[s] = entry
|
|
return true
|
|
})
|
|
|
|
for id := range d.queryView.sealedSegmentRowCount {
|
|
entry, ok := d.sealedSegments[id]
|
|
if !ok {
|
|
continue
|
|
}
|
|
entry.TargetVersion = action.GetTargetVersion()
|
|
d.sealedSegments[id] = entry
|
|
}
|
|
|
|
d.genSnapshot()
|
|
if d.idfOracle != nil {
|
|
d.idfOracle.SetNext(d.current.Load())
|
|
d.idfOracle.LazyRemoveGrowings(action.GetTargetVersion(), redundantGrowings...)
|
|
}
|
|
d.updateServiceable("SyncTargetVersion")
|
|
|
|
log.Info("Update channel query view",
|
|
zap.String("channel", d.channelName),
|
|
zap.Int64s("partitions", partitions),
|
|
zap.Int64("oldVersion", oldValue),
|
|
zap.Int64("newVersion", action.GetTargetVersion()),
|
|
zap.Bool("serviceable", d.queryView.Serviceable()),
|
|
zap.Float64("loadedRatio", d.queryView.GetLoadedRatio()),
|
|
zap.Int("growingSegmentNum", len(action.GetGrowingInTarget())),
|
|
zap.Int("sealedSegmentNum", len(action.GetSealedInTarget())),
|
|
)
|
|
}
|
|
|
|
// GetQueryView returns the current query view.
|
|
func (d *distribution) GetQueryView() *channelQueryView {
|
|
d.mut.RLock()
|
|
defer d.mut.RUnlock()
|
|
|
|
return d.queryView
|
|
}
|
|
|
|
// RemoveDistributions remove segments distributions and returns the clear signal channel.
|
|
func (d *distribution) RemoveDistributions(sealedSegments []SegmentEntry, growingSegments []SegmentEntry) chan struct{} {
|
|
d.mut.Lock()
|
|
defer d.mut.Unlock()
|
|
|
|
for _, sealed := range sealedSegments {
|
|
entry, ok := d.sealedSegments[sealed.SegmentID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if entry.NodeID == sealed.NodeID || sealed.NodeID == wildcardNodeID {
|
|
delete(d.sealedSegments, sealed.SegmentID)
|
|
}
|
|
}
|
|
|
|
for _, growing := range growingSegments {
|
|
_, ok := d.growingSegments[growing.SegmentID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
delete(d.growingSegments, growing.SegmentID)
|
|
}
|
|
|
|
log.Info("remove segments from distribution",
|
|
zap.String("channelName", d.channelName),
|
|
zap.Int64s("growing", lo.Map(growingSegments, func(s SegmentEntry, _ int) int64 { return s.SegmentID })),
|
|
zap.Int64s("sealed", lo.Map(sealedSegments, func(s SegmentEntry, _ int) int64 { return s.SegmentID })),
|
|
)
|
|
|
|
d.updateServiceable("RemoveDistributions")
|
|
// wait previous read even not distribution changed
|
|
// in case of segment balance caused segment lost track
|
|
return d.genSnapshot()
|
|
}
|
|
|
|
// getSnapshot converts current distribution to snapshot format.
|
|
// in which, user could use found nodeID=>segmentID list.
|
|
// mutex RLock is required before calling this method.
|
|
func (d *distribution) genSnapshot() chan struct{} {
|
|
// stores last snapshot
|
|
// ok to be nil
|
|
last := d.current.Load()
|
|
|
|
nodeSegments := make(map[int64][]SegmentEntry)
|
|
for _, entry := range d.sealedSegments {
|
|
nodeSegments[entry.NodeID] = append(nodeSegments[entry.NodeID], entry)
|
|
}
|
|
|
|
// only store working partition entry in snapshot to reduce calculation
|
|
dist := make([]SnapshotItem, 0, len(nodeSegments))
|
|
for nodeID, items := range nodeSegments {
|
|
dist = append(dist, SnapshotItem{
|
|
NodeID: nodeID,
|
|
Segments: lo.Map(items, func(entry SegmentEntry, _ int) SegmentEntry {
|
|
if !d.queryView.partitions.Contain(entry.PartitionID) {
|
|
entry.TargetVersion = unreadableTargetVersion
|
|
}
|
|
return entry
|
|
}),
|
|
})
|
|
}
|
|
|
|
growing := make([]SegmentEntry, 0, len(d.growingSegments))
|
|
for _, entry := range d.growingSegments {
|
|
if !d.queryView.partitions.Contain(entry.PartitionID) {
|
|
entry.TargetVersion = unreadableTargetVersion
|
|
}
|
|
growing = append(growing, entry)
|
|
}
|
|
|
|
// update snapshot version
|
|
d.snapshotVersion++
|
|
newSnapShot := NewSnapshot(dist, growing, last, d.snapshotVersion, d.queryView.GetVersion())
|
|
newSnapShot.partitions = d.queryView.partitions
|
|
|
|
d.current.Store(newSnapShot)
|
|
// shall be a new one
|
|
d.snapshots.GetOrInsert(d.snapshotVersion, newSnapShot)
|
|
|
|
// first snapshot, return closed chan
|
|
if last == nil {
|
|
ch := make(chan struct{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
last.Expire(d.getCleanup(last.version))
|
|
|
|
return last.cleared
|
|
}
|
|
|
|
func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool {
|
|
return func(entry SegmentEntry, _ int) bool {
|
|
// segment L0 is not readable for now
|
|
return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
|
|
}
|
|
}
|
|
|
|
// getCleanup returns cleanup snapshots function.
|
|
func (d *distribution) getCleanup(version int64) snapshotCleanup {
|
|
return func() {
|
|
d.snapshots.GetAndRemove(version)
|
|
}
|
|
}
|