From 6e4509d0fc451f15f0aa1b66a77c88f68b7b9229 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 7 Nov 2022 17:07:03 +0800 Subject: [PATCH] Fix version not deleted if expired by loadSegment (#20361) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/querynode/shard_cluster.go | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 7eb0e5c6a4..27907fa5b2 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -151,7 +151,6 @@ type ShardCluster struct { currentVersion *ShardClusterVersion // current serving segment state version nextVersionID *atomic.Int64 segmentCond *sync.Cond // segment state change condition - rcCond *sync.Cond // segment rc change condition closeOnce sync.Once closeCh chan struct{} @@ -180,8 +179,6 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string, m := sync.Mutex{} sc.segmentCond = sync.NewCond(&m) - m2 := sync.Mutex{} - sc.rcCond = sync.NewCond(&m2) sc.init() @@ -610,17 +607,16 @@ func (sc *ShardCluster) segmentAllocations(partitionIDs []int64) (map[int64][]in // finishUsage decreases the inUse count of provided segments func (sc *ShardCluster) finishUsage(versionID int64) { - defer func() { - sc.rcCond.L.Lock() - sc.rcCond.Broadcast() - sc.rcCond.L.Unlock() - }() - v, ok := sc.versions.Load(versionID) - if ok { - version := v.(*ShardClusterVersion) - version.FinishUsage() + if !ok { + return } + + version := v.(*ShardClusterVersion) + version.FinishUsage() + + // cleanup version if expired + sc.cleanupVersion(version) } // LoadSegments loads segments with shardCluster. @@ -680,9 +676,11 @@ func (sc *ShardCluster) LoadSegments(ctx context.Context, req *querypb.LoadSegme allocations[info.SegmentID] = shardSegmentInfo{nodeID: req.DstNodeID, segmentID: info.SegmentID, partitionID: info.PartitionID, state: segmentStateLoaded} } + lastVersion := sc.currentVersion version := NewShardClusterVersion(sc.nextVersionID.Inc(), allocations, sc.currentVersion) sc.versions.Store(version.versionID, version) sc.currentVersion = version + sc.cleanupVersion(lastVersion) return nil } @@ -703,7 +701,7 @@ func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.Releas offlineSegments.Insert(req.GetSegmentIDs()...) } - var lastVersionID int64 + var lastVersion *ShardClusterVersion var err error func() { sc.mutVersion.Lock() @@ -729,7 +727,7 @@ func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.Releas if sc.currentVersion != nil { // wait for last version search done <-sc.currentVersion.Expire() - lastVersionID = sc.currentVersion.versionID + lastVersion = sc.currentVersion } } @@ -762,7 +760,7 @@ func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.Releas err = fmt.Errorf("follower %d failed to release segment, reason %s", req.NodeID, resp.GetReason()) } }() - sc.cleanupVersion(lastVersionID) + sc.cleanupVersion(lastVersion) sc.mut.Lock() // do not delete segment if data scope is streaming @@ -784,14 +782,18 @@ func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.Releas } // cleanupVersion clean up version from map -func (sc *ShardCluster) cleanupVersion(versionID int64) { - sc.mutVersion.RLock() - defer sc.mutVersion.RUnlock() - // prevent clean up current version - if sc.currentVersion != nil && sc.currentVersion.versionID == versionID { +func (sc *ShardCluster) cleanupVersion(version *ShardClusterVersion) { + // last version nil, just return + if version == nil { return } - sc.versions.Delete(versionID) + + // if version is still current one or still in use, return + if version.current.Load() || version.inUse.Load() > 0 { + return + } + + sc.versions.Delete(version.versionID) } // waitSegmentsOnline waits until all provided segments is loaded.