diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 0601330164..3e9e343a87 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -77,6 +77,7 @@ const illegalRequestErrStr = "Illegal request" // makes sure DataNode implements types.DataNode var _ types.DataNode = (*DataNode)(nil) +// Params from config.yaml var Params paramtable.GlobalParamTable // DataNode communicates with outside services and unioun all @@ -89,9 +90,6 @@ var Params paramtable.GlobalParamTable // `NodeID` is unique to each datanode. // `State` is current statement of this data node, indicating whether it's healthy. // -// `vchan2SyncService` is a map of vchannlName to dataSyncService, so that datanode -// has ability to scale flowgraph. -// `vchan2FlushCh` holds flush-signal channels for every flowgraph. // `clearSignal` is a signal channel for releasing the flowgraph resources. // `segmentCache` stores all flushing and flushed segments. type DataNode struct { @@ -101,10 +99,8 @@ type DataNode struct { Role string State atomic.Value // internalpb.StateCode_Initializing - // TODO struct - chanMut sync.RWMutex - vchan2SyncService map[string]*dataSyncService // vchannel name - vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels + flowgraphManager *flowgraphManager + eventManagerMap sync.Map // vchannel name -> channelEventManager clearSignal chan string // vchannel name segmentCache *Cache @@ -138,15 +134,14 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { segmentCache: newCache(), compactionExecutor: newCompactionExecutor(), - vchan2SyncService: make(map[string]*dataSyncService), - vchan2FlushChs: make(map[string]chan flushMsg), - clearSignal: make(chan string, 100), + flowgraphManager: newFlowgraphManager(), + clearSignal: make(chan string, 100), } node.UpdateStateCode(internalpb.StateCode_Abnormal) return node } -// Set etcd client +// SetEtcdClient sets etcd client for DataNode func (node *DataNode) SetEtcdClient(etcdCli *clientv3.Client) { node.etcdCli = etcdCli } @@ -287,125 +282,134 @@ func (node *DataNode) checkWatchedList() error { return err } for i, val := range values { - node.handleWatchInfo(keys[i], []byte(val)) + node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val)) } return nil } // handleChannelEvt handles event from kv watch event func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { + var e *event switch evt.Type { case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here - log.Debug("DataNode handleChannelEvt EventTypePut", zap.String("key", string(evt.Kv.Key))) - node.handleWatchInfo(string(evt.Kv.Key), evt.Kv.Value) + e = &event{ + eventType: putEventType, + } + case clientv3.EventTypeDelete: - // guaranteed there is no "/" in channel name - parts := strings.Split(string(evt.Kv.Key), "/") - vchanName := parts[len(parts)-1] - log.Debug("DataNode handleChannelEvt EventTypeDelete", - zap.String("key", string(evt.Kv.Key)), - zap.String("vChanName", vchanName), - zap.Int64("node id", Params.DataNodeCfg.NodeID)) - node.ReleaseDataSyncService(vchanName) + e = &event{ + eventType: deleteEventType, + } + } + node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value) +} + +func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { + switch e.eventType { + case putEventType: + log.Info("DataNode is handling watchInfo put event", zap.String("key", key)) + + watchInfo, err := parsePutEventData(data) + if err != nil { + log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err)) + return + } + + e.info = watchInfo + e.vChanName = watchInfo.GetVchan().GetChannelName() + + case deleteEventType: + log.Info("DataNode is handling watchInfo delete event", zap.String("key", key)) + e.vChanName = parseDeleteEventKey(key) + } + + actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, &channelEventManager{ + eventChan: make(chan event, 10), + closeChan: make(chan struct{}), + handlePutEvent: node.handlePutEvent, + handleDeleteEvent: node.handleDeleteEvent, + }) + + if !loaded { + actualManager.(*channelEventManager).Run() + } + + actualManager.(*channelEventManager).handleEvent(*e) + + // Whenever a delete event comes, this eventManger will be removed from map + if e.eventType == deleteEventType { + if m, loaded := node.eventManagerMap.LoadAndDelete(e.vChanName); loaded { + m.(*channelEventManager).Close() + } } } -func (node *DataNode) handleWatchInfo(key string, data []byte) { +func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) { watchInfo := datapb.ChannelWatchInfo{} err := proto.Unmarshal(data, &watchInfo) if err != nil { - log.Warn("fail to parse ChannelWatchInfo", zap.String("key", key), zap.Error(err)) - return + return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err) } - log.Debug("DataNode handleWatchInfo Unmarshal success", zap.String("key", key)) + if watchInfo.State == datapb.ChannelWatchState_Complete { - log.Warn("DataNode handleWatchInfo State is already ChannelWatchState_Complete", zap.String("key", key)) - return + return nil, fmt.Errorf("invalid event: event state is already ChannelWatchState_Compele") } + if watchInfo.Vchan == nil { - log.Warn("found ChannelWatchInfo with nil VChannelInfo", zap.String("key", key)) - return + return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo") } - log.Warn("DataNode handleWatchInfo try to NewDataSyncService", zap.String("key", key)) - err = node.NewDataSyncService(watchInfo.Vchan) - if err != nil { - log.Warn("fail to create DataSyncService", zap.String("key", key), zap.Error(err)) - return - } - log.Warn("DataNode handleWatchInfo NewDataSyncService success", zap.String("key", key)) - watchInfo.State = datapb.ChannelWatchState_Complete - v, err := proto.Marshal(&watchInfo) - if err != nil { - log.Warn("DataNode handleWatchInfo fail to Marshal watchInfo", zap.String("key", key), zap.Error(err)) - return - } - k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), watchInfo.GetVchan().GetChannelName()) - log.Warn("DataNode handleWatchInfo try to Save", zap.String("key", key), - zap.String("k", k), - zap.String("v", string(v))) - - err = node.watchKv.Save(k, string(v)) - if err != nil { - log.Warn("DataNode handleWatchInfo fail to change WatchState to complete", zap.String("key", key), zap.Error(err)) - node.ReleaseDataSyncService(key) - } + return &watchInfo, nil } -// NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService. -func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { - node.chanMut.RLock() - if _, ok := node.vchan2SyncService[vchan.GetChannelName()]; ok { - node.chanMut.RUnlock() - return nil - } - node.chanMut.RUnlock() +func parseDeleteEventKey(key string) string { + parts := strings.Split(key, "/") + vChanName := parts[len(parts)-1] + return vChanName +} - replica, err := newReplica(node.ctx, node.rootCoord, vchan.CollectionID) +func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error { + vChanName := watchInfo.GetVchan().GetChannelName() + + if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil { + return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err) + } + log.Debug("handle put event: new data sync service success", zap.String("vChanName", vChanName)) + + watchInfo.State = datapb.ChannelWatchState_Complete + v, err := proto.Marshal(watchInfo) if err != nil { - return err + return fmt.Errorf("fail to marshal watchInfo with complete state, vChanName: %s, err: %v", vChanName, err) } - var alloc allocatorInterface = newAllocator(node.rootCoord) + k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName) + log.Debug("handle put event: try to save completed state", zap.String("key", k)) - log.Debug("DataNode NewDataSyncService received Vchannel Info", - zap.Int64("collectionID", vchan.CollectionID), - zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())), - zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())), - ) - - flushCh := make(chan flushMsg, 100) - - dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv, node.compactionExecutor) + err = node.watchKv.Save(k, string(v)) + // TODO DataNode unable to save into etcd, may need to panic if err != nil { - log.Error("DataNode NewDataSyncService newDataSyncService failed", - zap.Error(err), - ) - return err + node.releaseFlowgraph(vChanName) + return fmt.Errorf("fail to update completed state to etcd, vChanName: %s, err: %v", vChanName, err) } - - node.chanMut.Lock() - node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService - node.vchan2FlushChs[vchan.GetChannelName()] = flushCh - node.chanMut.Unlock() - - log.Info("DataNode NewDataSyncService success", - zap.Int64("Collection ID", vchan.GetCollectionID()), - zap.String("Vchannel name", vchan.GetChannelName()), - ) - dataSyncService.start() - return nil } +func (node *DataNode) handleDeleteEvent(vChanName string) { + node.releaseFlowgraph(vChanName) +} + +func (node *DataNode) releaseFlowgraph(vChanName string) { + node.flowgraphManager.release(vChanName) +} + // BackGroundGC runs in background to release datanode resources func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { log.Info("DataNode Background GC Start") for { select { - case vChan := <-vChannelCh: - log.Info("GC flowgraph", zap.String("vChan", vChan)) - node.ReleaseDataSyncService(vChan) + case vchanName := <-vChannelCh: + log.Info("GC flowgraph", zap.String("vChanName", vchanName)) + node.releaseFlowgraph(vchanName) case <-node.ctx.Done(): log.Info("DataNode ctx done") return @@ -413,23 +417,6 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { } } -// ReleaseDataSyncService release flowgraph resources for a vchanName -func (node *DataNode) ReleaseDataSyncService(vchanName string) { - log.Info("Release flowgraph resources begin", zap.String("Vchannel", vchanName)) - - node.chanMut.Lock() - dss, ok := node.vchan2SyncService[vchanName] - delete(node.vchan2SyncService, vchanName) - delete(node.vchan2FlushChs, vchanName) - node.chanMut.Unlock() - - if ok { - // This is a time-consuming process, better to put outside of the lock - dss.close() - } - log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName)) -} - // FilterThreshold is the start time ouf DataNode var FilterThreshold Timestamp @@ -541,51 +528,11 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo return states, nil } -func (node *DataNode) getChannelNamebySegmentID(segID UniqueID) string { - node.chanMut.RLock() - defer node.chanMut.RUnlock() - for name, dataSync := range node.vchan2SyncService { - if dataSync.replica.hasSegment(segID, true) { - return name - } - } - return "" -} - -func (node *DataNode) getChannelNamesbyCollectionID(collID UniqueID) []string { - node.chanMut.RLock() - defer node.chanMut.RUnlock() - - channels := make([]string, 0, len(node.vchan2SyncService)) - for name, dataSync := range node.vchan2SyncService { - if dataSync.collectionID == collID { - channels = append(channels, name) - } - } - return channels -} - // ReadyToFlush tells wether DataNode is ready for flushing func (node *DataNode) ReadyToFlush() error { if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { return errors.New("DataNode not in HEALTHY state") } - - node.chanMut.RLock() - defer node.chanMut.RUnlock() - if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushChs) == 0 { - // Healthy but Idle - msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work" - log.Warn(msg) - return errors.New(msg) - } - - if len(node.vchan2SyncService) != len(node.vchan2FlushChs) { - // TODO restart - msg := "DataNode HEALTHY but abnormal inside, restarting..." - log.Warn(msg) - return errors.New(msg) - } return nil } @@ -600,8 +547,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen ErrorCode: commonpb.ErrorCode_UnexpectedError, } - if err := node.ReadyToFlush(); err != nil { - status.Reason = err.Error() + if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { + status.Reason = "DataNode not in HEALTHY state" return status, nil } @@ -613,16 +560,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen processSegments := func(segmentIDs []UniqueID, flushed bool) bool { noErr := true for _, id := range segmentIDs { - chanName := node.getChannelNamebySegmentID(id) - if len(chanName) == 0 { - log.Warn("FlushSegments failed, cannot find segment in DataNode replica", - zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id)) - - status.Reason = fmt.Sprintf("DataNode replica not find segment %d!", id) - noErr = false - continue - } - if node.segmentCache.checkIfCached(id) { // Segment in flushing, ignore log.Info("Segment flushing, ignore the flush request until flush is done.", @@ -633,17 +570,15 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen node.segmentCache.Cache(id) - node.chanMut.RLock() - flushChs, ok := node.vchan2FlushChs[chanName] - node.chanMut.RUnlock() - if !ok { + flushCh, err := node.flowgraphManager.getFlushCh(id) + if err != nil { status.Reason = "DataNode abnormal, restarting" - log.Error("DataNode abnormal, no flushCh for a vchannel") + log.Error("DataNode abnormal, no flushCh for a vchannel", zap.Error(err)) noErr = false continue } - flushChs <- flushMsg{ + flushCh <- flushMsg{ msgID: req.Base.MsgID, timestamp: req.Base.Timestamp, segmentID: id, @@ -677,15 +612,7 @@ func (node *DataNode) Stop() error { node.UpdateStateCode(internalpb.StateCode_Abnormal) node.cancel() - - node.chanMut.RLock() - defer node.chanMut.RUnlock() - // close services - for _, syncService := range node.vchan2SyncService { - if syncService != nil { - (*syncService).close() - } - } + node.flowgraphManager.dropAll() if node.closer != nil { err := node.closer.Close() @@ -796,7 +723,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan ErrorCode: commonpb.ErrorCode_UnexpectedError, } - ds, ok := node.vchan2SyncService[req.GetChannel()] + ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel()) if !ok { log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel())) status.Reason = errIllegalCompactionPlan.Error() diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index debb45c3c3..7bd7bada42 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -29,11 +29,16 @@ import ( "time" "github.com/milvus-io/milvus/internal/common" - + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -42,12 +47,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" ) func TestMain(t *testing.M) { @@ -62,6 +61,8 @@ func TestMain(t *testing.M) { func TestDataNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + node := newIDLEDataNodeMock(ctx) etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) assert.Nil(t, err) @@ -132,39 +133,6 @@ func TestDataNode(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, stat.Status.ErrorCode) }) - t.Run("Test NewDataSyncService", func(t *testing.T) { - t.Skip() - ctx, cancel := context.WithCancel(context.Background()) - node2 := newIDLEDataNodeMock(ctx) - err = node2.Start() - assert.Nil(t, err) - dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService" - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegments: []*datapb.SegmentInfo{}, - } - - require.Equal(t, 0, len(node2.vchan2FlushChs)) - require.Equal(t, 0, len(node2.vchan2SyncService)) - - err := node2.NewDataSyncService(vchan) - assert.NoError(t, err) - assert.Equal(t, 1, len(node2.vchan2FlushChs)) - assert.Equal(t, 1, len(node2.vchan2SyncService)) - - err = node2.NewDataSyncService(vchan) - assert.NoError(t, err) - assert.Equal(t, 1, len(node2.vchan2FlushChs)) - assert.Equal(t, 1, len(node2.vchan2SyncService)) - - cancel() - <-node2.ctx.Done() - err = node2.Stop() - assert.Nil(t, err) - }) - t.Run("Test FlushSegments", func(t *testing.T) { dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" @@ -185,12 +153,14 @@ func TestDataNode(t *testing.T) { UnflushedSegments: []*datapb.SegmentInfo{}, FlushedSegments: []*datapb.SegmentInfo{}, } - err := node1.NewDataSyncService(vchan) - assert.Nil(t, err) - service, ok := node1.vchan2SyncService[dmChannelName] + err := node1.flowgraphManager.addAndStart(node1, vchan) + require.Nil(t, err) + + fgservice, ok := node1.flowgraphManager.getFlowgraphService(dmChannelName) assert.True(t, ok) - err = service.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) + + err = fgservice.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) assert.Nil(t, err) req := &datapb.FlushSegmentsRequest{ @@ -282,25 +252,6 @@ func TestDataNode(t *testing.T) { status, err = node1.FlushSegments(node1.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) - - // manual inject meta error - node1.chanMut.Lock() - node1.vchan2FlushChs[dmChannelName+"1"] = node1.vchan2FlushChs[dmChannelName] - delete(node1.vchan2FlushChs, dmChannelName) - node1.chanMut.Unlock() - node1.segmentCache.Remove(0) - - req = &datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{}, - DbID: 0, - CollectionID: 1, - SegmentIDs: []int64{0}, - } - - status, err = node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) - }) t.Run("Test GetTimeTickChannel", func(t *testing.T) { @@ -383,99 +334,13 @@ func TestDataNode(t *testing.T) { for i, test := range testDataSyncs { if i <= 2 { - - err = node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}) - + err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}) assert.Nil(t, err) - vchanNameCh <- test.dmChannelName } } - - assert.Eventually(t, func() bool { - node.chanMut.Lock() - defer node.chanMut.Unlock() - return len(node.vchan2FlushChs) == 0 - }, time.Second, time.Millisecond) - cancel() }) - - t.Run("Test ReleaseDataSyncService", func(t *testing.T) { - dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService" - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegments: []*datapb.SegmentInfo{}, - } - - err := node.NewDataSyncService(vchan) - require.NoError(t, err) - require.Equal(t, 1, len(node.vchan2FlushChs)) - require.Equal(t, 1, len(node.vchan2SyncService)) - time.Sleep(100 * time.Millisecond) - - node.ReleaseDataSyncService(dmChannelName) - assert.Equal(t, 0, len(node.vchan2FlushChs)) - assert.Equal(t, 0, len(node.vchan2SyncService)) - - s, ok := node.vchan2SyncService[dmChannelName] - assert.False(t, ok) - assert.Nil(t, s) - }) - - t.Run("Test GetChannelName", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - node := newIDLEDataNodeMock(ctx) - - testCollIDs := []UniqueID{0, 1, 2, 1} - testSegIDs := []UniqueID{10, 11, 12, 13} - testchanNames := []string{"a", "b", "c", "d"} - - node.chanMut.Lock() - for i, name := range testchanNames { - replica := &SegmentReplica{ - collectionID: testCollIDs[i], - newSegments: make(map[UniqueID]*Segment), - } - - err = replica.addNewSegment(testSegIDs[i], testCollIDs[i], 0, name, &internalpb.MsgPosition{}, nil) - assert.Nil(t, err) - node.vchan2SyncService[name] = &dataSyncService{collectionID: testCollIDs[i], replica: replica} - } - node.chanMut.Unlock() - - type Test struct { - inCollID UniqueID - expectedChannels []string - - inSegID UniqueID - expectedChannel string - } - tests := []Test{ - {0, []string{"a"}, 10, "a"}, - {1, []string{"b", "d"}, 11, "b"}, - {2, []string{"c"}, 12, "c"}, - {3, []string{}, 13, "d"}, - {3, []string{}, 100, ""}, - } - - for _, test := range tests { - actualChannels := node.getChannelNamesbyCollectionID(test.inCollID) - assert.ElementsMatch(t, test.expectedChannels, actualChannels) - - actualChannel := node.getChannelNamebySegmentID(test.inSegID) - assert.Equal(t, test.expectedChannel, actualChannel) - } - - cancel() - }) - - cancel() - <-node.ctx.Done() - err = node.Stop() - require.Nil(t, err) } func TestWatchChannel(t *testing.T) { @@ -495,6 +360,7 @@ func TestWatchChannel(t *testing.T) { defer cancel() t.Run("test watch channel", func(t *testing.T) { + // GOOSE TODO kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" path := fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, oldInvalidCh) @@ -540,29 +406,27 @@ func TestWatchChannel(t *testing.T) { // wait for check goroutine received 2 events <-c - node.chanMut.RLock() - _, has := node.vchan2SyncService[ch] - node.chanMut.RUnlock() - assert.True(t, has) + exist := node.flowgraphManager.exist(ch) + assert.True(t, exist) err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID)) assert.Nil(t, err) //TODO there is not way to sync Release done, use sleep for now time.Sleep(100 * time.Millisecond) - node.chanMut.RLock() - _, has = node.vchan2SyncService[ch] - node.chanMut.RUnlock() - assert.False(t, has) + exist = node.flowgraphManager.exist(ch) + assert.False(t, exist) }) t.Run("handle watch info failed", func(t *testing.T) { - node.handleWatchInfo("test1", []byte{23}) + e := &event{ + eventType: putEventType, + } - node.chanMut.RLock() - _, has := node.vchan2SyncService["test1"] - assert.False(t, has) - node.chanMut.RUnlock() + node.handleWatchInfo(e, "test1", []byte{23}) + + exist := node.flowgraphManager.exist("test1") + assert.False(t, exist) info := datapb.ChannelWatchInfo{ Vchan: nil, @@ -570,12 +434,10 @@ func TestWatchChannel(t *testing.T) { } bs, err := proto.Marshal(&info) assert.NoError(t, err) - node.handleWatchInfo("test2", bs) + node.handleWatchInfo(e, "test2", bs) - node.chanMut.RLock() - _, has = node.vchan2SyncService["test2"] - assert.False(t, has) - node.chanMut.RUnlock() + exist = node.flowgraphManager.exist("test2") + assert.False(t, exist) info = datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{}, @@ -587,12 +449,9 @@ func TestWatchChannel(t *testing.T) { node.msFactory = &FailMessageStreamFactory{ node.msFactory, } - node.handleWatchInfo("test3", bs) - node.chanMut.RLock() - _, has = node.vchan2SyncService["test3"] - assert.False(t, has) - node.chanMut.RUnlock() - + node.handleWatchInfo(e, "test3", bs) + exist = node.flowgraphManager.exist("test3") + assert.False(t, exist) }) } diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go new file mode 100644 index 0000000000..abdfc6aa5c --- /dev/null +++ b/internal/datanode/event_manager.go @@ -0,0 +1,101 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "time" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" + + "go.uber.org/zap" +) + +const retryWatchInterval = 20 * time.Second + +type event struct { + eventType int + vChanName string + info *datapb.ChannelWatchInfo +} + +type channelEventManager struct { + eventChan chan event + closeChan chan struct{} + handlePutEvent func(watchInfo *datapb.ChannelWatchInfo) error // node.handlePutEvent + handleDeleteEvent func(vChanName string) // node.handleDeleteEvent +} + +const ( + putEventType = 1 + deleteEventType = 2 +) + +func (e *channelEventManager) Run() { + go func() { + for { + select { + case event := <-e.eventChan: + switch event.eventType { + case putEventType: + // Trigger retry for-loop when fail to handle put event for the first time + if err := e.handlePutEvent(event.info); err != nil { + for { + log.Warn("handle put event fail, starting retry", + zap.String("vChanName", event.vChanName), + zap.String("retry interval", retryWatchInterval.String()), + zap.Error(err)) + + <-time.NewTimer(time.Second).C + + select { + case e, ok := <-e.eventChan: + // When getting a delete event at next retry, exit retry loop + // When getting a put event, just continue the retry + if ok && e.eventType == deleteEventType { + log.Warn("delete event triggerred, terminating retry.", + zap.String("vChanName", event.vChanName)) + return + } + default: + } + + err = e.handlePutEvent(event.info) + if err == nil { + log.Debug("retry to handle put event successfully", + zap.String("vChanName", event.vChanName)) + return + } + } + } + case deleteEventType: + e.handleDeleteEvent(event.vChanName) + } + case <-e.closeChan: + return + } + } + }() +} + +func (e *channelEventManager) handleEvent(event event) { + e.eventChan <- event +} + +func (e *channelEventManager) Close() { + close(e.closeChan) +} diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go new file mode 100644 index 0000000000..b27bdf978b --- /dev/null +++ b/internal/datanode/flow_graph_manager.go @@ -0,0 +1,127 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "fmt" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" + + "go.uber.org/zap" +) + +type flowgraphManager struct { + flowgraphs sync.Map // vChannelName -> dataSyncService +} + +func newFlowgraphManager() *flowgraphManager { + return &flowgraphManager{} +} + +func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo) error { + log.Debug("received Vchannel Info", + zap.String("vChannelName", vchan.GetChannelName()), + zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())), + zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())), + ) + + if _, ok := fm.flowgraphs.Load(vchan.GetChannelName()); ok { + log.Warn("try to add an existed DataSyncService", zap.String("vChannelName", vchan.GetChannelName())) + return nil + } + + replica, err := newReplica(dn.ctx, dn.rootCoord, vchan.GetCollectionID()) + if err != nil { + log.Warn("new replica failed", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err)) + return err + } + + var alloc allocatorInterface = newAllocator(dn.rootCoord) + + dataSyncService, err := newDataSyncService(dn.ctx, make(chan flushMsg, 100), replica, alloc, dn.msFactory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.blobKv, dn.compactionExecutor) + if err != nil { + log.Warn("new data sync service fail", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err)) + return err + } + log.Info("successfully created dataSyncService", zap.String("vChannelName", vchan.GetChannelName())) + + dataSyncService.start() + log.Info("successfully started dataSyncService", zap.String("vChannelName", vchan.GetChannelName())) + + fm.flowgraphs.Store(vchan.GetChannelName(), dataSyncService) + return nil +} + +func (fm *flowgraphManager) release(vchanName string) { + log.Debug("release flowgraph resources begin", zap.String("vChannelName", vchanName)) + + if fg, loaded := fm.flowgraphs.LoadAndDelete(vchanName); loaded { + fg.(*dataSyncService).close() + } + log.Debug("release flowgraph resources end", zap.String("Vchannel", vchanName)) +} + +func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) { + var ( + flushCh chan flushMsg + loaded = false + ) + + fm.flowgraphs.Range(func(key, value interface{}) bool { + fg := value.(*dataSyncService) + if fg.replica.hasSegment(segID, true) { + loaded = true + flushCh = fg.flushCh + return false + } + return true + }) + + if loaded { + return flushCh, nil + } + + return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID) +} + +func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) { + fg, ok := fm.flowgraphs.Load(vchan) + if ok { + return fg.(*dataSyncService), ok + } + + return nil, ok +} + +func (fm *flowgraphManager) exist(vchan string) bool { + _, exist := fm.getFlowgraphService(vchan) + return exist +} + +func (fm *flowgraphManager) dropAll() { + log.Debug("start drop all flowgraph resources in DataNode") + fm.flowgraphs.Range(func(key, value interface{}) bool { + value.(*dataSyncService).close() + fm.flowgraphs.Delete(key.(string)) + + log.Debug("successfully dropped flowgraph", zap.String("vChannelName", key.(string))) + return true + }) + log.Debug("end drop all flowgraph resources in DataNode") +} diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go new file mode 100644 index 0000000000..fd658ee737 --- /dev/null +++ b/internal/datanode/flow_graph_manager_test.go @@ -0,0 +1,128 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/etcd" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFlowGraphManager(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + assert.Nil(t, err) + defer etcdCli.Close() + + node := newIDLEDataNodeMock(ctx) + node.SetEtcdClient(etcdCli) + err = node.Init() + require.Nil(t, err) + err = node.Start() + require.Nil(t, err) + + fm := newFlowgraphManager() + defer fm.dropAll() + t.Run("Test addAndStart", func(t *testing.T) { + vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart" + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: vchanName, + } + require.False(t, fm.exist(vchanName)) + + err := fm.addAndStart(node, vchan) + assert.NoError(t, err) + assert.True(t, fm.exist(vchanName)) + + fm.dropAll() + }) + + t.Run("Test Release", func(t *testing.T) { + vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-Release" + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: vchanName, + } + require.False(t, fm.exist(vchanName)) + + err := fm.addAndStart(node, vchan) + assert.NoError(t, err) + assert.True(t, fm.exist(vchanName)) + + fm.release(vchanName) + + assert.False(t, fm.exist(vchanName)) + fm.dropAll() + }) + + t.Run("Test getFlushCh", func(t *testing.T) { + vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-getFlushCh" + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: vchanName, + } + require.False(t, fm.exist(vchanName)) + + err := fm.addAndStart(node, vchan) + assert.NoError(t, err) + assert.True(t, fm.exist(vchanName)) + + fg, ok := fm.getFlowgraphService(vchanName) + require.True(t, ok) + err = fg.replica.addNewSegment(100, 1, 10, vchanName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) + require.NoError(t, err) + + tests := []struct { + isvalid bool + inSegID UniqueID + + description string + }{ + {true, 100, "valid input for existed segmentID 100"}, + {false, 101, "invalid input for not existed segmentID 101"}, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ch, err := fm.getFlushCh(test.inSegID) + + if test.isvalid { + assert.NoError(t, err) + assert.NotNil(t, ch) + } else { + assert.Error(t, err) + assert.Nil(t, ch) + } + }) + } + }) + + t.Run("Test getFlowgraphService", func(t *testing.T) { + fg, ok := fm.getFlowgraphService("channel-not-exist") + assert.False(t, ok) + assert.Nil(t, fg) + }) +}