Fix 'TestChannelManager_StateTransfer' ut when etcd is slow (#17082)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2022-05-18 17:19:57 +08:00 committed by GitHub
parent 99b4042f8c
commit cf08b5aa11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -88,6 +88,17 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}
}
makeSureEctdData := func(key string) {
for {
// make sure etcd has finished the operation
_, err := metakv.Load(key)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
}
t.Run("toWatch-WatchSuccess", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
@ -108,9 +119,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive earlier than etcd action watch channel
// if etcd has poor response latency.
time.Sleep(time.Second)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
@ -137,9 +146,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive earlier than etcd action watch channel
// if etcd has poor response latency.
time.Sleep(time.Second)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
@ -170,9 +177,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.stateTimer.notifyTimeoutWatcher(e)
chManager.stateTimer.stopIfExsit(e)
// TODO: cancel could arrive earlier than etcd action watch channel
// if etcd has poor response latency.
time.Sleep(time.Second)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
@ -211,16 +216,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
for {
prefix := Params.DataCoordCfg.ChannelWatchSubPath
// make sure etcd has finished the operation
_, err := metakv.Load(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
cancel()
wg.Wait()
@ -267,9 +263,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive earlier than etcd action watch channel
// if etcd has poor response latency.
time.Sleep(time.Second)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
cancel()
wg.Wait()