mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Fix shard detector rewatch always returns closed channel (#23734)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ffa2dc4bd3
commit
5aad58d318
@ -178,8 +178,7 @@ func (nd *etcdShardNodeDetector) handleEvt(evt clientv3.WatchResponse, collectio
|
||||
}
|
||||
|
||||
func (nd *etcdShardNodeDetector) rewatch(collectionID, replicaID, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ctx := context.Background()
|
||||
revision = rev
|
||||
err := retry.Do(ctx, func() error {
|
||||
ch = nd.client.Watch(ctx, nd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
|
||||
|
||||
@ -404,3 +404,25 @@ func TestNodeDetectorHandleWithError(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestNodeDetectorRewatch(t *testing.T) {
|
||||
client := v3client.New(embedetcdServer.Server)
|
||||
defer client.Close()
|
||||
suffix := funcutil.RandomString(6)
|
||||
rootPath := fmt.Sprintf("qn_shard_node_detector_watch_%s", suffix)
|
||||
|
||||
nd := NewEtcdShardNodeDetector(client, rootPath, func() (map[int64]string, error) {
|
||||
r := make(map[int64]string)
|
||||
return r, nil
|
||||
})
|
||||
|
||||
ch, ok, _ := nd.rewatch(1000, 500, 0)
|
||||
assert.True(t, ok)
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
assert.FailNow(t, "rewatch return closed channel")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,7 +124,7 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
|
||||
case evt, ok := <-ch:
|
||||
if !ok {
|
||||
log.Warn("SegmentDetector event channel closed, retry...")
|
||||
watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, revision)
|
||||
watchCh, ok, revision := sd.rewatch(collectionID, replicaID, vchannel, revision)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
@ -134,7 +134,7 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
|
||||
}
|
||||
if err := evt.Err(); err != nil {
|
||||
if err == v3rpc.ErrCompacted {
|
||||
watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
|
||||
watchCh, ok, revision := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
@ -151,9 +151,9 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
|
||||
}
|
||||
}
|
||||
|
||||
func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, revision int64) (ch clientv3.WatchChan, ok bool) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
|
||||
ctx := context.Background()
|
||||
revision = rev
|
||||
err := retry.Do(ctx, func() error {
|
||||
ch = sd.client.Watch(ctx, sd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
|
||||
select {
|
||||
@ -182,13 +182,13 @@ func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64,
|
||||
if err != nil {
|
||||
select {
|
||||
case <-sd.closeCh:
|
||||
return nil, false
|
||||
return nil, false, revision
|
||||
default:
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return ch, true
|
||||
return ch, true, revision
|
||||
}
|
||||
|
||||
func (sd *etcdShardSegmentDetector) handleEvt(evt clientv3.WatchResponse, collectionID int64, replicaID int64, vchannel string) {
|
||||
|
||||
@ -301,3 +301,22 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSegmentDetectorRewatch(t *testing.T) {
|
||||
client := v3client.New(embedetcdServer.Server)
|
||||
defer client.Close()
|
||||
suffix := funcutil.RandomString(6)
|
||||
rootPath := fmt.Sprintf("qn_shard_segment_detector_watch_%s", suffix)
|
||||
|
||||
sd := NewEtcdShardSegmentDetector(client, rootPath)
|
||||
|
||||
ch, ok, _ := sd.rewatch(1000, 500, "vchannel1", 0)
|
||||
assert.True(t, ok)
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
assert.FailNow(t, "rewatch return closed channel")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user