Close Node/Segment detector when close ShardCluster (#18476)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-08-01 13:50:33 +08:00 committed by GitHub
parent 3871758c0e
commit f0fe8dae0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 0 deletions

View File

@ -102,13 +102,20 @@ type shardSegmentInfo struct {
inUse int32 inUse int32
} }
// Closable interface for close.
type Closable interface {
Close()
}
// ShardNodeDetector provides method to detect node events // ShardNodeDetector provides method to detect node events
type ShardNodeDetector interface { type ShardNodeDetector interface {
Closable
watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent)
} }
// ShardSegmentDetector provides method to detect segment events // ShardSegmentDetector provides method to detect segment events
type ShardSegmentDetector interface { type ShardSegmentDetector interface {
Closable
watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent)
} }
@ -181,6 +188,13 @@ func (sc *ShardCluster) Close() {
log.Info("Close shard cluster") log.Info("Close shard cluster")
sc.closeOnce.Do(func() { sc.closeOnce.Do(func() {
sc.updateShardClusterState(unavailable) sc.updateShardClusterState(unavailable)
if sc.nodeDetector != nil {
sc.nodeDetector.Close()
}
if sc.segmentDetector != nil {
sc.segmentDetector.Close()
}
close(sc.closeCh) close(sc.closeCh)
}) })
} }

View File

@ -38,6 +38,8 @@ func (m *mockNodeDetector) watchNodes(collectionID int64, replicaID int64, vchan
return m.initNodes, m.evtCh return m.initNodes, m.evtCh
} }
func (m *mockNodeDetector) Close() {}
type mockSegmentDetector struct { type mockSegmentDetector struct {
initSegments []segmentEvent initSegments []segmentEvent
evtCh chan segmentEvent evtCh chan segmentEvent
@ -47,6 +49,8 @@ func (m *mockSegmentDetector) watchSegments(collectionID int64, replicaID int64,
return m.initSegments, m.evtCh return m.initSegments, m.evtCh
} }
func (m *mockSegmentDetector) Close() {}
type mockShardQueryNode struct { type mockShardQueryNode struct {
statisticResponse *internalpb.GetStatisticsResponse statisticResponse *internalpb.GetStatisticsResponse
statisticErr error statisticErr error