From cf08b5aa1160eb56c49f17eebbea16bee434853a Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 18 May 2022 17:19:57 +0800 Subject: [PATCH] Fix 'TestChannelManager_StateTransfer' ut when etcd is slow (#17082) Signed-off-by: SimFG --- internal/datacoord/channel_manager_test.go | 38 +++++++++------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 379aac5958..20f1bf6c68 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -88,6 +88,17 @@ func TestChannelManager_StateTransfer(t *testing.T) { } } + makeSureEctdData := func(key string) { + for { + // make sure etcd has finished the operation + _, err := metakv.Load(key) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + } + t.Run("toWatch-WatchSuccess", func(t *testing.T) { metakv.RemoveWithPrefix("") ctx, cancel := context.WithCancel(context.TODO()) @@ -108,9 +119,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - // TODO: cancel could arrive earlier than etcd action watch channel - // if etcd has poor response latency. - time.Sleep(time.Second) + makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() @@ -137,9 +146,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - // TODO: cancel could arrive earlier than etcd action watch channel - // if etcd has poor response latency. - time.Sleep(time.Second) + makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) @@ -170,9 +177,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.stateTimer.notifyTimeoutWatcher(e) chManager.stateTimer.stopIfExsit(e) - // TODO: cancel could arrive earlier than etcd action watch channel - // if etcd has poor response latency. - time.Sleep(time.Second) + makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) @@ -211,16 +216,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - for { - prefix := Params.DataCoordCfg.ChannelWatchSubPath - // make sure etcd has finished the operation - _, err := metakv.Load(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) - if err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - + makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) cancel() wg.Wait() @@ -267,9 +263,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - // TODO: cancel could arrive earlier than etcd action watch channel - // if etcd has poor response latency. - time.Sleep(time.Second) + makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) cancel() wg.Wait()