mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ade1f45c34
commit
49d9e5facb
@ -32,6 +32,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/errorutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type shardClusterState int32
|
||||
@ -611,6 +612,26 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
|
||||
|
||||
// appendHandoff adds the change info into pending list and returns the token.
|
||||
func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onlineSegmentIDs []UniqueID) int64 {
|
||||
|
||||
// the suspects growing segment ids
|
||||
// first all online segment shall be tried, for flush-handoff only puts segment in onlineSegments
|
||||
// and we need to try all offlineSegments in case flush-compact-handoff case
|
||||
possibleGrowingToRemove := make([]UniqueID, 0, len(info.OfflineSegments)+len(onlineSegmentIDs))
|
||||
offlineSegments := typeutil.UniqueSet{} // map stores offline segment id for quick check
|
||||
for _, offline := range info.OfflineSegments {
|
||||
offlineSegments.Insert(offline.GetSegmentID())
|
||||
possibleGrowingToRemove = append(possibleGrowingToRemove, offline.GetSegmentID())
|
||||
}
|
||||
// add online segment ids to suspect list
|
||||
possibleGrowingToRemove = append(possibleGrowingToRemove, onlineSegmentIDs...)
|
||||
|
||||
// generate next version allocation
|
||||
sc.mut.RLock()
|
||||
allocations := sc.segments.Clone(func(segmentID int64) bool {
|
||||
return offlineSegments.Contain(segmentID)
|
||||
})
|
||||
sc.mut.RUnlock()
|
||||
|
||||
sc.mutVersion.Lock()
|
||||
defer sc.mutVersion.Unlock()
|
||||
|
||||
@ -618,14 +639,7 @@ func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onli
|
||||
versionID := sc.nextVersionID.Inc()
|
||||
// remove offline segments in next version
|
||||
// so incoming request will not have allocation of these segments
|
||||
version := NewShardClusterVersion(versionID, sc.segments.Clone(func(segmentID int64) bool {
|
||||
for _, offline := range info.OfflineSegments {
|
||||
if offline.GetSegmentID() == segmentID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}))
|
||||
version := NewShardClusterVersion(versionID, allocations)
|
||||
sc.versions.Store(versionID, version)
|
||||
|
||||
var lastVersionID int64
|
||||
@ -657,7 +671,7 @@ func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onli
|
||||
// error ignored here
|
||||
sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
|
||||
CollectionID: sc.collectionID,
|
||||
SegmentIDs: onlineSegmentIDs,
|
||||
SegmentIDs: possibleGrowingToRemove,
|
||||
Scope: querypb.DataScope_Streaming,
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user