mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Remove growing according to offline as well (#17968)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ee1ca4d21f
commit
f4177f4434
@ -32,6 +32,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/errorutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type shardClusterState int32
|
||||
@ -611,15 +612,23 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
|
||||
|
||||
// appendHandoff adds the change info into pending list and returns the token.
|
||||
func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onlineSegmentIDs []UniqueID) int64 {
|
||||
|
||||
// the suspects growing segment ids
|
||||
// first all online segment shall be tried, for flush-handoff only puts segment in onlineSegments
|
||||
// and we need to try all offlineSegments in case flush-compact-handoff case
|
||||
possibleGrowingToRemove := make([]UniqueID, 0, len(info.OfflineSegments)+len(onlineSegmentIDs))
|
||||
offlineSegments := typeutil.UniqueSet{} // map stores offline segment id for quick check
|
||||
for _, offline := range info.OfflineSegments {
|
||||
offlineSegments.Insert(offline.GetSegmentID())
|
||||
possibleGrowingToRemove = append(possibleGrowingToRemove, offline.GetSegmentID())
|
||||
}
|
||||
// add online segment ids to suspect list
|
||||
possibleGrowingToRemove = append(possibleGrowingToRemove, onlineSegmentIDs...)
|
||||
|
||||
// generate next version allocation
|
||||
sc.mut.RLock()
|
||||
allocations := sc.segments.Clone(func(segmentID int64) bool {
|
||||
for _, offline := range info.OfflineSegments {
|
||||
if offline.GetSegmentID() == segmentID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return offlineSegments.Contain(segmentID)
|
||||
})
|
||||
sc.mut.RUnlock()
|
||||
|
||||
@ -662,7 +671,7 @@ func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onli
|
||||
// error ignored here
|
||||
sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
|
||||
CollectionID: sc.collectionID,
|
||||
SegmentIDs: onlineSegmentIDs,
|
||||
SegmentIDs: possibleGrowingToRemove,
|
||||
Scope: querypb.DataScope_Streaming,
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user