Move etcd watch related code into eventmanager (#27192)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent 33e3e78937
commit 09505ea78e
@@ -25,16 +25,12 @@ import (
	"io"
	"math/rand"
	"os"
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

@@ -42,7 +38,6 @@ import (
	"github.com/milvus-io/milvus/internal/datanode/allocator"
	"github.com/milvus-io/milvus/internal/kv"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/types"
@@ -107,7 +102,7 @@ type DataNode struct {
	initOnce   sync.Once
	startOnce  sync.Once
	stopOnce   sync.Once
	wg         sync.WaitGroup
	stopWaiter sync.WaitGroup
	sessionMu  sync.Mutex // to fix data race
	session    *sessionutil.Session
	watchKv    kv.WatchKV
@@ -267,67 +262,6 @@ func (node *DataNode) Init() error {
	return initError
}

// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
func (node *DataNode) StartWatchChannels(ctx context.Context) {
	defer node.wg.Done()
	defer logutil.LogPanic()
	// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
	// TODO, this is risky, we'd better watch etcd with revision rather simply a path
	watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
	log.Info("Start watch channel", zap.String("prefix", watchPrefix))
	evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
	// after watch, first check all exists nodes first
	err := node.checkWatchedList()
	if err != nil {
		log.Warn("StartWatchChannels failed", zap.Error(err))
		return
	}
	for {
		select {
		case <-ctx.Done():
			log.Info("watch etcd loop quit")
			return
		case event, ok := <-evtChan:
			if !ok {
				log.Warn("datanode failed to watch channel, return")
				go node.StartWatchChannels(ctx)
				return
			}

			if err := event.Err(); err != nil {
				log.Warn("datanode watch channel canceled", zap.Error(event.Err()))
				// https://github.com/etcd-io/etcd/issues/8980
				if event.Err() == v3rpc.ErrCompacted {
					go node.StartWatchChannels(ctx)
					return
				}
				// if watch loop return due to event canceled, the datanode is not functional anymore
				log.Panic("datanode is not functional for event canceled", zap.Error(err))
				return
			}
			for _, evt := range event.Events {
				// We need to stay in order until events enqueued
				node.handleChannelEvt(evt)
			}
		}
	}
}
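The compaction branch above is easy to miss: once etcd reports ErrCompacted the existing watch channel is dead and the only recovery is to start a fresh watch, which is why the function re-launches itself in a goroutine and returns. A minimal standalone sketch of the same restart-on-compaction pattern written against the raw etcd client (the helper name watchLoop and the logging are illustrative, not Milvus code):

package example

import (
	"context"
	"log"

	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// watchLoop watches prefix and re-creates the watch whenever the server
// closes it or compacts away the watched revision, instead of treating
// those conditions as fatal.
func watchLoop(ctx context.Context, cli *clientv3.Client, prefix string, handle func(*clientv3.Event)) {
	for {
		wch := cli.Watch(ctx, prefix, clientv3.WithPrefix())
		for resp := range wch {
			if err := resp.Err(); err != nil {
				if err == v3rpc.ErrCompacted {
					break // restart the watch from the current state
				}
				log.Printf("watch failed: %v", err)
				return
			}
			for _, ev := range resp.Events {
				handle(ev) // handle events strictly in order
			}
		}
		if ctx.Err() != nil {
			return // context cancelled, stop for good
		}
		// channel closed without a fatal error: fall through and watch again
	}
}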

// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
	// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
	prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", paramtable.GetNodeID()))
	keys, values, err := node.watchKv.LoadWithPrefix(prefix)
	if err != nil {
		return err
	}
	for i, val := range values {
		node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val))
	}
	return nil
}
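checkWatchedList replays whatever is already stored under the prefix, so writes that happened before the watch was created are not lost. The TODO at the top of StartWatchChannels points at the more precise alternative: snapshot with Get, then watch from the revision just after the snapshot. A hedged sketch of that list-then-watch variant using the plain etcd client (listThenWatch and onKV are made-up names for illustration, not Milvus APIs):

package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// listThenWatch loads every key under prefix, replays it through onKV, then
// watches for changes starting at the revision right after the snapshot, so
// no event is missed and none is replayed twice.
func listThenWatch(ctx context.Context, cli *clientv3.Client, prefix string,
	onKV func(key, value []byte)) (clientv3.WatchChan, error) {
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	for _, kv := range resp.Kvs {
		onKV(kv.Key, kv.Value) // replay current state, like checkWatchedList
	}
	rev := resp.Header.Revision + 1
	return cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)), nil
}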

// handleChannelEvt handles event from kv watch event
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
	var e *event
@@ -347,123 +281,6 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
	node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
}

func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
	switch e.eventType {
	case putEventType:
		watchInfo, err := parsePutEventData(data)
		if err != nil {
			log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
			return
		}

		if isEndWatchState(watchInfo.State) {
			log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
			return
		}

		if watchInfo.Progress != 0 {
			log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version))
			return
		}

		e.info = watchInfo
		e.vChanName = watchInfo.GetVchan().GetChannelName()
		log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
	case deleteEventType:
		e.vChanName = parseDeleteEventKey(key)
		log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
	}

	actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
		node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
	))

	if !loaded {
		actualManager.Run()
	}

	actualManager.handleEvent(*e)

	// Whenever a delete event comes, this eventManager will be removed from map
	if e.eventType == deleteEventType {
		if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
			m.Close()
		}
	}
}
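handleWatchInfo does not process events inline; it routes each one to a per-channel event manager that is created lazily, started once, and retired when a delete event arrives, so events for one channel stay ordered while different channels proceed independently. The concurrent map and manager types used above are internal to Milvus; the following is a minimal sketch of the same one-worker-per-key idea using only the standard library (all names here are invented for illustration):

package example

import "sync"

// chanManager is a made-up stand-in for Milvus' channel event manager: one
// goroutine per channel drains a queue so events for that channel stay ordered.
type chanManager struct {
	events chan string
}

func newChanManager(handle func(string)) *chanManager {
	m := &chanManager{events: make(chan string, 16)}
	go func() {
		for ev := range m.events { // exits once close() is called
			handle(ev)
		}
	}()
	return m
}

func (m *chanManager) close() { close(m.events) }

// router mirrors the GetOrInsert / Run / GetAndRemove flow of handleWatchInfo:
// create-and-start a manager on first use, retire it on a delete event.
type router struct {
	mu       sync.Mutex
	managers map[string]*chanManager
	handle   func(string)
}

func newRouter(handle func(string)) *router {
	return &router{managers: make(map[string]*chanManager), handle: handle}
}

func (r *router) dispatch(channel, ev string, isDelete bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.managers[channel]
	if !ok {
		m = newChanManager(r.handle)
		r.managers[channel] = m
	}
	m.events <- ev // enqueue; per-channel order is preserved
	if isDelete {
		delete(r.managers, channel)
		m.close() // the manager drains what is queued, then stops
	}
}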

func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
	watchInfo := datapb.ChannelWatchInfo{}
	err := proto.Unmarshal(data, &watchInfo)
	if err != nil {
		return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
	}

	if watchInfo.Vchan == nil {
		return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
	}
	reviseVChannelInfo(watchInfo.GetVchan())
	return &watchInfo, nil
}

func parseDeleteEventKey(key string) string {
	parts := strings.Split(key, "/")
	vChanName := parts[len(parts)-1]
	return vChanName
}

func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
	vChanName := watchInfo.GetVchan().GetChannelName()
	key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
	tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))

	switch watchInfo.State {
	case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
		if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
			log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
			watchInfo.State = datapb.ChannelWatchState_WatchFailure
		} else {
			log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
			watchInfo.State = datapb.ChannelWatchState_WatchSuccess
		}
	case datapb.ChannelWatchState_ToRelease:
		// there is no reason why we release fail
		node.tryToReleaseFlowgraph(vChanName)
		watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
	}

	v, err := proto.Marshal(watchInfo)
	if err != nil {
		return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
	}

	success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
	// etcd error
	if err != nil {
		// flow graph will leak if not release, causing new datanode failed to subscribe
		node.tryToReleaseFlowgraph(vChanName)
		log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName),
			zap.String("state", watchInfo.State.String()), zap.Error(err))
		return err
	}
	// etcd valid but the states updated.
	if !success {
		log.Info("handle put event: failed to compare version and swap, release flowgraph",
			zap.String("key", key), zap.String("state", watchInfo.State.String()),
			zap.String("vChanName", vChanName))
		// flow graph will leak if not release, causing new datanode failed to subscribe
		node.tryToReleaseFlowgraph(vChanName)
		return nil
	}
	log.Info("handle put event success", zap.String("key", key),
		zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
	return nil
}
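The CompareVersionAndSwap call above is an optimistic write: the new watch state is stored only if the key is still at the version the DataNode observed when the event arrived, so a newer update from DataCoord silently wins and the stale write is dropped (with the flowgraph released). The WatchKV interface is Milvus-internal; expressed directly as an etcd transaction, the same check-and-put looks roughly like this sketch (helper name assumed):

package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// compareVersionAndSwap writes value only if key is still at the expected
// etcd version, and reports whether the swap actually happened.
func compareVersionAndSwap(ctx context.Context, cli *clientv3.Client, key string, version int64, value string) (bool, error) {
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.Version(key), "=", version)).
		Then(clientv3.OpPut(key, value)).
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}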

func (node *DataNode) handleDeleteEvent(vChanName string) {
	node.tryToReleaseFlowgraph(vChanName)
}

// tryToReleaseFlowgraph tries to release a flowgraph
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
	log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
@@ -473,7 +290,7 @@ func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
// BackGroundGC runs in background to release datanode resources
// GOOSE TODO: remove background GC, using ToRelease for drop-collection after #15846
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
	defer node.wg.Done()
	defer node.stopWaiter.Done()
	log.Info("DataNode Background GC Start")
	for {
		select {
@@ -531,7 +348,7 @@ func (node *DataNode) Start() error {

	node.chunkManager = chunkManager

	node.wg.Add(1)
	node.stopWaiter.Add(1)
	go node.BackGroundGC(node.clearSignal)

	go node.compactionExecutor.start(node.ctx)
@@ -541,7 +358,7 @@ func (node *DataNode) Start() error {
		go node.timeTickSender.start(node.ctx)
	}

	node.wg.Add(1)
	node.stopWaiter.Add(1)
	// Start node watch node
	go node.StartWatchChannels(node.ctx)

@@ -601,7 +418,7 @@ func (node *DataNode) Stop() error {
			node.session.Stop()
		}

		node.wg.Wait()
		node.stopWaiter.Wait()
	})
	return nil
}

@@ -18,7 +18,6 @@ package datanode

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"strconv"
@@ -26,13 +25,11 @@ import (
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/stretchr/testify/assert"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/types"
@@ -189,7 +186,7 @@ func TestDataNode(t *testing.T) {
	t.Run("Test BackGroundGC", func(t *testing.T) {
		vchanNameCh := make(chan string)
		node.clearSignal = vchanNameCh
		node.wg.Add(1)
		node.stopWaiter.Add(1)
		go node.BackGroundGC(vchanNameCh)

		testDataSyncs := []struct {
@@ -214,262 +211,4 @@ func TestDataNode(t *testing.T) {
			return true
		}, 2*time.Second, 10*time.Millisecond)
	})

}

func TestWatchChannel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
	etcdCli, err := etcd.GetEtcdClient(
		Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
		Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
		Params.EtcdCfg.Endpoints.GetAsStrings(),
		Params.EtcdCfg.EtcdTLSCert.GetValue(),
		Params.EtcdCfg.EtcdTLSKey.GetValue(),
		Params.EtcdCfg.EtcdTLSCACert.GetValue(),
		Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
	assert.NoError(t, err)
	defer etcdCli.Close()
	node.SetEtcdClient(etcdCli)
	err = node.Init()
	assert.NoError(t, err)
	err = node.Start()
	assert.NoError(t, err)
	defer node.Stop()
	err = node.Register()
	assert.NoError(t, err)

	defer cancel()

	t.Run("test watch channel", func(t *testing.T) {
		kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
		oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
		path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh)
		err = kv.Save(path, string([]byte{23}))
		assert.NoError(t, err)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)

		vchan := &datapb.VchannelInfo{
			CollectionID:        1,
			ChannelName:         ch,
			UnflushedSegmentIds: []int64{},
		}
		info := &datapb.ChannelWatchInfo{
			State: datapb.ChannelWatchState_ToWatch,
			Vchan: vchan,
		}
		val, err := proto.Marshal(info)
		assert.NoError(t, err)
		err = kv.Save(path, string(val))
		assert.NoError(t, err)

		assert.Eventually(t, func() bool {
			exist := node.flowgraphManager.exist(ch)
			if !exist {
				return false
			}
			bs, err := kv.LoadBytes(fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch))
			if err != nil {
				return false
			}
			watchInfo := &datapb.ChannelWatchInfo{}
			err = proto.Unmarshal(bs, watchInfo)
			if err != nil {
				return false
			}
			return watchInfo.GetState() == datapb.ChannelWatchState_WatchSuccess
		}, 3*time.Second, 100*time.Millisecond)

		err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
		assert.NoError(t, err)

		assert.Eventually(t, func() bool {
			exist := node.flowgraphManager.exist(ch)
			return !exist
		}, 3*time.Second, 100*time.Millisecond)
	})

	t.Run("Test release channel", func(t *testing.T) {
		kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
		oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
		path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh)
		err = kv.Save(path, string([]byte{23}))
		assert.NoError(t, err)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)
		c := make(chan struct{})
		go func() {
			ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
			c <- struct{}{}
			cnt := 0
			for {
				evt := <-ec
				for _, event := range evt.Events {
					if strings.Contains(string(event.Kv.Key), ch) {
						cnt++
					}
				}
				if cnt >= 2 {
					break
				}
			}
			c <- struct{}{}
		}()
		// wait for check goroutine start Watch
		<-c

		vchan := &datapb.VchannelInfo{
			CollectionID:        1,
			ChannelName:         ch,
			UnflushedSegmentIds: []int64{},
		}
		info := &datapb.ChannelWatchInfo{
			State: datapb.ChannelWatchState_ToRelease,
			Vchan: vchan,
		}
		val, err := proto.Marshal(info)
		assert.NoError(t, err)
		err = kv.Save(path, string(val))
		assert.NoError(t, err)

		// wait for check goroutine received 2 events
		<-c
		exist := node.flowgraphManager.exist(ch)
		assert.False(t, exist)

		err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
		assert.NoError(t, err)
		//TODO there is not way to sync Release done, use sleep for now
		time.Sleep(100 * time.Millisecond)

		exist = node.flowgraphManager.exist(ch)
		assert.False(t, exist)

	})

	t.Run("handle watch info failed", func(t *testing.T) {
		e := &event{
			eventType: putEventType,
		}

		node.handleWatchInfo(e, "test1", []byte{23})

		exist := node.flowgraphManager.exist("test1")
		assert.False(t, exist)

		info := datapb.ChannelWatchInfo{
			Vchan: nil,
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)
		node.handleWatchInfo(e, "test2", bs)

		exist = node.flowgraphManager.exist("test2")
		assert.False(t, exist)

		chPut := make(chan struct{}, 1)
		chDel := make(chan struct{}, 1)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		m := newChannelEventManager(
			func(info *datapb.ChannelWatchInfo, version int64) error {
				r := node.handlePutEvent(info, version)
				chPut <- struct{}{}
				return r
			},
			func(vChan string) {
				node.handleDeleteEvent(vChan)
				chDel <- struct{}{}
			}, time.Millisecond*100,
		)
		node.eventManagerMap.Insert(ch, m)
		m.Run()
		defer m.Close()

		info = datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{ChannelName: ch},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err = proto.Marshal(&info)
		assert.NoError(t, err)

		msFactory := node.factory
		defer func() { node.factory = msFactory }()

		// todo review the UT logic
		// As we remove timetick channel logic, flow_graph_insert_buffer_node no longer depend on MessageStreamFactory
		// so data_sync_service can be created. this assert becomes true
		node.factory = &FailMessageStreamFactory{}
		node.handleWatchInfo(e, ch, bs)
		<-chPut
		exist = node.flowgraphManager.exist(ch)
		assert.True(t, exist)
	})

	t.Run("handle watchinfo out of date", func(t *testing.T) {
		chPut := make(chan struct{}, 1)
		chDel := make(chan struct{}, 1)
		// inject eventManager
		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		m := newChannelEventManager(
			func(info *datapb.ChannelWatchInfo, version int64) error {
				r := node.handlePutEvent(info, version)
				chPut <- struct{}{}
				return r
			},
			func(vChan string) {
				node.handleDeleteEvent(vChan)
				chDel <- struct{}{}
			}, time.Millisecond*100,
		)
		node.eventManagerMap.Insert(ch, m)
		m.Run()
		defer m.Close()
		e := &event{
			eventType: putEventType,
			version:   10000,
		}

		info := datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{ChannelName: ch},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)

		node.handleWatchInfo(e, ch, bs)
		<-chPut
		exist := node.flowgraphManager.exist("test3")
		assert.False(t, exist)
	})

	t.Run("handle watchinfo compatibility", func(t *testing.T) {
		info := datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{
				CollectionID:        1,
				ChannelName:         "delta-channel1",
				UnflushedSegments:   []*datapb.SegmentInfo{{ID: 1}},
				FlushedSegments:     []*datapb.SegmentInfo{{ID: 2}},
				DroppedSegments:     []*datapb.SegmentInfo{{ID: 3}},
				UnflushedSegmentIds: []int64{1},
			},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)

		newWatchInfo, err := parsePutEventData(bs)
		assert.NoError(t, err)

		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments())
		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments())
		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds())
	})
}
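The "handle watchinfo compatibility" subtest above pins down the behavior expected from reviseVChannelInfo, which is not shown in this diff: IDs still carried in the deprecated *SegmentInfo slices must end up in the ID-only fields, and the deprecated slices must be emptied. Below is a sketch consistent with those assertions; the function name, the de-duplication rule, and the exact field handling are assumptions, not the actual Milvus implementation.

package example

import "github.com/milvus-io/milvus/internal/proto/datapb"

// reviseVChannelInfoSketch migrates deprecated *SegmentInfo slices into the
// ID-only fields, skipping IDs already present, then empties the deprecated
// slices, mirroring what the compatibility subtest asserts.
func reviseVChannelInfoSketch(v *datapb.VchannelInfo) {
	merge := func(ids []int64, segs []*datapb.SegmentInfo) []int64 {
		seen := make(map[int64]struct{}, len(ids))
		for _, id := range ids {
			seen[id] = struct{}{}
		}
		for _, seg := range segs {
			if _, ok := seen[seg.GetID()]; !ok {
				ids = append(ids, seg.GetID())
				seen[seg.GetID()] = struct{}{}
			}
		}
		return ids
	}
	v.UnflushedSegmentIds = merge(v.GetUnflushedSegmentIds(), v.GetUnflushedSegments())
	v.FlushedSegmentIds = merge(v.GetFlushedSegmentIds(), v.GetFlushedSegments())
	v.DroppedSegmentIds = merge(v.GetDroppedSegmentIds(), v.GetDroppedSegments())
	v.UnflushedSegments = []*datapb.SegmentInfo{}
	v.FlushedSegments = []*datapb.SegmentInfo{}
	v.DroppedSegments = []*datapb.SegmentInfo{}
}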

@@ -17,20 +17,205 @@
package datanode

import (
	"context"
	"fmt"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.uber.org/atomic"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/logutil"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

const retryWatchInterval = 20 * time.Second

// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
func (node *DataNode) StartWatchChannels(ctx context.Context) {
	defer node.stopWaiter.Done()
	defer logutil.LogPanic()
	// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
	// TODO, this is risky, we'd better watch etcd with revision rather simply a path
	watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
	log.Info("Start watch channel", zap.String("prefix", watchPrefix))
	evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
	// after watch, first check all exists nodes first
	err := node.checkWatchedList()
	if err != nil {
		log.Warn("StartWatchChannels failed", zap.Error(err))
		return
	}
	for {
		select {
		case <-ctx.Done():
			log.Info("watch etcd loop quit")
			return
		case event, ok := <-evtChan:
			if !ok {
				log.Warn("datanode failed to watch channel, return")
				go node.StartWatchChannels(ctx)
				return
			}

			if err := event.Err(); err != nil {
				log.Warn("datanode watch channel canceled", zap.Error(event.Err()))
				// https://github.com/etcd-io/etcd/issues/8980
				if event.Err() == v3rpc.ErrCompacted {
					go node.StartWatchChannels(ctx)
					return
				}
				// if watch loop return due to event canceled, the datanode is not functional anymore
				log.Panic("datanode is not functional for event canceled", zap.Error(err))
				return
			}
			for _, evt := range event.Events {
				// We need to stay in order until events enqueued
				node.handleChannelEvt(evt)
			}
		}
	}
}

// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
	// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
	prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", paramtable.GetNodeID()))
	keys, values, err := node.watchKv.LoadWithPrefix(prefix)
	if err != nil {
		return err
	}
	for i, val := range values {
		node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val))
	}
	return nil
}

func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
	switch e.eventType {
	case putEventType:
		watchInfo, err := parsePutEventData(data)
		if err != nil {
			log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
			return
		}

		if isEndWatchState(watchInfo.State) {
			log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
			return
		}

		if watchInfo.Progress != 0 {
			log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version))
			return
		}

		e.info = watchInfo
		e.vChanName = watchInfo.GetVchan().GetChannelName()
		log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
	case deleteEventType:
		e.vChanName = parseDeleteEventKey(key)
		log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
	}

	actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
		node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
	))

	if !loaded {
		actualManager.Run()
	}

	actualManager.handleEvent(*e)

	// Whenever a delete event comes, this eventManager will be removed from map
	if e.eventType == deleteEventType {
		if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
			m.Close()
		}
	}
}

func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
	watchInfo := datapb.ChannelWatchInfo{}
	err := proto.Unmarshal(data, &watchInfo)
	if err != nil {
		return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
	}

	if watchInfo.Vchan == nil {
		return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
	}
	reviseVChannelInfo(watchInfo.GetVchan())
	return &watchInfo, nil
}

func parseDeleteEventKey(key string) string {
	parts := strings.Split(key, "/")
	vChanName := parts[len(parts)-1]
	return vChanName
}

func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
	vChanName := watchInfo.GetVchan().GetChannelName()
	key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
	tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))

	switch watchInfo.State {
	case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
		if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
			log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
			watchInfo.State = datapb.ChannelWatchState_WatchFailure
		} else {
			log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
			watchInfo.State = datapb.ChannelWatchState_WatchSuccess
		}
	case datapb.ChannelWatchState_ToRelease:
		// there is no reason why we release fail
		node.tryToReleaseFlowgraph(vChanName)
		watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
	}

	v, err := proto.Marshal(watchInfo)
	if err != nil {
		return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
	}

	success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
	// etcd error
	if err != nil {
		// flow graph will leak if not release, causing new datanode failed to subscribe
		node.tryToReleaseFlowgraph(vChanName)
		log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName),
			zap.String("state", watchInfo.State.String()), zap.Error(err))
		return err
	}
	// etcd valid but the states updated.
	if !success {
		log.Info("handle put event: failed to compare version and swap, release flowgraph",
			zap.String("key", key), zap.String("state", watchInfo.State.String()),
			zap.String("vChanName", vChanName))
		// flow graph will leak if not release, causing new datanode failed to subscribe
		node.tryToReleaseFlowgraph(vChanName)
		return nil
	}
	log.Info("handle put event success", zap.String("key", key),
		zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
	return nil
}

func (node *DataNode) handleDeleteEvent(vChanName string) {
	node.tryToReleaseFlowgraph(vChanName)
}

type event struct {
	eventType int
	vChanName string

@@ -17,18 +17,282 @@
package datanode

import (
	"context"
	"fmt"
	"math/rand"
	"path"
	"strings"
	"testing"
	"time"

	"github.com/cockroachdb/errors"

	"github.com/golang/protobuf/proto"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/etcd"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestWatchChannel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
	etcdCli, err := etcd.GetEtcdClient(
		Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
		Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
		Params.EtcdCfg.Endpoints.GetAsStrings(),
		Params.EtcdCfg.EtcdTLSCert.GetValue(),
		Params.EtcdCfg.EtcdTLSKey.GetValue(),
		Params.EtcdCfg.EtcdTLSCACert.GetValue(),
		Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
	assert.NoError(t, err)
	defer etcdCli.Close()
	node.SetEtcdClient(etcdCli)
	err = node.Init()
	assert.NoError(t, err)
	err = node.Start()
	assert.NoError(t, err)
	defer node.Stop()
	err = node.Register()
	assert.NoError(t, err)

	defer cancel()

	t.Run("test watch channel", func(t *testing.T) {
		kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
		oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
		path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh)
		err = kv.Save(path, string([]byte{23}))
		assert.NoError(t, err)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)

		vchan := &datapb.VchannelInfo{
			CollectionID:        1,
			ChannelName:         ch,
			UnflushedSegmentIds: []int64{},
		}
		info := &datapb.ChannelWatchInfo{
			State: datapb.ChannelWatchState_ToWatch,
			Vchan: vchan,
		}
		val, err := proto.Marshal(info)
		assert.NoError(t, err)
		err = kv.Save(path, string(val))
		assert.NoError(t, err)

		assert.Eventually(t, func() bool {
			exist := node.flowgraphManager.exist(ch)
			if !exist {
				return false
			}
			bs, err := kv.LoadBytes(fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch))
			if err != nil {
				return false
			}
			watchInfo := &datapb.ChannelWatchInfo{}
			err = proto.Unmarshal(bs, watchInfo)
			if err != nil {
				return false
			}
			return watchInfo.GetState() == datapb.ChannelWatchState_WatchSuccess
		}, 3*time.Second, 100*time.Millisecond)

		err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
		assert.NoError(t, err)

		assert.Eventually(t, func() bool {
			exist := node.flowgraphManager.exist(ch)
			return !exist
		}, 3*time.Second, 100*time.Millisecond)
	})

	t.Run("Test release channel", func(t *testing.T) {
		kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
		oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
		path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh)
		err = kv.Save(path, string([]byte{23}))
		assert.NoError(t, err)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)
		c := make(chan struct{})
		go func() {
			ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
			c <- struct{}{}
			cnt := 0
			for {
				evt := <-ec
				for _, event := range evt.Events {
					if strings.Contains(string(event.Kv.Key), ch) {
						cnt++
					}
				}
				if cnt >= 2 {
					break
				}
			}
			c <- struct{}{}
		}()
		// wait for check goroutine start Watch
		<-c

		vchan := &datapb.VchannelInfo{
			CollectionID:        1,
			ChannelName:         ch,
			UnflushedSegmentIds: []int64{},
		}
		info := &datapb.ChannelWatchInfo{
			State: datapb.ChannelWatchState_ToRelease,
			Vchan: vchan,
		}
		val, err := proto.Marshal(info)
		assert.NoError(t, err)
		err = kv.Save(path, string(val))
		assert.NoError(t, err)

		// wait for check goroutine received 2 events
		<-c
		exist := node.flowgraphManager.exist(ch)
		assert.False(t, exist)

		err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
		assert.NoError(t, err)
		//TODO there is not way to sync Release done, use sleep for now
		time.Sleep(100 * time.Millisecond)

		exist = node.flowgraphManager.exist(ch)
		assert.False(t, exist)

	})

	t.Run("handle watch info failed", func(t *testing.T) {
		e := &event{
			eventType: putEventType,
		}

		node.handleWatchInfo(e, "test1", []byte{23})

		exist := node.flowgraphManager.exist("test1")
		assert.False(t, exist)

		info := datapb.ChannelWatchInfo{
			Vchan: nil,
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)
		node.handleWatchInfo(e, "test2", bs)

		exist = node.flowgraphManager.exist("test2")
		assert.False(t, exist)

		chPut := make(chan struct{}, 1)
		chDel := make(chan struct{}, 1)

		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		m := newChannelEventManager(
			func(info *datapb.ChannelWatchInfo, version int64) error {
				r := node.handlePutEvent(info, version)
				chPut <- struct{}{}
				return r
			},
			func(vChan string) {
				node.handleDeleteEvent(vChan)
				chDel <- struct{}{}
			}, time.Millisecond*100,
		)
		node.eventManagerMap.Insert(ch, m)
		m.Run()
		defer m.Close()

		info = datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{ChannelName: ch},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err = proto.Marshal(&info)
		assert.NoError(t, err)

		msFactory := node.factory
		defer func() { node.factory = msFactory }()

		// todo review the UT logic
		// As we remove timetick channel logic, flow_graph_insert_buffer_node no longer depend on MessageStreamFactory
		// so data_sync_service can be created. this assert becomes true
		node.factory = &FailMessageStreamFactory{}
		node.handleWatchInfo(e, ch, bs)
		<-chPut
		exist = node.flowgraphManager.exist(ch)
		assert.True(t, exist)
	})

	t.Run("handle watchinfo out of date", func(t *testing.T) {
		chPut := make(chan struct{}, 1)
		chDel := make(chan struct{}, 1)
		// inject eventManager
		ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
		m := newChannelEventManager(
			func(info *datapb.ChannelWatchInfo, version int64) error {
				r := node.handlePutEvent(info, version)
				chPut <- struct{}{}
				return r
			},
			func(vChan string) {
				node.handleDeleteEvent(vChan)
				chDel <- struct{}{}
			}, time.Millisecond*100,
		)
		node.eventManagerMap.Insert(ch, m)
		m.Run()
		defer m.Close()
		e := &event{
			eventType: putEventType,
			version:   10000,
		}

		info := datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{ChannelName: ch},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)

		node.handleWatchInfo(e, ch, bs)
		<-chPut
		exist := node.flowgraphManager.exist("test3")
		assert.False(t, exist)
	})

	t.Run("handle watchinfo compatibility", func(t *testing.T) {
		info := datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{
				CollectionID:        1,
				ChannelName:         "delta-channel1",
				UnflushedSegments:   []*datapb.SegmentInfo{{ID: 1}},
				FlushedSegments:     []*datapb.SegmentInfo{{ID: 2}},
				DroppedSegments:     []*datapb.SegmentInfo{{ID: 3}},
				UnflushedSegmentIds: []int64{1},
			},
			State: datapb.ChannelWatchState_Uncomplete,
		}
		bs, err := proto.Marshal(&info)
		assert.NoError(t, err)

		newWatchInfo, err := parsePutEventData(bs)
		assert.NoError(t, err)

		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments())
		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments())
		assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds())
		assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds())
	})
}

func TestChannelEventManager(t *testing.T) {
	t.Run("normal case", func(t *testing.T) {
		ch := make(chan struct{}, 1)