From 5471e35cccfde4e377ec60c73b7c4f1de77971d9 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Sat, 25 Jun 2022 18:48:15 +0800
Subject: [PATCH] Wait last version search to release growing safely (#17713)

Signed-off-by: Congqi Xia
---
 internal/querynode/shard_cluster.go      | 113 +++++++++++++----
 internal/querynode/shard_cluster_test.go |   9 ++
 2 files changed, 72 insertions(+), 50 deletions(-)

diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go
index 399a898f81..316d9c0d12 100644
--- a/internal/querynode/shard_cluster.go
+++ b/internal/querynode/shard_cluster.go
@@ -128,12 +128,14 @@ type ShardCluster struct {
 	segmentDetector ShardSegmentDetector
 	nodeBuilder     ShardNodeBuilder
 
-	mut            sync.RWMutex
-	leader         *shardNode                     // shard leader node instance
-	nodes          map[int64]*shardNode           // online nodes
-	segments       SegmentsStatus                 // shard segments
-	versions       map[int64]*ShardClusterVersion // version id to version map
-	currentVersion *ShardClusterVersion           // current serving segment state version
+	mut      sync.RWMutex
+	leader   *shardNode           // shard leader node instance
+	nodes    map[int64]*shardNode // online nodes
+	segments SegmentsStatus       // shard segments
+
+	mutVersion     sync.RWMutex
+	versions       sync.Map             // version id to version
+	currentVersion *ShardClusterVersion // current serving segment state version
 	nextVersionID *atomic.Int64
 	segmentCond   *sync.Cond // segment state change condition
 	rcCond        *sync.Cond // segment rc change condition
@@ -158,7 +160,6 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
 
 		nodes:         make(map[int64]*shardNode),
 		segments:      make(map[int64]shardSegmentInfo),
-		versions:      make(map[int64]*ShardClusterVersion),
 		nextVersionID: atomic.NewInt64(0),
 
 		closeCh: make(chan struct{}),
@@ -189,8 +190,8 @@ func (sc *ShardCluster) serviceable() bool {
 		return false
 	}
 
-	sc.mut.RLock()
-	defer sc.mut.RUnlock()
+	sc.mutVersion.RLock()
+	defer sc.mutVersion.RUnlock()
 	// check there is a working version(SyncSegments called)
 	return sc.currentVersion != nil
 }
@@ -276,15 +277,8 @@ func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) {
 // SyncSegments synchronize segment distribution in batch
 func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) {
 	log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state)))
-	// notify handoff wait online if any
-	defer func() {
-		sc.segmentCond.L.Lock()
-		sc.segmentCond.Broadcast()
-		sc.segmentCond.L.Unlock()
-	}()
 
-	sc.mut.Lock()
-	defer sc.mut.Unlock()
+	sc.mut.Lock()
 	for _, line := range distribution {
 		for _, segmentID := range line.GetSegmentIds() {
 			old, ok := sc.segments[segmentID]
@@ -306,9 +300,17 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
 			})
 		}
 	}
+	sc.mut.Unlock()
 
+	// notify handoff wait online if any
+	sc.segmentCond.L.Lock()
+	sc.segmentCond.Broadcast()
+	sc.segmentCond.L.Unlock()
+
+	sc.mutVersion.Lock()
+	defer sc.mutVersion.Unlock()
 	version := NewShardClusterVersion(sc.nextVersionID.Inc(), sc.segments.Clone(filterNothing))
-	sc.versions[version.versionID] = version
+	sc.versions.Store(version.versionID, version)
 	sc.currentVersion = version
 }
 
@@ -498,8 +500,8 @@ func (sc *ShardCluster) segmentAllocations(partitionIDs []int64) (map[int64][]in
 		log.Warn("request segment allocations when cluster is not serviceable", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName))
 		return map[int64][]int64{}, 0
 	}
-	sc.mut.RLock()
-	defer sc.mut.RUnlock()
+	sc.mutVersion.RLock()
+	defer sc.mutVersion.RUnlock()
 	// return allocation from current version and version id
 	return sc.currentVersion.GetAllocation(partitionIDs), sc.currentVersion.versionID
 }
@@ -511,11 +513,10 @@ func (sc *ShardCluster) finishUsage(versionID int64) {
 		sc.rcCond.Broadcast()
 		sc.rcCond.L.Unlock()
 	}()
-	sc.mut.Lock()
-	defer sc.mut.Unlock()
 
-	version, ok := sc.versions[versionID]
+	v, ok := sc.versions.Load(versionID)
 	if ok {
+		version := v.(*ShardClusterVersion)
 		version.FinishUsage()
 	}
 }
@@ -540,10 +541,7 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
 
 	// now online segment can provide service, generate a new version
-	// add segmentChangeInfo to pending list
-	signal, versionID := sc.appendHandoff(info)
-
-	// wait last version not in use
-	<-signal
+	versionID := sc.applySegmentChange(info, onlineSegmentIDs)
 
 	removes := make(map[int64][]int64) // nodeID => []segmentIDs
 	// remove offline segments record
@@ -558,16 +556,6 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
 	}
 
 	var errs errorutil.ErrorList
-	// remove growing segments if any
-	// handles the case for Growing to Sealed Handoff(which does not has offline segment info)
-	if sc.leader != nil {
-		// error ignored here
-		sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
-			CollectionID: sc.collectionID,
-			SegmentIDs:   onlineSegmentIDs,
-			Scope:        querypb.DataScope_Streaming,
-		})
-	}
 
 	// notify querynode(s) to release segments
 	for nodeID, segmentIDs := range removes {
@@ -601,9 +589,9 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
 }
 
-// appendHandoff adds the change info into pending list and returns the token.
-func (sc *ShardCluster) appendHandoff(info *querypb.SegmentChangeInfo) (chan struct{}, int64) {
-	sc.mut.Lock()
-	defer sc.mut.Unlock()
+// applySegmentChange generates a new version from the segment change info,
+// waits until the last version is no longer in use, then installs the new
+// version and returns the last version's ID.
+func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onlineSegmentIDs []UniqueID) int64 {
+	sc.mutVersion.Lock()
+	defer sc.mutVersion.Unlock()
 
 	// generate a new version
 	versionID := sc.nextVersionID.Inc()
@@ -617,33 +605,58 @@ func (sc *ShardCluster) appendHandoff(info *querypb.SegmentChangeInfo) (chan str
 	}
 		return false
 	}))
-	sc.versions[versionID] = version
+	sc.versions.Store(versionID, version)
 
 	var lastVersionID int64
-	// signal for nil current version case, no need to wait
-	signal := make(chan struct{})
-	close(signal)
+	/*
+	   ----------------------------------------------------------------------------
+	   T0        | T1 (S2 online) | T2 (change version) | T3 (remove G2)
+	   ----------------------------------------------------------------------------
+	   G2, G3    | G2, G3         | G2, G3              | G3
+	   ----------------------------------------------------------------------------
+	   S1        | S1, S2         | S1, S2              | S1, S2
+	   ----------------------------------------------------------------------------
+	   v0=[S1]   | v0=[S1]        | v1=[S1,S2]          | v1=[S1,S2]
+
+	   There is no way to ensure that a search started after T2 does not read G2,
+	   so G2 cannot be removed safely at that point.
+	   Currently, the only safe approach is to block incoming allocations, so that
+	   no search will be dispatched to G2.
+	   Once the shard cluster is able to maintain growing segments itself, this
+	   version change could reduce the lock range.
+	*/
 	// currentVersion shall be not nil
 	if sc.currentVersion != nil {
-		signal = sc.currentVersion.Expire()
+		// wait for searches on the last version to finish
+		<-sc.currentVersion.Expire()
 		lastVersionID = sc.currentVersion.versionID
+		// remove growing segments if any
+		// handles the case of Growing to Sealed handoff (which does not have offline segment info)
+		if sc.leader != nil {
+			// error ignored here
+			sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
+				CollectionID: sc.collectionID,
+				SegmentIDs:   onlineSegmentIDs,
+				Scope:        querypb.DataScope_Streaming,
+			})
+		}
 	}
+
 	// set current version to new one
 	sc.currentVersion = version
-	sc.versions[versionID] = version
 
-	return signal, lastVersionID
+	return lastVersionID
 }
 
 // cleanupVersion clean up version from map
 func (sc *ShardCluster) cleanupVersion(versionID int64) {
-	sc.mut.Lock()
-	defer sc.mut.Unlock()
+	sc.mutVersion.RLock()
+	defer sc.mutVersion.RUnlock()
 
 	// prevent clean up current version
 	if sc.currentVersion != nil && sc.currentVersion.versionID == versionID {
 		return
 	}
 
-	delete(sc.versions, versionID)
+	sc.versions.Delete(versionID)
 }
 
 // waitSegmentsOnline waits until all provided segments is loaded.
diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go
index 6a9bcfd9ff..fe32139112 100644
--- a/internal/querynode/shard_cluster_test.go
+++ b/internal/querynode/shard_cluster_test.go
@@ -1652,6 +1652,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
@@ -1705,6 +1706,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
@@ -1755,6 +1757,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
@@ -1804,6 +1807,8 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			close(sig)
 		}()
 
+		sc.finishUsage(versionID)
+
 		evtCh <- segmentEvent{
 			eventType: segmentAdd,
 			segmentID: 3,
@@ -1847,6 +1852,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
@@ -1902,6 +1908,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			nodeIDs: []int64{2},
 			state:   segmentStateLoaded,
 		}
+		sc.finishUsage(versionID)
 
 		// wait for handoff appended into list
 		assert.Eventually(t, func() bool {
@@ -1938,6 +1945,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
@@ -1986,6 +1994,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 			{
 				nodeID:   1,
 				nodeAddr: "addr_1",
+				isLeader: true,
 			},
 			{
 				nodeID:   2,
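
---

Note on the mechanism: the heart of this patch is a drain-before-release pattern. applySegmentChange installs the new version while holding mutVersion, then blocks on the old version's Expire() channel, so by the time ReleaseSegments fires for the growing segments, no in-flight search can still be using them. The standalone Go sketch below illustrates that pattern. It is not Milvus code; all names (version, cluster, alloc, swap) are invented, and it assumes ShardClusterVersion behaves as a reference-counted snapshot whose Expire() returns a channel closed after the last FinishUsage.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// version mimics the assumed behavior of ShardClusterVersion: a
// reference-counted snapshot whose expire() returns a channel that is
// closed once the version is retired and its last user has finished.
type version struct {
	id      int64
	inUse   int64 // atomic reference count
	expired int32 // atomic flag, set once by expire()
	done    chan struct{}
	once    sync.Once
}

func newVersion(id int64) *version {
	return &version{id: id, done: make(chan struct{})}
}

// acquire registers an in-flight search against this version.
func (v *version) acquire() { atomic.AddInt64(&v.inUse, 1) }

// release is the finishUsage counterpart: the last release after expire()
// unblocks whoever is draining the version.
func (v *version) release() {
	if atomic.AddInt64(&v.inUse, -1) == 0 && atomic.LoadInt32(&v.expired) == 1 {
		v.once.Do(func() { close(v.done) })
	}
}

// expire retires the version; the returned channel is closed once no
// search holds it any more (immediately, if it is already idle).
func (v *version) expire() <-chan struct{} {
	atomic.StoreInt32(&v.expired, 1)
	if atomic.LoadInt64(&v.inUse) == 0 {
		v.once.Do(func() { close(v.done) })
	}
	return v.done
}

type cluster struct {
	mutVersion sync.RWMutex
	current    *version
}

// alloc plays the role of segmentAllocations: hand out the current version
// under the read lock; every alloc must be paired with a release.
func (c *cluster) alloc() *version {
	c.mutVersion.RLock()
	defer c.mutVersion.RUnlock()
	v := c.current
	v.acquire()
	return v
}

// swap plays the role of applySegmentChange: while holding the write lock
// (which blocks new allocations), drain the old version, release whatever
// only it could see, then install the new version.
func (c *cluster) swap(next *version) {
	c.mutVersion.Lock()
	defer c.mutVersion.Unlock()
	if c.current != nil {
		<-c.current.expire()
		// safe point: no in-flight search can still read data that only
		// the old version referenced, e.g. handed-off growing segments
		fmt.Printf("version %d drained, old data released\n", c.current.id)
	}
	c.current = next
}

func main() {
	c := &cluster{current: newVersion(1)}
	v := c.alloc()        // a search picks up version 1
	go v.release()        // the search finishes concurrently
	c.swap(newVersion(2)) // blocks until version 1 is fully drained
}

Holding the write lock across the wait is what the in-code comment calls blocking incoming allocation: alloc cannot hand out any version until the old one is fully drained, which is exactly why a search started after the version change can never reach the removed growing segments.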
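On the locking side, moving versions from a plain map guarded by sc.mut to a sync.Map is what lets finishUsage and cleanupVersion stop taking the cluster-wide mutex: Load, Store, and Delete are safe for concurrent use, at the cost of a type assertion on the read path. A minimal sketch of that access pattern (types and names invented, not Milvus code):

package main

import (
	"fmt"
	"sync"
)

type shardVersion struct{ id int64 }

func main() {
	var versions sync.Map // versionID -> *shardVersion, safe for concurrent use

	versions.Store(int64(1), &shardVersion{id: 1})

	// Lock-free read path, as in the new finishUsage: Load returns an
	// interface{} value, so the caller type-asserts the stored pointer.
	if v, ok := versions.Load(int64(1)); ok {
		fmt.Println("found version", v.(*shardVersion).id)
	}

	// Lock-free delete path, as in the new cleanupVersion.
	versions.Delete(int64(1))
}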