diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 2c3945a6c0..f099e418bb 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -337,9 +337,10 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) { for _, ch := range op.Channels { vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID) info := &datapb.ChannelWatchInfo{ - Vchan: vcInfo, - StartTs: time.Now().Unix(), - State: datapb.ChannelWatchState_Uncomplete, + Vchan: vcInfo, + StartTs: time.Now().Unix(), + State: datapb.ChannelWatchState_Uncomplete, + TimeoutTs: time.Now().Add(maxWatchDuration).UnixNano(), } op.ChannelWatchInfos = append(op.ChannelWatchInfos, info) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 64d6deb367..6d93f2622a 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -455,7 +455,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { } // if a channel is not watched after maxWatchDuration, // then we reallocate it to another node - if watchInfo.State == datapb.ChannelWatchState_Complete { + if watchInfo.State == datapb.ChannelWatchState_Complete || watchInfo.State == datapb.ChannelWatchState_WatchSuccess { continue } startTime := time.Unix(watchInfo.StartTs, 0) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 89d5b39d06..53d92d1521 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -293,11 +293,13 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here e = &event{ eventType: putEventType, + version: evt.Kv.Version, } case clientv3.EventTypeDelete: e = &event{ eventType: deleteEventType, + version: evt.Kv.Version, } } node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value) @@ -314,6 +316,11 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { return } + if isEndWatchState(watchInfo.State) { + log.Warn("DataNode received a PUT event with a end State", zap.String("state", watchInfo.State.String())) + return + } + e.info = watchInfo e.vChanName = watchInfo.GetVchan().GetChannelName() @@ -322,12 +329,9 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { e.vChanName = parseDeleteEventKey(key) } - actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, &channelEventManager{ - eventChan: make(chan event, 10), - closeChan: make(chan struct{}), - handlePutEvent: node.handlePutEvent, - handleDeleteEvent: node.handleDeleteEvent, - }) + actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, newChannelEventManager( + node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval, + )) if !loaded { actualManager.(*channelEventManager).Run() @@ -350,10 +354,6 @@ func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) { return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err) } - if watchInfo.State == datapb.ChannelWatchState_Complete { - return nil, fmt.Errorf("invalid event: event state is already ChannelWatchState_Compele") - } - if watchInfo.Vchan == nil { return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo") } @@ -367,28 +367,51 @@ func parseDeleteEventKey(key string) string { return vChanName } -func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error { +func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) { vChanName := watchInfo.GetVchan().GetChannelName() - if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil { - return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err) - } - log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) + switch watchInfo.State { + case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: + if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil { + return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err) + } + log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) + defer func() { + if err != nil { + node.releaseFlowgraph(vChanName) + } + }() + watchInfo.State = datapb.ChannelWatchState_WatchSuccess + + case datapb.ChannelWatchState_ToRelease: + success := true + func() { + defer func() { + if x := recover(); x != nil { + log.Error("release flowgraph panic", zap.Any("recovered", x)) + success = false + } + }() + node.releaseFlowgraph(vChanName) + }() + if !success { + watchInfo.State = datapb.ChannelWatchState_ReleaseFailure + } else { + watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess + } + } - watchInfo.State = datapb.ChannelWatchState_Complete v, err := proto.Marshal(watchInfo) if err != nil { - return fmt.Errorf("fail to marshal watchInfo with complete state, vChanName: %s, err: %v", vChanName, err) + return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err) } k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName) - log.Info("handle put event: try to save completed state", zap.String("key", k)) + log.Info("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String())) - err = node.watchKv.Save(k, string(v)) - // TODO DataNode unable to save into etcd, may need to panic + err = node.watchKv.CompareVersionAndSwap(k, version, string(v)) if err != nil { - node.releaseFlowgraph(vChanName) - return fmt.Errorf("fail to update completed state to etcd, vChanName: %s, err: %v", vChanName, err) + return fmt.Errorf("fail to update watch state to etcd, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err) } return nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index b0548a8e5c..bf4c95af47 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -396,8 +396,9 @@ func TestWatchChannel(t *testing.T) { UnflushedSegments: []*datapb.SegmentInfo{}, } info := &datapb.ChannelWatchInfo{ - State: datapb.ChannelWatchState_Uncomplete, - Vchan: vchan, + State: datapb.ChannelWatchState_ToWatch, + Vchan: vchan, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), } val, err := proto.Marshal(info) assert.Nil(t, err) @@ -418,6 +419,66 @@ func TestWatchChannel(t *testing.T) { assert.False(t, exist) }) + t.Run("Test release channel", func(t *testing.T) { + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" + path := fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, oldInvalidCh) + err = kv.Save(path, string([]byte{23})) + assert.NoError(t, err) + + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + path = fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, ch) + c := make(chan struct{}) + go func() { + ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID)) + c <- struct{}{} + cnt := 0 + for { + evt := <-ec + for _, event := range evt.Events { + if strings.Contains(string(event.Kv.Key), ch) { + cnt++ + } + } + if cnt >= 2 { + break + } + } + c <- struct{}{} + }() + // wait for check goroutine start Watch + <-c + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: ch, + UnflushedSegments: []*datapb.SegmentInfo{}, + } + info := &datapb.ChannelWatchInfo{ + State: datapb.ChannelWatchState_ToRelease, + Vchan: vchan, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + } + val, err := proto.Marshal(info) + assert.Nil(t, err) + err = kv.Save(path, string(val)) + assert.Nil(t, err) + + // wait for check goroutine received 2 events + <-c + exist := node.flowgraphManager.exist(ch) + assert.False(t, exist) + + err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID)) + assert.Nil(t, err) + //TODO there is not way to sync Release done, use sleep for now + time.Sleep(100 * time.Millisecond) + + exist = node.flowgraphManager.exist(ch) + assert.False(t, exist) + + }) + t.Run("handle watch info failed", func(t *testing.T) { e := &event{ eventType: putEventType, @@ -439,20 +500,81 @@ func TestWatchChannel(t *testing.T) { exist = node.flowgraphManager.exist("test2") assert.False(t, exist) + chPut := make(chan struct{}, 1) + chDel := make(chan struct{}, 1) + + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + m := newChannelEventManager( + func(info *datapb.ChannelWatchInfo, version int64) error { + r := node.handlePutEvent(info, version) + chPut <- struct{}{} + return r + }, + func(vChan string) { + node.handleDeleteEvent(vChan) + chDel <- struct{}{} + }, time.Millisecond*100, + ) + node.eventManagerMap.Store(ch, m) + m.Run() + info = datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{}, - State: datapb.ChannelWatchState_Uncomplete, + Vchan: &datapb.VchannelInfo{ChannelName: ch}, + State: datapb.ChannelWatchState_Uncomplete, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), } bs, err = proto.Marshal(&info) assert.NoError(t, err) + msFactory := node.msFactory + defer func() { node.msFactory = msFactory }() + node.msFactory = &FailMessageStreamFactory{ node.msFactory, } - node.handleWatchInfo(e, "test3", bs) - exist = node.flowgraphManager.exist("test3") + node.handleWatchInfo(e, ch, bs) + <-chPut + exist = node.flowgraphManager.exist(ch) assert.False(t, exist) }) + + t.Run("handle watchinfo out of date", func(t *testing.T) { + chPut := make(chan struct{}, 1) + chDel := make(chan struct{}, 1) + // inject eventManager + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + m := newChannelEventManager( + func(info *datapb.ChannelWatchInfo, version int64) error { + r := node.handlePutEvent(info, version) + chPut <- struct{}{} + return r + }, + func(vChan string) { + node.handleDeleteEvent(vChan) + chDel <- struct{}{} + }, time.Millisecond*100, + ) + node.eventManagerMap.Store(ch, m) + m.Run() + e := &event{ + eventType: putEventType, + version: 10000, + } + + info := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ChannelName: ch}, + State: datapb.ChannelWatchState_Uncomplete, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + } + bs, err := proto.Marshal(&info) + assert.NoError(t, err) + + node.handleWatchInfo(e, ch, bs) + <-chPut + exist := node.flowgraphManager.exist("test3") + assert.False(t, exist) + }) + } func TestDataNode_GetComponentStates(t *testing.T) { diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index debfea8652..8ee4b7f727 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -17,6 +17,7 @@ package datanode import ( + "sync" "time" "github.com/milvus-io/milvus/internal/log" @@ -30,14 +31,17 @@ const retryWatchInterval = 20 * time.Second type event struct { eventType int vChanName string + version int64 info *datapb.ChannelWatchInfo } type channelEventManager struct { + sync.Once eventChan chan event closeChan chan struct{} - handlePutEvent func(watchInfo *datapb.ChannelWatchInfo) error // node.handlePutEvent - handleDeleteEvent func(vChanName string) // node.handleDeleteEvent + handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent + handleDeleteEvent func(vChanName string) // node.handleDeleteEvent + retryInterval time.Duration } const ( @@ -45,6 +49,17 @@ const ( deleteEventType = 2 ) +func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error, + handleDel func(string), retryInterval time.Duration) *channelEventManager { + return &channelEventManager{ + eventChan: make(chan event, 10), + closeChan: make(chan struct{}), + handlePutEvent: handlePut, + handleDeleteEvent: handleDel, + retryInterval: retryInterval, + } +} + func (e *channelEventManager) Run() { go func() { for { @@ -52,36 +67,7 @@ func (e *channelEventManager) Run() { case event := <-e.eventChan: switch event.eventType { case putEventType: - // Trigger retry for-loop when fail to handle put event for the first time - if err := e.handlePutEvent(event.info); err != nil { - for { - log.Warn("handle put event fail, starting retry", - zap.String("vChanName", event.vChanName), - zap.String("retry interval", retryWatchInterval.String()), - zap.Error(err)) - - <-time.NewTimer(retryWatchInterval).C - - select { - case e, ok := <-e.eventChan: - // When getting a delete event at next retry, exit retry loop - // When getting a put event, just continue the retry - if ok && e.eventType == deleteEventType { - log.Warn("delete event triggerred, terminating retry.", - zap.String("vChanName", event.vChanName)) - return - } - default: - } - - err = e.handlePutEvent(event.info) - if err == nil { - log.Info("retry to handle put event successfully", - zap.String("vChanName", event.vChanName)) - return - } - } - } + e.retryHandlePutEvent(event) case deleteEventType: e.handleDeleteEvent(event.vChanName) } @@ -92,10 +78,79 @@ func (e *channelEventManager) Run() { }() } +func (e *channelEventManager) retryHandlePutEvent(event event) { + countdown := time.Until(time.Unix(0, event.info.TimeoutTs)) + if countdown < 0 { + log.Warn("event already timed out", zap.String("vChanName", event.vChanName)) + return + } + // Trigger retry for-loop when fail to handle put event for the first time + if err := e.handlePutEvent(event.info, event.version); err != nil { + timer := time.NewTimer(countdown) + defer timer.Stop() + ticker := time.NewTicker(e.retryInterval) + defer ticker.Stop() + for { + log.Warn("handle put event fail, starting retry", + zap.String("vChanName", event.vChanName), + zap.String("retry interval", e.retryInterval.String()), + zap.Error(err)) + + // reset the ticker + ticker.Reset(e.retryInterval) + + select { + case <-ticker.C: + // ticker notify, do another retry + case <-timer.C: + // timeout + log.Warn("event process timed out", zap.String("vChanName", event.vChanName)) + return + case evt, ok := <-e.eventChan: + if !ok { + log.Warn("event channel closed", zap.String("vChanName", event.vChanName)) + return + } + // When got another put event, overwrite current event + if evt.eventType == putEventType { + // handles only Uncomplete, ToWatch and ToRelease + if isEndWatchState(evt.info.State) { + return + } + event = evt + } + // When getting a delete event at next retry, exit retry loop + // When getting a put event, just continue the retry + if evt.eventType == deleteEventType { + log.Warn("delete event triggerred, terminating retry.", + zap.String("vChanName", event.vChanName)) + e.handleDeleteEvent(evt.vChanName) + return + } + } + + err = e.handlePutEvent(event.info, event.version) + if err == nil { + log.Info("retry to handle put event successfully", + zap.String("vChanName", event.vChanName)) + return + } + } + } +} + func (e *channelEventManager) handleEvent(event event) { e.eventChan <- event } func (e *channelEventManager) Close() { - close(e.closeChan) + e.Do(func() { + close(e.closeChan) + }) +} + +func isEndWatchState(state datapb.ChannelWatchState) bool { + return state != datapb.ChannelWatchState_ToWatch && // start watch + state != datapb.ChannelWatchState_ToRelease && // start release + state != datapb.ChannelWatchState_Uncomplete // legacy state, equal to ToWatch } diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go new file mode 100644 index 0000000000..e8df9186b8 --- /dev/null +++ b/internal/datanode/event_manager_test.go @@ -0,0 +1,280 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "errors" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestChannelEventManager(t *testing.T) { + t.Run("normal case", func(t *testing.T) { + ch := make(chan struct{}, 1) + ran := false + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + ran = true + ch <- struct{}{} + return nil + }, func(name string) {}, time.Millisecond*10) + + em.Run() + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + <-ch + assert.True(t, ran) + }) + + t.Run("event already timeout", func(t *testing.T) { + ch := make(chan struct{}, 1) + ran := false + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + ran = true + ch <- struct{}{} + return nil + }, func(name string) {}, time.Millisecond*10) + + em.Run() + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: time.Now().Add(-time.Minute).UnixNano(), + }, + }) + select { + case <-ch: + t.FailNow() + case <-time.NewTimer(time.Millisecond * 100).C: + } + assert.False(t, ran) + }) + + t.Run("retry success", func(t *testing.T) { + ch := make(chan struct{}, 1) + ran := false + counter := atomic.Int32{} + counter.Store(0) + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + current := counter.Add(1) + if current == 2 { + ran = true + ch <- struct{}{} + return nil + } + + return errors.New("mocked error") + }, func(name string) {}, time.Millisecond*10) + + em.Run() + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + <-ch + assert.True(t, ran) + }) + + t.Run("retry until timeout", func(t *testing.T) { + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + return errors.New("mocked error") + }, func(name string) {}, time.Millisecond*100) + + ch := make(chan struct{}, 1) + + go func() { + ddl := time.Now().Add(time.Millisecond * 50) + evt := event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: ddl.UnixNano(), + }, + } + em.retryHandlePutEvent(evt) + ch <- struct{}{} + }() + + select { + case <-ch: + case <-time.NewTimer(time.Second).C: + t.FailNow() + } + }) + + t.Run("close behavior", func(t *testing.T) { + ch := make(chan struct{}, 1) + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + return errors.New("mocked error") + }, func(name string) {}, time.Millisecond*10) + + go func() { + ddl := time.Now().Add(time.Minute) + evt := event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: ddl.UnixNano(), + }, + } + em.retryHandlePutEvent(evt) + ch <- struct{}{} + }() + + close(em.eventChan) + select { + case <-ch: + case <-time.NewTimer(time.Second).C: + t.FailNow() + } + + assert.NotPanics(t, func() { + em.Close() + em.Close() + }) + }) + + t.Run("cancel by delete event", func(t *testing.T) { + ch := make(chan struct{}, 1) + ran := false + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + return errors.New("mocked error") + }, func(name string) { + ran = true + ch <- struct{}{} + }, time.Millisecond*10) + em.Run() + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + em.handleEvent(event{ + eventType: deleteEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + <-ch + assert.True(t, ran) + }) + + t.Run("overwrite put event", func(t *testing.T) { + ch := make(chan struct{}, 1) + ran := false + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + if version > 0 { + ran = true + ch <- struct{}{} + return nil + } + return errors.New("mocked error") + }, func(name string) { + t.FailNow() + }, time.Millisecond*10) + em.Run() + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + State: datapb.ChannelWatchState_ToWatch, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + em.handleEvent(event{ + eventType: putEventType, + vChanName: "", + version: 1, + info: &datapb.ChannelWatchInfo{ + State: datapb.ChannelWatchState_ToWatch, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + }, + }) + <-ch + assert.True(t, ran) + }) + + t.Run("canceled by EndStates", func(t *testing.T) { + endStates := []datapb.ChannelWatchState{ + datapb.ChannelWatchState_Complete, + datapb.ChannelWatchState_WatchSuccess, + datapb.ChannelWatchState_WatchFailure, + datapb.ChannelWatchState_ReleaseSuccess, + datapb.ChannelWatchState_ReleaseFailure, + } + + for _, es := range endStates { + em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { + return errors.New("mocked error") + }, func(name string) { t.FailNow() }, time.Millisecond*100) + + ch := make(chan struct{}, 1) + ddl := time.Now().Add(time.Minute) + + go func() { + evt := event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + TimeoutTs: ddl.UnixNano(), + }, + } + em.retryHandlePutEvent(evt) + ch <- struct{}{} + }() + + em.eventChan <- event{ + eventType: putEventType, + vChanName: "", + version: 0, + info: &datapb.ChannelWatchInfo{ + State: es, + TimeoutTs: ddl.UnixNano(), + }, + } + select { + case <-ch: + case <-time.NewTimer(time.Minute).C: + t.FailNow() + } + } + }) +}