From c179986e6cfed91add9fe0031033f6702e24b235 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 29 Jun 2022 16:20:19 +0800 Subject: [PATCH] Fix shard cluster segments data race (#17903) Signed-off-by: Congqi Xia --- internal/querynode/shard_cluster.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 250f8426c4..c390ec2c80 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -611,6 +611,18 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error { // appendHandoff adds the change info into pending list and returns the token. func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onlineSegmentIDs []UniqueID) int64 { + // generate next version allocation + sc.mut.RLock() + allocations := sc.segments.Clone(func(segmentID int64) bool { + for _, offline := range info.OfflineSegments { + if offline.GetSegmentID() == segmentID { + return true + } + } + return false + }) + sc.mut.RUnlock() + sc.mutVersion.Lock() defer sc.mutVersion.Unlock() @@ -618,14 +630,7 @@ func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onli versionID := sc.nextVersionID.Inc() // remove offline segments in next version // so incoming request will not have allocation of these segments - version := NewShardClusterVersion(versionID, sc.segments.Clone(func(segmentID int64) bool { - for _, offline := range info.OfflineSegments { - if offline.GetSegmentID() == segmentID { - return true - } - } - return false - })) + version := NewShardClusterVersion(versionID, allocations) sc.versions.Store(versionID, version) var lastVersionID int64