From 702e3253d0ea4dae202f90aec3fe7e3982e875df Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 9 Aug 2022 18:18:36 +0800 Subject: [PATCH] Close Node/Segment detector when close ShardCluster (#18476) (#18568) Signed-off-by: Congqi Xia --- internal/querynode/shard_cluster.go | 14 ++++++++++++++ internal/querynode/shard_cluster_test.go | 4 ++++ 2 files changed, 18 insertions(+) diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 19f46dadf8..b76e3762da 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -102,13 +102,20 @@ type shardSegmentInfo struct { inUse int32 } +// Closable interface for close. +type Closable interface { + Close() +} + // ShardNodeDetector provides method to detect node events type ShardNodeDetector interface { + Closable watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) } // ShardSegmentDetector provides method to detect segment events type ShardSegmentDetector interface { + Closable watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) } @@ -181,6 +188,13 @@ func (sc *ShardCluster) Close() { log.Info("Close shard cluster") sc.closeOnce.Do(func() { sc.updateShardClusterState(unavailable) + if sc.nodeDetector != nil { + sc.nodeDetector.Close() + } + if sc.segmentDetector != nil { + sc.segmentDetector.Close() + } + close(sc.closeCh) }) } diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go index 8846d4950f..f85bd83a90 100644 --- a/internal/querynode/shard_cluster_test.go +++ b/internal/querynode/shard_cluster_test.go @@ -38,6 +38,8 @@ func (m *mockNodeDetector) watchNodes(collectionID int64, replicaID int64, vchan return m.initNodes, m.evtCh } +func (m *mockNodeDetector) Close() {} + type mockSegmentDetector struct { initSegments []segmentEvent evtCh chan segmentEvent @@ -47,6 +49,8 @@ func (m *mockSegmentDetector) watchSegments(collectionID int64, replicaID int64, return m.initSegments, m.evtCh } +func (m *mockSegmentDetector) Close() {} + type mockShardQueryNode struct { searchResult *internalpb.SearchResults searchErr error