diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 79b5ff9668..c5a874f9a7 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -25,16 +25,12 @@ import ( "io" "math/rand" "os" - "path" - "strings" "sync" "sync/atomic" "syscall" "time" "github.com/cockroachdb/errors" - "github.com/golang/protobuf/proto" - v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -42,7 +38,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" @@ -107,7 +102,7 @@ type DataNode struct { initOnce sync.Once startOnce sync.Once stopOnce sync.Once - wg sync.WaitGroup + stopWaiter sync.WaitGroup sessionMu sync.Mutex // to fix data race session *sessionutil.Session watchKv kv.WatchKV @@ -267,67 +262,6 @@ func (node *DataNode) Init() error { return initError } -// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now) -func (node *DataNode) StartWatchChannels(ctx context.Context) { - defer node.wg.Done() - defer logutil.LogPanic() - // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} - // TODO, this is risky, we'd better watch etcd with revision rather simply a path - watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID)) - log.Info("Start watch channel", zap.String("prefix", watchPrefix)) - evtChan := node.watchKv.WatchWithPrefix(watchPrefix) - // after watch, first check all exists nodes first - err := node.checkWatchedList() - if err != nil { - log.Warn("StartWatchChannels failed", zap.Error(err)) - return - } - for { - select { - case <-ctx.Done(): - log.Info("watch etcd loop quit") - return - case event, ok := <-evtChan: - if !ok { - log.Warn("datanode failed to watch channel, return") - go node.StartWatchChannels(ctx) - return - } - - if err := event.Err(); err != nil { - log.Warn("datanode watch channel canceled", zap.Error(event.Err())) - // https://github.com/etcd-io/etcd/issues/8980 - if event.Err() == v3rpc.ErrCompacted { - go node.StartWatchChannels(ctx) - return - } - // if watch loop return due to event canceled, the datanode is not functional anymore - log.Panic("datanode is not functional for event canceled", zap.Error(err)) - return - } - for _, evt := range event.Events { - // We need to stay in order until events enqueued - node.handleChannelEvt(evt) - } - } - } -} - -// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched -// serves the corner case for etcd connection lost and missing some events -func (node *DataNode) checkWatchedList() error { - // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} - prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", paramtable.GetNodeID())) - keys, values, err := node.watchKv.LoadWithPrefix(prefix) - if err != nil { - return err - } - for i, val := range values { - node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val)) - } - return nil -} - // handleChannelEvt handles event from kv watch event func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { var e *event @@ -347,123 +281,6 @@ func 
(node *DataNode) handleChannelEvt(evt *clientv3.Event) { node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value) } -func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { - switch e.eventType { - case putEventType: - watchInfo, err := parsePutEventData(data) - if err != nil { - log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err)) - return - } - - if isEndWatchState(watchInfo.State) { - log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String())) - return - } - - if watchInfo.Progress != 0 { - log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version)) - return - } - - e.info = watchInfo - e.vChanName = watchInfo.GetVchan().GetChannelName() - log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String())) - case deleteEventType: - e.vChanName = parseDeleteEventKey(key) - log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key)) - } - - actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager( - node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval, - )) - - if !loaded { - actualManager.Run() - } - - actualManager.handleEvent(*e) - - // Whenever a delete event comes, this eventManager will be removed from map - if e.eventType == deleteEventType { - if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded { - m.Close() - } - } -} - -func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) { - watchInfo := datapb.ChannelWatchInfo{} - err := proto.Unmarshal(data, &watchInfo) - if err != nil { - return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err) - } - - if watchInfo.Vchan == nil { - return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo") - } - reviseVChannelInfo(watchInfo.GetVchan()) - return &watchInfo, nil -} - -func parseDeleteEventKey(key string) string { - parts := strings.Split(key, "/") - vChanName := parts[len(parts)-1] - return vChanName -} - -func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) { - vChanName := watchInfo.GetVchan().GetChannelName() - key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName) - tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second)) - - switch watchInfo.State { - case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: - if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { - log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err)) - watchInfo.State = datapb.ChannelWatchState_WatchFailure - } else { - log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) - watchInfo.State = datapb.ChannelWatchState_WatchSuccess - } - case datapb.ChannelWatchState_ToRelease: - // there is no reason why we release fail - node.tryToReleaseFlowgraph(vChanName) - watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess - } - - v, err := proto.Marshal(watchInfo) - if err != nil { - return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: 
%w", vChanName, watchInfo.State.String(), err) - } - - success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v)) - // etcd error - if err != nil { - // flow graph will leak if not release, causing new datanode failed to subscribe - node.tryToReleaseFlowgraph(vChanName) - log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName), - zap.String("state", watchInfo.State.String()), zap.Error(err)) - return err - } - // etcd valid but the states updated. - if !success { - log.Info("handle put event: failed to compare version and swap, release flowgraph", - zap.String("key", key), zap.String("state", watchInfo.State.String()), - zap.String("vChanName", vChanName)) - // flow graph will leak if not release, causing new datanode failed to subscribe - node.tryToReleaseFlowgraph(vChanName) - return nil - } - log.Info("handle put event success", zap.String("key", key), - zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName)) - return nil -} - -func (node *DataNode) handleDeleteEvent(vChanName string) { - node.tryToReleaseFlowgraph(vChanName) -} - // tryToReleaseFlowgraph tries to release a flowgraph func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { log.Info("try to release flowgraph", zap.String("vChanName", vChanName)) @@ -473,7 +290,7 @@ func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { // BackGroundGC runs in background to release datanode resources // GOOSE TODO: remove background GC, using ToRelease for drop-collection after #15846 func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { - defer node.wg.Done() + defer node.stopWaiter.Done() log.Info("DataNode Background GC Start") for { select { @@ -531,7 +348,7 @@ func (node *DataNode) Start() error { node.chunkManager = chunkManager - node.wg.Add(1) + node.stopWaiter.Add(1) go node.BackGroundGC(node.clearSignal) go node.compactionExecutor.start(node.ctx) @@ -541,7 +358,7 @@ func (node *DataNode) Start() error { go node.timeTickSender.start(node.ctx) } - node.wg.Add(1) + node.stopWaiter.Add(1) // Start node watch node go node.StartWatchChannels(node.ctx) @@ -601,7 +418,7 @@ func (node *DataNode) Stop() error { node.session.Stop() } - node.wg.Wait() + node.stopWaiter.Wait() }) return nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index fb006af9e4..a59ff8f3c4 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -18,7 +18,6 @@ package datanode import ( "context" - "fmt" "math/rand" "os" "strconv" @@ -26,13 +25,11 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" @@ -189,7 +186,7 @@ func TestDataNode(t *testing.T) { t.Run("Test BackGroundGC", func(t *testing.T) { vchanNameCh := make(chan string) node.clearSignal = vchanNameCh - node.wg.Add(1) + node.stopWaiter.Add(1) go node.BackGroundGC(vchanNameCh) testDataSyncs := []struct { @@ -214,262 +211,4 @@ func TestDataNode(t *testing.T) { return true }, 2*time.Second, 10*time.Millisecond) }) - -} - -func TestWatchChannel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - node := newIDLEDataNodeMock(ctx, 
schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.NoError(t, err) - defer etcdCli.Close() - node.SetEtcdClient(etcdCli) - err = node.Init() - assert.NoError(t, err) - err = node.Start() - assert.NoError(t, err) - defer node.Stop() - err = node.Register() - assert.NoError(t, err) - - defer cancel() - - t.Run("test watch channel", func(t *testing.T) { - kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) - oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" - path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh) - err = kv.Save(path, string([]byte{23})) - assert.NoError(t, err) - - ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) - path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch) - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: ch, - UnflushedSegmentIds: []int64{}, - } - info := &datapb.ChannelWatchInfo{ - State: datapb.ChannelWatchState_ToWatch, - Vchan: vchan, - } - val, err := proto.Marshal(info) - assert.NoError(t, err) - err = kv.Save(path, string(val)) - assert.NoError(t, err) - - assert.Eventually(t, func() bool { - exist := node.flowgraphManager.exist(ch) - if !exist { - return false - } - bs, err := kv.LoadBytes(fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)) - if err != nil { - return false - } - watchInfo := &datapb.ChannelWatchInfo{} - err = proto.Unmarshal(bs, watchInfo) - if err != nil { - return false - } - return watchInfo.GetState() == datapb.ChannelWatchState_WatchSuccess - }, 3*time.Second, 100*time.Millisecond) - - err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) - assert.NoError(t, err) - - assert.Eventually(t, func() bool { - exist := node.flowgraphManager.exist(ch) - return !exist - }, 3*time.Second, 100*time.Millisecond) - }) - - t.Run("Test release channel", func(t *testing.T) { - kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) - oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" - path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh) - err = kv.Save(path, string([]byte{23})) - assert.NoError(t, err) - - ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) - path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch) - c := make(chan struct{}) - go func() { - ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) - c <- struct{}{} - cnt := 0 - for { - evt := <-ec - for _, event := range evt.Events { - if strings.Contains(string(event.Kv.Key), ch) { - cnt++ - } - } - if cnt >= 2 { - break - } - } - c <- struct{}{} - }() - // wait for check goroutine start Watch - <-c - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: ch, - UnflushedSegmentIds: []int64{}, - } - info := &datapb.ChannelWatchInfo{ - State: 
datapb.ChannelWatchState_ToRelease, - Vchan: vchan, - } - val, err := proto.Marshal(info) - assert.NoError(t, err) - err = kv.Save(path, string(val)) - assert.NoError(t, err) - - // wait for check goroutine received 2 events - <-c - exist := node.flowgraphManager.exist(ch) - assert.False(t, exist) - - err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) - assert.NoError(t, err) - //TODO there is not way to sync Release done, use sleep for now - time.Sleep(100 * time.Millisecond) - - exist = node.flowgraphManager.exist(ch) - assert.False(t, exist) - - }) - - t.Run("handle watch info failed", func(t *testing.T) { - e := &event{ - eventType: putEventType, - } - - node.handleWatchInfo(e, "test1", []byte{23}) - - exist := node.flowgraphManager.exist("test1") - assert.False(t, exist) - - info := datapb.ChannelWatchInfo{ - Vchan: nil, - State: datapb.ChannelWatchState_Uncomplete, - } - bs, err := proto.Marshal(&info) - assert.NoError(t, err) - node.handleWatchInfo(e, "test2", bs) - - exist = node.flowgraphManager.exist("test2") - assert.False(t, exist) - - chPut := make(chan struct{}, 1) - chDel := make(chan struct{}, 1) - - ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) - m := newChannelEventManager( - func(info *datapb.ChannelWatchInfo, version int64) error { - r := node.handlePutEvent(info, version) - chPut <- struct{}{} - return r - }, - func(vChan string) { - node.handleDeleteEvent(vChan) - chDel <- struct{}{} - }, time.Millisecond*100, - ) - node.eventManagerMap.Insert(ch, m) - m.Run() - defer m.Close() - - info = datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ChannelName: ch}, - State: datapb.ChannelWatchState_Uncomplete, - } - bs, err = proto.Marshal(&info) - assert.NoError(t, err) - - msFactory := node.factory - defer func() { node.factory = msFactory }() - - // todo review the UT logic - // As we remove timetick channel logic, flow_graph_insert_buffer_node no longer depend on MessageStreamFactory - // so data_sync_service can be created. 
this assert becomes true - node.factory = &FailMessageStreamFactory{} - node.handleWatchInfo(e, ch, bs) - <-chPut - exist = node.flowgraphManager.exist(ch) - assert.True(t, exist) - }) - - t.Run("handle watchinfo out of date", func(t *testing.T) { - chPut := make(chan struct{}, 1) - chDel := make(chan struct{}, 1) - // inject eventManager - ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) - m := newChannelEventManager( - func(info *datapb.ChannelWatchInfo, version int64) error { - r := node.handlePutEvent(info, version) - chPut <- struct{}{} - return r - }, - func(vChan string) { - node.handleDeleteEvent(vChan) - chDel <- struct{}{} - }, time.Millisecond*100, - ) - node.eventManagerMap.Insert(ch, m) - m.Run() - defer m.Close() - e := &event{ - eventType: putEventType, - version: 10000, - } - - info := datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ChannelName: ch}, - State: datapb.ChannelWatchState_Uncomplete, - } - bs, err := proto.Marshal(&info) - assert.NoError(t, err) - - node.handleWatchInfo(e, ch, bs) - <-chPut - exist := node.flowgraphManager.exist("test3") - assert.False(t, exist) - }) - - t.Run("handle watchinfo compatibility", func(t *testing.T) { - info := datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: "delta-channel1", - UnflushedSegments: []*datapb.SegmentInfo{{ID: 1}}, - FlushedSegments: []*datapb.SegmentInfo{{ID: 2}}, - DroppedSegments: []*datapb.SegmentInfo{{ID: 3}}, - UnflushedSegmentIds: []int64{1}, - }, - State: datapb.ChannelWatchState_Uncomplete, - } - bs, err := proto.Marshal(&info) - assert.NoError(t, err) - - newWatchInfo, err := parsePutEventData(bs) - assert.NoError(t, err) - - assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments()) - assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments()) - assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments()) - assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds()) - assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds()) - assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds()) - }) } diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 9e8babee5b..3c44648ab6 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -17,20 +17,205 @@ package datanode import ( + "context" + "fmt" + "path" + "strings" "sync" "time" "github.com/golang/protobuf/proto" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) const retryWatchInterval = 20 * time.Second +// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now) +func (node *DataNode) StartWatchChannels(ctx context.Context) { + defer node.stopWaiter.Done() + defer logutil.LogPanic() + // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} + // TODO, this is risky, we'd better watch etcd with revision rather simply a path + watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID)) + log.Info("Start watch channel", zap.String("prefix", watchPrefix)) + evtChan := node.watchKv.WatchWithPrefix(watchPrefix) + // after watch, 
check all existing entries first + err := node.checkWatchedList() + if err != nil { + log.Warn("StartWatchChannels failed", zap.Error(err)) + return + } + for { + select { + case <-ctx.Done(): + log.Info("watch etcd loop quit") + return + case event, ok := <-evtChan: + if !ok { + log.Warn("datanode failed to watch channel, return") + go node.StartWatchChannels(ctx) + return + } + + if err := event.Err(); err != nil { + log.Warn("datanode watch channel canceled", zap.Error(event.Err())) + // https://github.com/etcd-io/etcd/issues/8980 + if event.Err() == v3rpc.ErrCompacted { + go node.StartWatchChannels(ctx) + return + } + // if the watch loop returns due to event cancellation, the datanode is not functional anymore + log.Panic("datanode is not functional for event canceled", zap.Error(err)) + return + } + for _, evt := range event.Events { + // Events must be handled in order as they are enqueued + node.handleChannelEvt(evt) + } + } + } +} + +// checkWatchedList lists all entries under [prefix]/channel/{node_id} and makes sure they are all watched. +// It covers the corner case where the etcd connection is lost and some events are missed. +func (node *DataNode) checkWatchedList() error { + // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} + prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", paramtable.GetNodeID())) + keys, values, err := node.watchKv.LoadWithPrefix(prefix) + if err != nil { + return err + } + for i, val := range values { + node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val)) + } + return nil +} + +func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { + switch e.eventType { + case putEventType: + watchInfo, err := parsePutEventData(data) + if err != nil { + log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err)) + return + } + + if isEndWatchState(watchInfo.State) { + log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String())) + return + } + + if watchInfo.Progress != 0 { + log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version)) + return + } + + e.info = watchInfo + e.vChanName = watchInfo.GetVchan().GetChannelName() + log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String())) + case deleteEventType: + e.vChanName = parseDeleteEventKey(key) + log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key)) + } + + actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager( + node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval, + )) + + if !loaded { + actualManager.Run() + } + + actualManager.handleEvent(*e) + + // Whenever a delete event comes, this eventManager will be removed from the map + if e.eventType == deleteEventType { + if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded { + m.Close() + } + } +} + +func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) { + watchInfo := datapb.ChannelWatchInfo{} + err := proto.Unmarshal(data, &watchInfo) + if err != nil { + return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err) + } + + if watchInfo.Vchan == nil { + return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo") + } + reviseVChannelInfo(watchInfo.GetVchan()) + return &watchInfo, 
nil +} + +func parseDeleteEventKey(key string) string { + parts := strings.Split(key, "/") + vChanName := parts[len(parts)-1] + return vChanName +} + +func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) { + vChanName := watchInfo.GetVchan().GetChannelName() + key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName) + tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second)) + + switch watchInfo.State { + case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: + if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { + log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err)) + watchInfo.State = datapb.ChannelWatchState_WatchFailure + } else { + log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) + watchInfo.State = datapb.ChannelWatchState_WatchSuccess + } + case datapb.ChannelWatchState_ToRelease: + // releasing a flowgraph should never fail + node.tryToReleaseFlowgraph(vChanName) + watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess + } + + v, err := proto.Marshal(watchInfo) + if err != nil { + return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err) + } + + success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v)) + // etcd error + if err != nil { + // the flowgraph will leak if not released, causing the new datanode to fail to subscribe + node.tryToReleaseFlowgraph(vChanName) + log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName), + zap.String("state", watchInfo.State.String()), zap.Error(err)) + return err + } + // the etcd request succeeded, but the state was already updated by another writer. 
+ if !success { + log.Info("handle put event: failed to compare version and swap, release flowgraph", + zap.String("key", key), zap.String("state", watchInfo.State.String()), + zap.String("vChanName", vChanName)) + // the flowgraph will leak if not released, causing the new datanode to fail to subscribe + node.tryToReleaseFlowgraph(vChanName) + return nil + } + log.Info("handle put event success", zap.String("key", key), + zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName)) + return nil +} + +func (node *DataNode) handleDeleteEvent(vChanName string) { + node.tryToReleaseFlowgraph(vChanName) +} + type event struct { eventType int vChanName string diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 9d7e6bb7cd..9a1bbc5d17 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -17,18 +17,282 @@ package datanode import ( + "context" "fmt" + "math/rand" "path" + "strings" "testing" "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) +func TestWatchChannel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + assert.NoError(t, err) + defer etcdCli.Close() + node.SetEtcdClient(etcdCli) + err = node.Init() + assert.NoError(t, err) + err = node.Start() + assert.NoError(t, err) + defer node.Stop() + err = node.Register() + assert.NoError(t, err) + + defer cancel() + + t.Run("test watch channel", func(t *testing.T) { + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) + oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" + path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh) + err = kv.Save(path, string([]byte{23})) + assert.NoError(t, err) + + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch) + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: ch, + UnflushedSegmentIds: []int64{}, + } + info := &datapb.ChannelWatchInfo{ + State: datapb.ChannelWatchState_ToWatch, + Vchan: vchan, + } + val, err := proto.Marshal(info) + assert.NoError(t, err) + err = kv.Save(path, string(val)) + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + exist := node.flowgraphManager.exist(ch) + if !exist { + return false + } + bs, err := kv.LoadBytes(fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)) + if err != nil { + return false + } + watchInfo := &datapb.ChannelWatchInfo{} + err = proto.Unmarshal(bs, watchInfo) + if err != nil { + return false + } + return watchInfo.GetState() == 
datapb.ChannelWatchState_WatchSuccess + }, 3*time.Second, 100*time.Millisecond) + + err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + exist := node.flowgraphManager.exist(ch) + return !exist + }, 3*time.Second, 100*time.Millisecond) + }) + + t.Run("Test release channel", func(t *testing.T) { + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) + oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" + path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh) + err = kv.Save(path, string([]byte{23})) + assert.NoError(t, err) + + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch) + c := make(chan struct{}) + go func() { + ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) + c <- struct{}{} + cnt := 0 + for { + evt := <-ec + for _, event := range evt.Events { + if strings.Contains(string(event.Kv.Key), ch) { + cnt++ + } + } + if cnt >= 2 { + break + } + } + c <- struct{}{} + }() + // wait for check goroutine start Watch + <-c + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: ch, + UnflushedSegmentIds: []int64{}, + } + info := &datapb.ChannelWatchInfo{ + State: datapb.ChannelWatchState_ToRelease, + Vchan: vchan, + } + val, err := proto.Marshal(info) + assert.NoError(t, err) + err = kv.Save(path, string(val)) + assert.NoError(t, err) + + // wait for check goroutine received 2 events + <-c + exist := node.flowgraphManager.exist(ch) + assert.False(t, exist) + + err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) + assert.NoError(t, err) + //TODO there is not way to sync Release done, use sleep for now + time.Sleep(100 * time.Millisecond) + + exist = node.flowgraphManager.exist(ch) + assert.False(t, exist) + + }) + + t.Run("handle watch info failed", func(t *testing.T) { + e := &event{ + eventType: putEventType, + } + + node.handleWatchInfo(e, "test1", []byte{23}) + + exist := node.flowgraphManager.exist("test1") + assert.False(t, exist) + + info := datapb.ChannelWatchInfo{ + Vchan: nil, + State: datapb.ChannelWatchState_Uncomplete, + } + bs, err := proto.Marshal(&info) + assert.NoError(t, err) + node.handleWatchInfo(e, "test2", bs) + + exist = node.flowgraphManager.exist("test2") + assert.False(t, exist) + + chPut := make(chan struct{}, 1) + chDel := make(chan struct{}, 1) + + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + m := newChannelEventManager( + func(info *datapb.ChannelWatchInfo, version int64) error { + r := node.handlePutEvent(info, version) + chPut <- struct{}{} + return r + }, + func(vChan string) { + node.handleDeleteEvent(vChan) + chDel <- struct{}{} + }, time.Millisecond*100, + ) + node.eventManagerMap.Insert(ch, m) + m.Run() + defer m.Close() + + info = datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ChannelName: ch}, + State: datapb.ChannelWatchState_Uncomplete, + } + bs, err = proto.Marshal(&info) + assert.NoError(t, err) + + msFactory := node.factory + defer func() { node.factory = msFactory }() + + // todo review the UT logic + // As we remove timetick channel logic, 
flow_graph_insert_buffer_node no longer depend on MessageStreamFactory + // so data_sync_service can be created. this assert becomes true + node.factory = &FailMessageStreamFactory{} + node.handleWatchInfo(e, ch, bs) + <-chPut + exist = node.flowgraphManager.exist(ch) + assert.True(t, exist) + }) + + t.Run("handle watchinfo out of date", func(t *testing.T) { + chPut := make(chan struct{}, 1) + chDel := make(chan struct{}, 1) + // inject eventManager + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) + m := newChannelEventManager( + func(info *datapb.ChannelWatchInfo, version int64) error { + r := node.handlePutEvent(info, version) + chPut <- struct{}{} + return r + }, + func(vChan string) { + node.handleDeleteEvent(vChan) + chDel <- struct{}{} + }, time.Millisecond*100, + ) + node.eventManagerMap.Insert(ch, m) + m.Run() + defer m.Close() + e := &event{ + eventType: putEventType, + version: 10000, + } + + info := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ChannelName: ch}, + State: datapb.ChannelWatchState_Uncomplete, + } + bs, err := proto.Marshal(&info) + assert.NoError(t, err) + + node.handleWatchInfo(e, ch, bs) + <-chPut + exist := node.flowgraphManager.exist("test3") + assert.False(t, exist) + }) + + t.Run("handle watchinfo compatibility", func(t *testing.T) { + info := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: "delta-channel1", + UnflushedSegments: []*datapb.SegmentInfo{{ID: 1}}, + FlushedSegments: []*datapb.SegmentInfo{{ID: 2}}, + DroppedSegments: []*datapb.SegmentInfo{{ID: 3}}, + UnflushedSegmentIds: []int64{1}, + }, + State: datapb.ChannelWatchState_Uncomplete, + } + bs, err := proto.Marshal(&info) + assert.NoError(t, err) + + newWatchInfo, err := parsePutEventData(bs) + assert.NoError(t, err) + + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds()) + }) +} + func TestChannelEventManager(t *testing.T) { t.Run("normal case", func(t *testing.T) { ch := make(chan struct{}, 1)