diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 129668456a..8920d97fe0 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -561,4 +561,8 @@ func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...U }() } +func (mfm *mockFlushManager) notifyAllFlushed() {} + +func (mfm *mockFlushManager) startDropping() {} + func (mfm *mockFlushManager) close() {} diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 499ed137cf..f1bfbd9ac8 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -146,7 +146,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro } // initialize flush manager for DataSync Service - dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, flushNotifyFunc(dsService)) + dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, + flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService)) // recover segment checkpoints for _, us := range vchanInfo.GetUnflushedSegments() { diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 0484a0d414..be9532359e 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -205,7 +205,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { ) replica := genMockReplica(segIDs, pks, chanName) kv := memkv.NewMemoryKV() - fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {}) + fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) t.Run("Test get segment by primary keys", func(te *testing.T) { c := &nodeConfig{ replica: replica, diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 82fe8c884d..1fef49c1af 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -83,7 +83,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { memkv := memkv.NewMemoryKV() - fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}) + fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) @@ -180,7 +180,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { memkv := memkv.NewMemoryKV() - fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {}) + fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) c := &nodeConfig{ @@ -395,7 +395,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { colRep.segmentFlushed(pack.segmentID) } wg.Done() - }) + }, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) c := &nodeConfig{ @@ -656,7 +656,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { memkv := memkv.NewMemoryKV() - fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}) + fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) c := &nodeConfig{ diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 5921b071fd..6e193e3be5 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/retry" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -42,6 +43,10 @@ type flushManager interface { flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error // injectFlush injects compaction or other blocking task before flush sync injectFlush(injection *taskInjection, segments ...UniqueID) + // startDropping changes flush manager into dropping mode + startDropping() + // notifyAllFlushed tells flush manager there is not future incoming flush task for drop mode + notifyAllFlushed() // close handles resource clean up close() } @@ -61,6 +66,9 @@ type segmentFlushPack struct { // notifyMetaFunc notify meta to persistent flush result type notifyMetaFunc func(*segmentFlushPack) +// flushAndDropFunc notifies meta to flush current state and drop virtual channel +type flushAndDropFunc func([]*segmentFlushPack) + // taskPostFunc clean up function after single flush task done type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc) @@ -70,6 +78,7 @@ type postInjectionFunc func(pack *segmentFlushPack) // make sure implementation var _ flushManager = (*rendezvousFlushManager)(nil) +// orderFlushQueue keeps the order of task notifyFunc execution in order type orderFlushQueue struct { sync.Once segmentID UniqueID @@ -111,8 +120,9 @@ func (q *orderFlushQueue) init() { func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flushTaskRunner { actual, loaded := q.working.LoadOrStore(string(pos.MsgID), newFlushTaskRunner(q.segmentID, q.injectCh)) t := actual.(*flushTaskRunner) + // not loaded means the task runner is new, do initializtion if !loaded { - + // take over injection if task queue is handling it q.injectMut.Lock() q.runningTasks++ if q.injectHandler != nil { @@ -120,7 +130,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush q.injectHandler = nil } q.injectMut.Unlock() - + // add task to tail q.tailMut.Lock() t.init(q.notifyFunc, q.postTask, q.tailCh) q.tailCh = t.finishSignal @@ -129,13 +139,18 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush return t } +// postTask handles clean up work after a task is done func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) { + // delete task from working map q.working.Delete(string(pack.pos.MsgID)) + // after descreasing working count, check whether flush queue is empty q.injectMut.Lock() q.runningTasks-- + // if flush queue is empty, let flush queue take over injection if q.runningTasks == 0 { q.injectHandler = newInjectHandler(q) } + // set postInjection function if injection is handled in task if postInjection != nil { q.postInjection = postInjection } @@ -163,12 +178,14 @@ func (q *orderFlushQueue) inject(inject *taskInjection) { q.injectCh <- inject } +// injectionHandler handles injection for empty flush queue type injectHandler struct { once sync.Once wg sync.WaitGroup done chan struct{} } +// newInjectHandler create injection handler for flush queue func newInjectHandler(q *orderFlushQueue) *injectHandler { h := &injectHandler{ done: make(chan struct{}), @@ -208,6 +225,14 @@ func (h *injectHandler) close() { }) } +type dropHandler struct { + sync.Mutex + dropFlushWg sync.WaitGroup + flushAndDrop flushAndDropFunc + allFlushed chan struct{} + packs []*segmentFlushPack +} + // rendezvousFlushManager makes sure insert & del buf all flushed type rendezvousFlushManager struct { allocatorInterface @@ -217,9 +242,12 @@ type rendezvousFlushManager struct { // segment id => flush queue dispatcher sync.Map notifyFunc notifyMetaFunc + + dropping atomic.Bool + dropHandler dropHandler } -// getFlushQueue +// getFlushQueue gets or creates a orderFlushQueue for segment id if not found func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue { newQueue := newOrderFlushQueue(segmentID, m.notifyFunc) actual, _ := m.dispatcher.LoadOrStore(segmentID, newQueue) @@ -229,14 +257,64 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu return queue } +func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) { + // in dropping mode + if m.dropping.Load() { + r := &flushTaskRunner{ + WaitGroup: sync.WaitGroup{}, + segmentID: segmentID, + } + r.WaitGroup.Add(1) // insert and delete are not bound in drop mode + r.runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos) + r.WaitGroup.Wait() + + m.dropHandler.Lock() + defer m.dropHandler.Unlock() + m.dropHandler.packs = append(m.dropHandler.packs, r.getFlushPack()) + + return + } + // normal mode + m.getFlushQueue(segmentID).enqueueInsertFlush(task, binlogs, statslogs, flushed, dropped, pos) +} + +func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flushDeleteTask, deltaLogs *DelDataBuf, pos *internalpb.MsgPosition) { + // in dropping mode + if m.dropping.Load() { + // preventing separate delete, check position exists in queue first + q := m.getFlushQueue(segmentID) + _, ok := q.working.Load(string(pos.MsgID)) + // if ok, means position insert data already in queue, just handle task in normal mode + // if not ok, means the insert buf should be handle in drop mode + if !ok { + r := &flushTaskRunner{ + WaitGroup: sync.WaitGroup{}, + segmentID: segmentID, + } + r.WaitGroup.Add(1) // insert and delete are not bound in drop mode + r.runFlushDel(task, deltaLogs) + r.WaitGroup.Wait() + + m.dropHandler.Lock() + defer m.dropHandler.Unlock() + m.dropHandler.packs = append(m.dropHandler.packs, r.getFlushPack()) + return + } + } + // normal mode + m.getFlushQueue(segmentID).enqueueDelFlush(task, deltaLogs, pos) +} + // notify flush manager insert buffer data func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error { // empty flush if data == nil || data.buffer == nil { - m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{}, - map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos) + //m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{}, + // map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos) + m.handleInsertTask(segmentID, &flushBufferInsertTask{}, map[UniqueID]string{}, map[UniqueID]string{}, + flushed, dropped, pos) return nil } @@ -301,7 +379,12 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni } m.updateSegmentCheckPoint(segmentID) - m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{ + /* + m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{ + BaseKV: m.BaseKV, + data: kvs, + }, field2Insert, field2Stats, flushed, dropped, pos)*/ + m.handleInsertTask(segmentID, &flushBufferInsertTask{ BaseKV: m.BaseKV, data: kvs, }, field2Insert, field2Stats, flushed, dropped, pos) @@ -314,7 +397,10 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique // del signal with empty data if data == nil || data.delData == nil { - m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{}, nil, pos) + /* + m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{}, nil, pos) + */ + m.handleDeleteTask(segmentID, &flushBufferDeleteTask{}, nil, pos) return nil } @@ -342,8 +428,12 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique data.fileSize = int64(len(blob.Value)) data.filePath = blobPath log.Debug("delete blob path", zap.String("path", blobPath)) - - m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{ + /* + m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{ + BaseKV: m.BaseKV, + data: kvs, + }, data, pos)*/ + m.handleDeleteTask(segmentID, &flushBufferDeleteTask{ BaseKV: m.BaseKV, data: kvs, }, data, pos) @@ -381,6 +471,39 @@ func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *interna return collID, partID, meta, nil } +// waitForAllTaskQueue waits for all flush queues in dispatcher become empty +func (m *rendezvousFlushManager) waitForAllFlushQueue() { + var wg sync.WaitGroup + m.dispatcher.Range(func(k, v interface{}) bool { + queue := v.(*orderFlushQueue) + wg.Add(1) + go func() { + <-queue.tailCh + wg.Done() + }() + return true + }) + wg.Wait() +} + +// startDropping changes flush manager into dropping mode +func (m *rendezvousFlushManager) startDropping() { + m.dropping.Store(true) + m.dropHandler.allFlushed = make(chan struct{}) + go func() { + <-m.dropHandler.allFlushed // all needed flush tasks are in flush manager now + m.waitForAllFlushQueue() // waits for all the normal flush queue done + m.dropHandler.dropFlushWg.Wait() // waits for all drop mode task done + m.dropHandler.Lock() + defer m.dropHandler.Unlock() + m.dropHandler.flushAndDrop(m.dropHandler.packs) // invoke drop & flush + }() +} + +func (m *rendezvousFlushManager) notifyAllFlushed() { + close(m.dropHandler.allFlushed) +} + // close cleans up all the left members func (m *rendezvousFlushManager) close() { m.dispatcher.Range(func(k, v interface{}) bool { @@ -422,12 +545,122 @@ func (t *flushBufferDeleteTask) flushDeleteData() error { } // NewRendezvousFlushManager create rendezvousFlushManager with provided allocator and kv -func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc) *rendezvousFlushManager { - return &rendezvousFlushManager{ +func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager { + fm := &rendezvousFlushManager{ allocatorInterface: allocator, BaseKV: kv, notifyFunc: f, Replica: replica, + dropHandler: dropHandler{ + flushAndDrop: drop, + }, + } + // start with normal mode + fm.dropping.Store(false) + return fm +} + +func getFieldBinlogs(fieldID UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog { + for _, binlog := range binlogs { + if fieldID == binlog.GetFieldID() { + return binlog + } + } + return nil +} + +func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) flushAndDropFunc { + return func(packs []*segmentFlushPack) { + req := &datapb.DropVirtualChannelRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO msg type + MsgID: 0, //TODO msg id + Timestamp: 0, //TODO time stamp + SourceID: Params.NodeID, + }, + ChannelName: dsService.vchannelName, + } + + segmentPack := make(map[UniqueID]*datapb.DropVirtualChannelSegment) + for _, pack := range packs { + segment, has := segmentPack[pack.segmentID] + if !has { + segment = &datapb.DropVirtualChannelSegment{ + SegmentID: pack.segmentID, + CollectionID: dsService.collectionID, + } + + segmentPack[pack.segmentID] = segment + } + for k, v := range pack.insertLogs { + fieldBinlogs := getFieldBinlogs(k, segment.Field2BinlogPaths) + if fieldBinlogs == nil { + segment.Field2BinlogPaths = append(segment.Field2BinlogPaths, &datapb.FieldBinlog{ + FieldID: k, + Binlogs: []string{v}, + }) + } else { + fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, v) + } + } + for k, v := range pack.statsLogs { + fieldStatsLogs := getFieldBinlogs(k, segment.Field2StatslogPaths) + if fieldStatsLogs == nil { + segment.Field2StatslogPaths = append(segment.Field2StatslogPaths, &datapb.FieldBinlog{ + FieldID: k, + Binlogs: []string{v}, + }) + } else { + fieldStatsLogs.Binlogs = append(fieldStatsLogs.Binlogs, v) + } + } + for _, delData := range pack.deltaLogs { + segment.Deltalogs = append(segment.Deltalogs, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogPath: delData.filePath, DeltaLogSize: delData.fileSize}) + } + updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID) + segment.NumOfRows = updates.GetNumRows() + if pack.pos != nil { + if segment.CheckPoint == nil || pack.pos.Timestamp > segment.CheckPoint.Timestamp { + segment.CheckPoint = pack.pos + } + } + } + + // start positions for all new segments + for _, pos := range dsService.replica.listNewSegmentsStartPositions() { + segment, has := segmentPack[pos.GetSegmentID()] + if !has { + segment = &datapb.DropVirtualChannelSegment{ + SegmentID: pos.GetSegmentID(), + CollectionID: dsService.collectionID, + } + + segmentPack[pos.GetSegmentID()] = segment + } + segment.StartPosition = pos.GetStartPosition() + } + + err := retry.Do(context.Background(), func() error { + rsp, err := dsService.dataCoord.DropVirtualChannel(context.Background(), req) + // should be network issue, return error and retry + if err != nil { + return fmt.Errorf(err.Error()) + } + + // TODO should retry only when datacoord status is unhealthy + if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason()) + } + return nil + }, opts...) + if err != nil { + log.Warn("failed to DropVirtualChannel", zap.String("channel", dsService.vchannelName), zap.Error(err)) + panic(err) + } + for segID := range segmentPack { + dsService.replica.segmentFlushed(segID) + dsService.flushingSegCache.Remove(segID) + } } } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index f691c40715..8331fdf1b3 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -22,6 +22,7 @@ import ( "errors" "sync" "testing" + "time" memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -144,7 +145,7 @@ func TestRendezvousFlushManager(t *testing.T) { m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) { counter.Inc() finish.Done() - }) + }, emptyFlushAndDropFunc) ids := make([][]byte, 0, size) for i := 0; i < size; i++ { @@ -185,7 +186,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { packMut.Unlock() counter.Inc() finish.Done() - }) + }, emptyFlushAndDropFunc) ti := newTaskInjection(1, func(*segmentFlushPack) {}) m.injectFlush(ti, 1) @@ -276,7 +277,7 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) { memkv := memkv.NewMemoryKV() replica := newMockReplica() fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) { - }) + }, emptyFlushAndDropFunc) // non exists segment _, _, _, err := fm.getSegmentMeta(-1, &internalpb.MsgPosition{}) @@ -293,6 +294,128 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) { assert.Error(t, err) } +func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { + kv := memkv.NewMemoryKV() + + size := 1000 + var counter atomic.Int64 + var finish sync.WaitGroup + finish.Add(size) + m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) { + counter.Inc() + finish.Done() + }, emptyFlushAndDropFunc) + + ids := make([][]byte, 0, size) + for i := 0; i < size; i++ { + id := make([]byte, 10) + rand.Read(id) + ids = append(ids, id) + } + + for i := 0; i < size; i++ { + m.flushDelData(nil, 1, &internalpb.MsgPosition{ + MsgID: ids[i], + }) + } + + var finished bool + var mut sync.RWMutex + signal := make(chan struct{}) + + go func() { + m.waitForAllFlushQueue() + mut.Lock() + finished = true + mut.Unlock() + close(signal) + }() + + mut.RLock() + assert.False(t, finished) + mut.RUnlock() + + for i := 0; i < size/2; i++ { + m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{ + MsgID: ids[i], + }) + } + + mut.RLock() + assert.False(t, finished) + mut.RUnlock() + + for i := size / 2; i < size; i++ { + m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{ + MsgID: ids[i], + }) + } + + timeout := time.NewTimer(time.Second) + select { + case <-timeout.C: + t.FailNow() + case <-signal: + } + + mut.RLock() + assert.True(t, finished) + mut.RUnlock() +} + +func TestRendezvousFlushManager_dropMode(t *testing.T) { + kv := memkv.NewMemoryKV() + + var mut sync.Mutex + var result []*segmentFlushPack + signal := make(chan struct{}) + + m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) { + }, func(packs []*segmentFlushPack) { + mut.Lock() + result = packs + mut.Unlock() + close(signal) + }) + + halfMsgID := []byte{1, 1, 1} + m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{ + MsgID: halfMsgID, + }) + + m.startDropping() + // half normal, half drop mode, should not appear in final packs + m.flushDelData(nil, -1, &internalpb.MsgPosition{ + MsgID: halfMsgID, + }) + + target := make(map[int64]struct{}) + for i := 1; i < 11; i++ { + target[int64(i)] = struct{}{} + m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{ + MsgID: []byte{1}, + }) + m.flushDelData(nil, int64(i), &internalpb.MsgPosition{ + MsgID: []byte{1}, + }) + } + + m.notifyAllFlushed() + + <-signal + mut.Lock() + defer mut.Unlock() + + output := make(map[int64]struct{}) + for _, pack := range result { + assert.NotEqual(t, -1, pack.segmentID) + output[pack.segmentID] = struct{}{} + _, has := target[pack.segmentID] + assert.True(t, has) + } + assert.Equal(t, len(target), len(output)) +} + func TestRendezvousFlushManager_close(t *testing.T) { kv := memkv.NewMemoryKV() @@ -303,7 +426,7 @@ func TestRendezvousFlushManager_close(t *testing.T) { m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) { counter.Inc() finish.Done() - }) + }, emptyFlushAndDropFunc) ids := make([][]byte, 0, size) for i := 0; i < size; i++ { @@ -331,8 +454,6 @@ func TestRendezvousFlushManager_close(t *testing.T) { } func TestFlushNotifyFunc(t *testing.T) { - // replica := - // rcf := &RootCoordFactory{} ctx := context.Background() rcf := &RootCoordFactory{} @@ -382,3 +503,70 @@ func TestFlushNotifyFunc(t *testing.T) { }) }) } + +func TestDropVirtualChannelFunc(t *testing.T) { + ctx := context.Background() + rcf := &RootCoordFactory{} + replica, err := newReplica(ctx, rcf, 1) + require.NoError(t, err) + + dataCoord := &DataCoordFactory{} + flushingCache := newCache() + dsService := &dataSyncService{ + collectionID: 1, + replica: replica, + dataCoord: dataCoord, + flushingSegCache: flushingCache, + vchannelName: "vchan_01", + } + dropFunc := dropVirtualChannelFunc(dsService, retry.Attempts(1)) + t.Run("normal run", func(t *testing.T) { + replica.addNewSegment(2, 1, 10, "vchan_01", &internalpb.MsgPosition{ + ChannelName: "vchan_01", + MsgID: []byte{1, 2, 3}, + Timestamp: 10, + }, nil) + assert.NotPanics(t, func() { + dropFunc([]*segmentFlushPack{ + { + segmentID: 1, + insertLogs: map[UniqueID]string{1: "/dev/test/id"}, + statsLogs: map[UniqueID]string{1: "/dev/test/id-stats"}, + deltaLogs: []*DelDataBuf{{filePath: "/dev/test/del"}}, + pos: &internalpb.MsgPosition{ + ChannelName: "vchan_01", + MsgID: []byte{1, 2, 3}, + Timestamp: 10, + }, + }, + { + segmentID: 1, + insertLogs: map[UniqueID]string{1: "/dev/test/id_2"}, + statsLogs: map[UniqueID]string{1: "/dev/test/id-stats-2"}, + deltaLogs: []*DelDataBuf{{filePath: "/dev/test/del-2"}}, + pos: &internalpb.MsgPosition{ + ChannelName: "vchan_01", + MsgID: []byte{1, 2, 3}, + Timestamp: 30, + }, + }, + }) + }) + }) + t.Run("datacoord drop fails", func(t *testing.T) { + dataCoord.DropVirtualChannelNotSuccess = true + assert.Panics(t, func() { + dropFunc(nil) + }) + }) + + t.Run("datacoord call error", func(t *testing.T) { + + dataCoord.DropVirtualChannelNotSuccess = false + dataCoord.DropVirtualChannelError = true + assert.Panics(t, func() { + dropFunc(nil) + }) + }) + +} diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 9eaf44c2c2..fdff4b1d6e 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -49,6 +49,8 @@ import ( const ctxTimeInMillisecond = 5000 const debug = false +var emptyFlushAndDropFunc flushAndDropFunc = func(_ []*segmentFlushPack) {} + func newIDLEDataNodeMock(ctx context.Context) *DataNode { msFactory := msgstream.NewPmsFactory() node := NewDataNode(ctx, msFactory) @@ -160,6 +162,9 @@ type DataCoordFactory struct { CompleteCompactionError bool CompleteCompactionNotSuccess bool + + DropVirtualChannelError bool + DropVirtualChannelNotSuccess bool } func (ds *DataCoordFactory) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) { @@ -184,6 +189,24 @@ func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.Sav return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil } +func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { + if ds.DropVirtualChannelError { + return nil, errors.New("error") + } + if ds.DropVirtualChannelNotSuccess { + return &datapb.DropVirtualChannelResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, nil + } + return &datapb.DropVirtualChannelResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil +} + func (mf *MetaFactory) GetCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { sch := schemapb.CollectionSchema{ Name: collectionName,