From 3be6672753756d2bc53b2cc54ceb63e46b01c3d1 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 28 Sep 2021 18:22:16 +0800 Subject: [PATCH] Add flush channel for delete_node (#8762) Resolves: #8761 Signed-off-by: yangxuan --- internal/datanode/data_node.go | 44 +++++++++++++------ internal/datanode/data_node_test.go | 16 +++---- internal/datanode/data_sync_service.go | 10 ++--- internal/datanode/data_sync_service_test.go | 4 +- internal/datanode/flow_graph_delete_node.go | 16 ++++++- .../datanode/flow_graph_delete_node_test.go | 8 ++-- internal/indexnode/indexnode.go | 5 ++- 7 files changed, 70 insertions(+), 33 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 8b02b2fc82..96a01c22dc 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -87,9 +87,10 @@ type DataNode struct { chanMut sync.RWMutex vchan2SyncService map[string]*dataSyncService // vchannel name - vchan2FlushCh map[string]chan<- *flushMsg // vchannel name - clearSignal chan UniqueID // collection ID - segmentCache *Cache + vchan2FlushChs map[string]*flushChans // vchannel name to flush channels + + clearSignal chan UniqueID // collection ID + segmentCache *Cache rootCoord types.RootCoord dataCoord types.DataCoord @@ -103,6 +104,14 @@ type DataNode struct { msFactory msgstream.Factory } +type flushChans struct { + // Flush signal for insert buffer + insertBufferCh chan *flushMsg + + // Flush signal for delete buffer + deleteBufferCh chan *flushMsg +} + // NewDataNode will return a DataNode with abnormal state. func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { rand.Seed(time.Now().UnixNano()) @@ -118,7 +127,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { segmentCache: newCache(), vchan2SyncService: make(map[string]*dataSyncService), - vchan2FlushCh: make(map[string]chan<- *flushMsg), + vchan2FlushChs: make(map[string]*flushChans), clearSignal: make(chan UniqueID, 100), } node.UpdateStateCode(internalpb.StateCode_Abnormal) @@ -256,14 +265,18 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())), ) - flushChan := make(chan *flushMsg, 100) - dataSyncService, err := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache) + flushChs := &flushChans{ + insertBufferCh: make(chan *flushMsg, 100), + deleteBufferCh: make(chan *flushMsg, 100), + } + + dataSyncService, err := newDataSyncService(node.ctx, flushChs, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache) if err != nil { return err } node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService - node.vchan2FlushCh[vchan.GetChannelName()] = flushChan + node.vchan2FlushChs[vchan.GetChannelName()] = flushChs log.Info("Start New dataSyncService", zap.Int64("Collection ID", vchan.GetCollectionID()), @@ -302,7 +315,7 @@ func (node *DataNode) ReleaseDataSyncService(vchanName string) { } delete(node.vchan2SyncService, vchanName) - delete(node.vchan2FlushCh, vchanName) + delete(node.vchan2FlushChs, vchanName) log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName)) } @@ -455,14 +468,14 @@ func (node *DataNode) ReadyToFlush() error { node.chanMut.RLock() defer node.chanMut.RUnlock() - if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushCh) == 0 { + if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushChs) == 0 { // Healthy but Idle msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work" log.Warn(msg) return errors.New(msg) } - if len(node.vchan2SyncService) != len(node.vchan2FlushCh) { + if len(node.vchan2SyncService) != len(node.vchan2FlushChs) { // TODO restart msg := "DataNode HEALTHY but abnormal inside, restarting..." log.Warn(msg) @@ -511,20 +524,25 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen node.segmentCache.Cache(id) node.chanMut.RLock() - flushCh, ok := node.vchan2FlushCh[chanName] + flushChs, ok := node.vchan2FlushChs[chanName] node.chanMut.RUnlock() if !ok { status.Reason = "DataNode abnormal, restarting" return status, nil } - flushmsg := &flushMsg{ + insertFlushmsg := flushMsg{ msgID: req.Base.MsgID, timestamp: req.Base.Timestamp, segmentID: id, collectionID: req.CollectionID, } - flushCh <- flushmsg + + // Copy flushMsg to a different address + deleteFlushMsg := insertFlushmsg + + flushChs.insertBufferCh <- &insertFlushmsg + flushChs.deleteBufferCh <- &deleteFlushMsg } log.Debug("FlushSegments tasks triggered", diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index db9f1efa5f..6cf81f0fe8 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -105,14 +105,14 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) if testcase.expect { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) - assert.NotNil(t, node1.vchan2FlushCh) + assert.NotNil(t, node1.vchan2FlushChs) assert.NotNil(t, node1.vchan2SyncService) sync, ok := node1.vchan2SyncService[testcase.channels[0]] assert.True(t, ok) assert.NotNil(t, sync) assert.Equal(t, UniqueID(1), sync.collectionID) assert.Equal(t, len(testcase.channels), len(node1.vchan2SyncService)) - assert.Equal(t, len(node1.vchan2FlushCh), len(node1.vchan2SyncService)) + assert.Equal(t, len(node1.vchan2FlushChs), len(node1.vchan2SyncService)) } else { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) assert.Equal(t, testcase.failReason, resp.Reason) @@ -174,17 +174,17 @@ func TestDataNode(t *testing.T) { UnflushedSegments: []*datapb.SegmentInfo{}, } - require.Equal(t, 0, len(node2.vchan2FlushCh)) + require.Equal(t, 0, len(node2.vchan2FlushChs)) require.Equal(t, 0, len(node2.vchan2SyncService)) err := node2.NewDataSyncService(vchan) assert.NoError(t, err) - assert.Equal(t, 1, len(node2.vchan2FlushCh)) + assert.Equal(t, 1, len(node2.vchan2FlushChs)) assert.Equal(t, 1, len(node2.vchan2SyncService)) err = node2.NewDataSyncService(vchan) assert.NoError(t, err) - assert.Equal(t, 1, len(node2.vchan2FlushCh)) + assert.Equal(t, 1, len(node2.vchan2FlushChs)) assert.Equal(t, 1, len(node2.vchan2SyncService)) cancel() @@ -367,7 +367,7 @@ func TestDataNode(t *testing.T) { assert.Eventually(t, func() bool { node.chanMut.Lock() defer node.chanMut.Unlock() - return len(node.vchan2FlushCh) == 0 + return len(node.vchan2FlushChs) == 0 }, time.Second, time.Millisecond) cancel() @@ -384,12 +384,12 @@ func TestDataNode(t *testing.T) { err := node.NewDataSyncService(vchan) require.NoError(t, err) - require.Equal(t, 1, len(node.vchan2FlushCh)) + require.Equal(t, 1, len(node.vchan2FlushChs)) require.Equal(t, 1, len(node.vchan2SyncService)) time.Sleep(100 * time.Millisecond) node.ReleaseDataSyncService(dmChannelName) - assert.Equal(t, 0, len(node.vchan2FlushCh)) + assert.Equal(t, 0, len(node.vchan2FlushChs)) assert.Equal(t, 0, len(node.vchan2SyncService)) s, ok := node.vchan2SyncService[dmChannelName] diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3bfcd749f1..34031e00f6 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -31,7 +31,7 @@ type dataSyncService struct { ctx context.Context cancelFn context.CancelFunc fg *flowgraph.TimeTickedFlowGraph - flushChan <-chan *flushMsg + flushChs *flushChans replica Replica idAllocator allocatorInterface msFactory msgstream.Factory @@ -45,7 +45,7 @@ type dataSyncService struct { } func newDataSyncService(ctx context.Context, - flushChan <-chan *flushMsg, + flushChs *flushChans, replica Replica, alloc allocatorInterface, factory msgstream.Factory, @@ -66,7 +66,7 @@ func newDataSyncService(ctx context.Context, ctx: ctx1, cancelFn: cancel, fg: nil, - flushChan: flushChan, + flushChs: flushChs, replica: replica, idAllocator: alloc, msFactory: factory, @@ -181,7 +181,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.replica, dsService.msFactory, dsService.idAllocator, - dsService.flushChan, + dsService.flushChs.insertBufferCh, saveBinlog, vchanInfo.GetChannelName(), dsService.flushingSegCache, @@ -190,7 +190,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - dn := newDeleteNode(dsService.replica, vchanInfo.GetChannelName()) + dn := newDeleteNode(dsService.replica, vchanInfo.GetChannelName(), dsService.flushChs.deleteBufferCh) var deleteNode Node = dn diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 19146fc602..1ba74f637f 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -98,7 +98,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { } ds, err := newDataSyncService(ctx, - make(chan *flushMsg), + &flushChans{make(chan *flushMsg), make(chan *flushMsg)}, replica, NewAllocatorFactory(), test.inMsgFactory, @@ -163,7 +163,7 @@ func TestDataSyncService_Start(t *testing.T) { mockRootCoord := &RootCoordFactory{} collectionID := UniqueID(1) - flushChan := make(chan *flushMsg, 100) + flushChan := &flushChans{make(chan *flushMsg, 100), make(chan *flushMsg, 100)} replica := newReplica(mockRootCoord, collectionID) allocFactory := NewAllocatorFactory(1) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index af7716fdfd..3c732f7d7a 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -26,6 +26,8 @@ type deleteNode struct { channelName string replica Replica + + flushCh <-chan *flushMsg } func (dn *deleteNode) Name() string { @@ -50,6 +52,16 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { return []Msg{} } + select { + case fmsg := <-dn.flushCh: + currentSegID := fmsg.segmentID + log.Debug("DeleteNode receives flush message", + zap.Int64("segmentID", currentSegID), + zap.Int64("collectionID", fmsg.collectionID), + ) + default: + } + return []Msg{} } @@ -75,7 +87,7 @@ func (dn *deleteNode) filterSegmentByPK(pks []int64) (map[int64][]int64, error) return results, nil } -func newDeleteNode(replica Replica, channelName string) *deleteNode { +func newDeleteNode(replica Replica, channelName string, flushCh <-chan *flushMsg) *deleteNode { baseNode := BaseNode{} baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength) @@ -84,5 +96,7 @@ func newDeleteNode(replica Replica, channelName string) *deleteNode { channelName: channelName, replica: replica, + + flushCh: flushCh, } } diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 40264f0320..0c2c221992 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -52,7 +52,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn := newDeleteNode(test.replica, "") + dn := newDeleteNode(test.replica, "", make(chan *flushMsg)) assert.NotNil(t, dn) assert.Equal(t, "deleteNode", dn.Name()) @@ -81,11 +81,13 @@ func TestFlowGraphDeleteNode_Operate(te *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn := deleteNode{} + flushCh := make(chan *flushMsg, 10) + dn := deleteNode{flushCh: flushCh} if test.invalidIn != nil { rt := dn.Operate(test.invalidIn) assert.Empty(t, rt) } else { + flushCh <- &flushMsg{0, 100, 10, 1} rt := dn.Operate(test.validIn) assert.Empty(t, rt) } @@ -145,7 +147,7 @@ func Test_GetSegmentsByPKs(t *testing.T) { mockReplica.normalSegments[segment4.segmentID] = segment4 mockReplica.flushedSegments[segment5.segmentID] = segment5 mockReplica.flushedSegments[segment6.segmentID] = segment6 - dn := newDeleteNode(mockReplica, "test") + dn := newDeleteNode(mockReplica, "test", make(chan *flushMsg)) results, err := dn.filterSegmentByPK([]int64{0, 1, 2, 3, 4}) assert.Nil(t, err) expected := map[int64][]int64{ diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 5cdb463515..80cf7aecca 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -147,12 +147,15 @@ func (i *IndexNode) Init() error { BucketName: Params.MinioBucketName, CreateBucket: true, } - i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option) + kv, err := miniokv.NewMinIOKV(i.loopCtx, option) if err != nil { log.Error("IndexNode NewMinIOKV failed", zap.Error(err)) initErr = err return } + + i.kv = kv + log.Debug("IndexNode NewMinIOKV success") i.closer = trace.InitTracing("index_node")