From 5aad58d318ccb557d806878e3ae9b2682fa7bd4e Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 26 Apr 2023 19:26:39 +0800
Subject: [PATCH] Fix shard detector rewatch always returns closed channel (#23734)

Signed-off-by: Congqi Xia
---
 internal/querynode/shard_node_detector.go    |  3 +--
 .../querynode/shard_node_detector_test.go    | 22 +++++++++++++++++++
 internal/querynode/shard_segment_detector.go | 14 ++++++------
 .../querynode/shard_segment_detector_test.go | 19 ++++++++++++++++
 4 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/internal/querynode/shard_node_detector.go b/internal/querynode/shard_node_detector.go
index 800bfbd333..c5c4ba4450 100644
--- a/internal/querynode/shard_node_detector.go
+++ b/internal/querynode/shard_node_detector.go
@@ -178,8 +178,7 @@ func (nd *etcdShardNodeDetector) handleEvt(evt clientv3.WatchResponse, collectio
 }
 
 func (nd *etcdShardNodeDetector) rewatch(collectionID, replicaID, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	ctx := context.Background()
 	revision = rev
 	err := retry.Do(ctx, func() error {
 		ch = nd.client.Watch(ctx, nd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
diff --git a/internal/querynode/shard_node_detector_test.go b/internal/querynode/shard_node_detector_test.go
index 547990a77b..153a7488cf 100644
--- a/internal/querynode/shard_node_detector_test.go
+++ b/internal/querynode/shard_node_detector_test.go
@@ -404,3 +404,25 @@ func TestNodeDetectorHandleWithError(t *testing.T) {
 		})
 	})
 }
+
+func TestNodeDetectorRewatch(t *testing.T) {
+	client := v3client.New(embedetcdServer.Server)
+	defer client.Close()
+	suffix := funcutil.RandomString(6)
+	rootPath := fmt.Sprintf("qn_shard_node_detector_watch_%s", suffix)
+
+	nd := NewEtcdShardNodeDetector(client, rootPath, func() (map[int64]string, error) {
+		r := make(map[int64]string)
+		return r, nil
+	})
+
+	ch, ok, _ := nd.rewatch(1000, 500, 0)
+	assert.True(t, ok)
+	select {
+	case _, ok := <-ch:
+		if !ok {
+			assert.FailNow(t, "rewatch return closed channel")
+		}
+	case <-time.After(time.Second):
+	}
+}
diff --git a/internal/querynode/shard_segment_detector.go b/internal/querynode/shard_segment_detector.go
index b2365c03fc..57af3f7816 100644
--- a/internal/querynode/shard_segment_detector.go
+++ b/internal/querynode/shard_segment_detector.go
@@ -124,7 +124,7 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
 		case evt, ok := <-ch:
 			if !ok {
 				log.Warn("SegmentDetector event channel closed, retry...")
-				watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, revision)
+				watchCh, ok, revision := sd.rewatch(collectionID, replicaID, vchannel, revision)
 				if !ok {
 					return
 				}
@@ -134,7 +134,7 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
 			}
 			if err := evt.Err(); err != nil {
 				if err == v3rpc.ErrCompacted {
-					watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
+					watchCh, ok, revision := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
 					if !ok {
 						return
 					}
@@ -151,9 +151,9 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
 	}
 }
 
-func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, revision int64) (ch clientv3.WatchChan, ok bool) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
+	ctx := context.Background()
+	revision = rev
 	err := retry.Do(ctx, func() error {
 		ch = sd.client.Watch(ctx, sd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
 		select {
@@ -182,13 +182,13 @@ func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64,
 	if err != nil {
 		select {
 		case <-sd.closeCh:
-			return nil, false
+			return nil, false, revision
 		default:
 			panic(err)
 		}
 	}
 
-	return ch, true
+	return ch, true, revision
 }
 
 func (sd *etcdShardSegmentDetector) handleEvt(evt clientv3.WatchResponse, collectionID int64, replicaID int64, vchannel string) {
diff --git a/internal/querynode/shard_segment_detector_test.go b/internal/querynode/shard_segment_detector_test.go
index cd16bebf1d..7362bab900 100644
--- a/internal/querynode/shard_segment_detector_test.go
+++ b/internal/querynode/shard_segment_detector_test.go
@@ -301,3 +301,22 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
 		})
 	}
 }
+
+func TestSegmentDetectorRewatch(t *testing.T) {
+	client := v3client.New(embedetcdServer.Server)
+	defer client.Close()
+	suffix := funcutil.RandomString(6)
+	rootPath := fmt.Sprintf("qn_shard_segment_detector_watch_%s", suffix)
+
+	sd := NewEtcdShardSegmentDetector(client, rootPath)
+
+	ch, ok, _ := sd.rewatch(1000, 500, "vchannel1", 0)
+	assert.True(t, ok)
+	select {
+	case _, ok := <-ch:
+		if !ok {
+			assert.FailNow(t, "rewatch return closed channel")
+		}
+	case <-time.After(time.Second):
+	}
+}