From f595500383fac40b7f50b7ca84b7f45145abb872 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 13 Dec 2022 16:15:21 +0800 Subject: [PATCH] Move sync delete policy from deleteNode to insertBufferNode (#21152) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- internal/datanode/buffer.go | 10 +++ internal/datanode/data_sync_service.go | 11 ++- internal/datanode/flow_graph_delete_node.go | 47 +++-------- .../datanode/flow_graph_delete_node_test.go | 72 +++++++++++++--- .../datanode/flow_graph_insert_buffer_node.go | 35 +++++--- .../flow_graph_insert_buffer_node_test.go | 83 ++++++++++++++----- 6 files changed, 179 insertions(+), 79 deletions(-) diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go index 44c3b2f6ce..66e648abaf 100644 --- a/internal/datanode/buffer.go +++ b/internal/datanode/buffer.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "sync" "go.uber.org/zap" @@ -40,6 +41,7 @@ import ( // but at the first stage, this struct is only used for delete buff type DelBufferManager struct { channel Channel + mu sync.Mutex // guards delMemorySize and delBufHeap delMemorySize int64 delBufHeap *PriorityQueue } @@ -97,6 +99,8 @@ func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey, delDataBuf.updateStartAndEndPosition(startPos, endPos) //4. update and sync memory size with priority queue + bm.mu.Lock() + defer bm.mu.Unlock() if !loaded { delDataBuf.item.segmentID = segID delDataBuf.item.memorySize = bufSize @@ -120,6 +124,8 @@ func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok boo } func (bm *DelBufferManager) Delete(segID UniqueID) { + bm.mu.Lock() + defer bm.mu.Unlock() if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok { item := buf.item bm.delMemorySize -= item.memorySize @@ -141,6 +147,8 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr bm.Delete(segID) } } + bm.mu.Lock() + defer bm.mu.Unlock() // only store delBuf if EntriesNum > 0 if compactToDelBuff.EntriesNum > 0 { if loaded { @@ -156,6 +164,8 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr } func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID { + bm.mu.Lock() + defer bm.mu.Unlock() var shouldFlushSegments []UniqueID if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() { return shouldFlushSegments diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index af59157cc6..dacd6c7105 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -54,6 +54,7 @@ type dataSyncService struct { dataCoord types.DataCoord // DataCoord instance to interact with clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed + delBufferManager *DelBufferManager flushingSegCache *Cache // a guarding cache stores currently flushing segment ids flushManager flushManager // flush manager handles flush process chunkManager storage.ChunkManager @@ -80,6 +81,12 @@ func newDataSyncService(ctx context.Context, ctx1, cancel := context.WithCancel(ctx) + delBufferManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + service := &dataSyncService{ ctx: ctx1, cancelFn: cancel, @@ -93,6 +100,7 @@ func newDataSyncService(ctx context.Context, vchannelName: vchan.GetChannelName(), dataCoord: dataCoord, clearSignal: clearSignal, + delBufferManager: delBufferManager, flushingSegCache: flushingSegCache, chunkManager: chunkManager, compactor: compactor, @@ -287,6 +295,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro insertBufferNode, err = newInsertBufferNode( dsService.ctx, dsService.collectionID, + dsService.delBufferManager, dsService.flushCh, dsService.resendTTCh, dsService.flushManager, @@ -298,7 +307,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro } var deleteNode Node - deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.clearSignal, c) + deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.delBufferManager, dsService.clearSignal, c) if err != nil { return err } diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 93105b1fb2..a08736514b 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -117,32 +117,13 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax) } - //here we adopt a quite radical strategy: - //every time we make sure that the N biggest delDataBuf can be flushed - //when memsize usage reaches a certain level - //then we will add all segments in the fgMsg.segmentsToFlush into the toFlushSeg and remove duplicate segments - //the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit - segmentsToFlush := dn.delBufferManager.ShouldFlushSegments() - for _, msgSegmentID := range fgMsg.segmentsToSync { - existed := false - for _, autoFlushSegment := range segmentsToFlush { - if msgSegmentID == autoFlushSegment { - existed = true - break - } - } - if !existed { - segmentsToFlush = append(segmentsToFlush, msgSegmentID) - } - } - // process flush messages - if len(segmentsToFlush) > 0 { + if len(fgMsg.segmentsToSync) > 0 { log.Debug("DeleteNode receives flush message", - zap.Int64s("segIDs", segmentsToFlush), + zap.Int64s("segIDs", fgMsg.segmentsToSync), zap.String("vChannelName", dn.channelName), zap.Time("posTime", tsoutil.PhysicalTime(fgMsg.endPositions[0].Timestamp))) - for _, segmentToFlush := range segmentsToFlush { + for _, segmentToFlush := range fgMsg.segmentsToSync { buf, ok := dn.delBufferManager.Load(segmentToFlush) if !ok { // no related delta data to flush, send empty buf to complete flush life-cycle @@ -239,23 +220,19 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [ return segID2Pks, segID2Tss } -func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) { +func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(config.maxQueueLength) baseNode.SetMaxParallelism(config.maxParallelism) return &deleteNode{ - ctx: ctx, - BaseNode: baseNode, - delBufferManager: &DelBufferManager{ - channel: config.channel, - delMemorySize: 0, - delBufHeap: &PriorityQueue{}, - }, - channel: config.channel, - idAllocator: config.allocator, - channelName: config.vChannelName, - flushManager: fm, - clearSignal: sig, + ctx: ctx, + BaseNode: baseNode, + delBufferManager: delBufManager, + channel: config.channel, + idAllocator: config.allocator, + channelName: config.vChannelName, + flushManager: fm, + clearSignal: sig, }, nil } diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 5ba9c9d6e3..bcc7c70e6f 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -51,7 +51,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn, err := newDeleteNode(test.ctx, nil, make(chan string, 1), test.config) + dn, err := newDeleteNode(test.ctx, nil, nil, make(chan string, 1), test.config) assert.Nil(t, err) assert.NotNil(t, dn) @@ -189,8 +189,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: &allocator{}, vChannelName: chanName, } + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } - dn, err := newDeleteNode(context.Background(), fm, make(chan string, 1), c) + dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c) assert.Nil(t, err) segID2Pks, _ := dn.filterSegmentByPK(0, varCharPks, tss) @@ -219,7 +224,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { vChannelName: chanName, } - dn, err := newDeleteNode(context.Background(), fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + + dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c) assert.Nil(t, err) segID2Pks, _ := dn.filterSegmentByPK(0, int64Pks, tss) @@ -254,7 +265,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) assert.Nil(te, err) msg := genFlowGraphDeleteMsg(int64Pks, chanName) @@ -277,7 +293,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) assert.Nil(te, err) msg := genFlowGraphDeleteMsg(int64Pks, chanName) @@ -306,8 +327,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } sig := make(chan string, 1) - delNode, err := newDeleteNode(ctx, fm, sig, c) + delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c) assert.Nil(t, err) msg := genFlowGraphDeleteMsg(int64Pks, chanName) @@ -344,7 +370,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) assert.Nil(t, err) compactedSegment := UniqueID(10020987) @@ -394,10 +425,15 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } mockFlushManager := &mockFlushManager{ recordFlushedSeg: true, } - delNode, err := newDeleteNode(ctx, mockFlushManager, make(chan string, 1), c) + delNode, err := newDeleteNode(ctx, mockFlushManager, delBufManager, make(chan string, 1), c) assert.Nil(t, err) //2. here we set flushing segments inside fgmsg to empty @@ -410,6 +446,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //and the sum of memory consumption in this case is 208 //so no segments will be flushed paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "300") + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize) @@ -422,6 +459,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { msg.deleteMessages = []*msgstream.DeleteMsg{} msg.segmentsToSync = []UniqueID{} paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "200") + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize) @@ -429,6 +467,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //4. there is no new delete msg and delBufferSize is still 200 //we expect there will not be any auto flush del + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize) @@ -438,6 +477,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //segment which is 48 in size to be flushed, so the remained del memory size //will be 112 paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "150") + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize) @@ -446,6 +486,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //6. we reset buffer bytes to 60, then most of the segments will be flushed //except for the smallest entry with size equaling to 32 paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "60") + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize) @@ -455,6 +496,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //is more than 20, so all five segments will be flushed and the remained //del memory will be lowered to zero paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "20") + fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() delNode.Operate([]flowgraph.Msg{fgMsg}) assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize) @@ -484,7 +526,12 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) require.NoError(t, err) tests := []struct { @@ -529,7 +576,12 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + delBufManager := &DelBufferManager{ + channel: &channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) require.NoError(t, err) tests := []struct { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index fd7c3c2ad2..5a79a0cc42 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -46,10 +46,11 @@ import ( type insertBufferNode struct { BaseNode - ctx context.Context - channelName string - channel Channel - idAllocator allocatorInterface + ctx context.Context + channelName string + delBufferManager *DelBufferManager // manager of delete msg + channel Channel + idAllocator allocatorInterface flushMap sync.Map flushChan <-chan flushMsg @@ -330,6 +331,19 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload } } + // sync delete + //here we adopt a quite radical strategy: + //every time we make sure that the N biggest delDataBuf can be flushed + //when memsize usage reaches a certain level + //the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit + segmentsToFlush := ibNode.delBufferManager.ShouldFlushSegments() + for _, segID := range segmentsToFlush { + syncTasks[segID] = &syncTask{ + buffer: nil, // nil is valid + segmentID: segID, + } + } + syncSegmentIDs := ibNode.channel.listSegmentIDsToSync(fgMsg.endPositions[0].Timestamp) for _, segID := range syncSegmentIDs { buf := ibNode.GetBuffer(segID) @@ -593,7 +607,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni return ibNode.channel.getCollectionAndPartitionID(segmentID) } -func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, +func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DelBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) { baseNode := BaseNode{} @@ -659,10 +673,11 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl flushingSegCache: flushingSegCache, flushManager: fm, - channel: config.channel, - idAllocator: config.allocator, - channelName: config.vChannelName, - ttMerger: mt, - ttLogger: &timeTickLogger{vChannelName: config.vChannelName}, + delBufferManager: delBufManager, + channel: config.channel, + idAllocator: config.allocator, + channelName: config.vChannelName, + ttMerger: mt, + ttLogger: &timeTickLogger{vChannelName: config.vChannelName}, }, nil } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 065f7a7f78..a232285384 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -105,8 +105,13 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: "string", } + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) assert.NotNil(t, iBNode) require.NoError(t, err) @@ -120,7 +125,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { cd: 0, } - _, err = newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + _, err = newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) assert.Error(t, err) } @@ -198,8 +203,13 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: "string", } + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) require.NoError(t, err) // trigger log ts @@ -361,7 +371,12 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: "string", } - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) require.NoError(t, err) // Auto flush number of rows set to 2 @@ -596,7 +611,12 @@ func TestRollBF(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: "string", } - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) require.NoError(t, err) // Auto flush number of rows set to 2 @@ -675,7 +695,8 @@ func TestRollBF(t *testing.T) { type InsertBufferNodeSuite struct { suite.Suite - channel *ChannelMeta + channel *ChannelMeta + delBufManager *DelBufferManager collID UniqueID partID UniqueID @@ -690,9 +711,16 @@ func (s *InsertBufferNodeSuite) SetupSuite() { pkType: schemapb.DataType_Int64, } + delBufManager := &DelBufferManager{ + channel: s.channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + s.collID = 1 s.partID = 10 s.channel = newChannel("channel", s.collID, nil, rc, s.cm) + s.delBufManager = delBufManager s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir)) s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() @@ -737,9 +765,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { fgMsg := &flowGraphMsg{dropCollection: true} node := &insertBufferNode{ - channelName: s.channel.channelName, - channel: s.channel, - flushChan: make(chan flushMsg, 100), + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: make(chan flushMsg, 100), } syncTasks := node.FillInSyncTasks(fgMsg, nil) @@ -756,9 +785,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { segToFlush := []UniqueID{1, 2} node := &insertBufferNode{ - channelName: s.channel.channelName, - channel: s.channel, - flushChan: make(chan flushMsg, 100), + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: make(chan flushMsg, 100), } buffer := BufferData{ @@ -783,9 +813,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { s.Run("drop partition", func() { fgMsg := flowGraphMsg{dropPartitions: []UniqueID{s.partID}, endPositions: []*internalpb.MsgPosition{{Timestamp: 100}}} node := &insertBufferNode{ - channelName: s.channel.channelName, - channel: s.channel, - flushChan: make(chan flushMsg, 100), + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: make(chan flushMsg, 100), } syncTasks := node.FillInSyncTasks(&fgMsg, nil) @@ -802,9 +833,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { s.Run("manual sync", func() { flushCh := make(chan flushMsg, 100) node := &insertBufferNode{ - channelName: s.channel.channelName, - channel: s.channel, - flushChan: flushCh, + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: flushCh, } for i := 1; i <= 3; i++ { @@ -829,9 +861,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { s.Run("manual sync over load", func() { flushCh := make(chan flushMsg, 100) node := &insertBufferNode{ - channelName: s.channel.channelName, - channel: s.channel, - flushChan: flushCh, + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: flushCh, } for i := 1; i <= 100; i++ { @@ -852,7 +885,6 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { s.Assert().False(task.auto) s.Assert().False(task.dropped) } - }) } @@ -935,7 +967,12 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: "string", } - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c) + delBufManager := &DelBufferManager{ + channel: channel, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) require.NoError(t, err) inMsg := genFlowGraphInsertMsg(insertChannelName)