diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index faad0540d4..b2dbb7955d 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -84,7 +84,7 @@ type notifyMetaFunc func(*segmentFlushPack) type flushAndDropFunc func([]*segmentFlushPack) // taskPostFunc clean up function after single flush task done -type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc) +type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc, isFlush bool) // postInjectionFunc post injection pack process logic type postInjectionFunc func(pack *segmentFlushPack) @@ -96,18 +96,20 @@ var _ flushManager = (*rendezvousFlushManager)(nil) type orderFlushQueue struct { sync.Once segmentID UniqueID - injectCh chan *taskInjection // MsgID => flushTask working *typeutil.ConcurrentMap[string, *flushTaskRunner] notifyFunc notifyMetaFunc - tailMut sync.Mutex - tailCh chan struct{} - + // protect postInjection injectMut sync.Mutex - runningTasks int32 + injectCh chan *taskInjection postInjection postInjectionFunc + + // protect task create and close + taskMut sync.Mutex + runningTasks int32 + tailCh chan struct{} } // newOrderFlushQueue creates an orderFlushQueue @@ -130,19 +132,16 @@ func (q *orderFlushQueue) init() { }) } -func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskRunner { +func (q *orderFlushQueue) getFlushTask(pos *msgpb.MsgPosition) *flushTaskRunner { t, loaded := q.working.GetOrInsert(getSyncTaskID(pos), newFlushTaskRunner(q.segmentID, q.injectCh)) // not loaded means the task runner is new, do initializtion if !loaded { - // take over injection if task queue is handling it - q.injectMut.Lock() + q.taskMut.Lock() + defer q.taskMut.Unlock() + q.runningTasks++ - q.injectMut.Unlock() - // add task to tail - q.tailMut.Lock() t.init(q.notifyFunc, q.postTask, q.tailCh) q.tailCh = t.finishSignal - q.tailMut.Unlock() log.Info("new flush task runner created and initialized", zap.Int64("segmentID", q.segmentID), zap.String("pos message ID", string(pos.GetMsgID())), @@ -152,111 +151,108 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR } // postTask handles clean up work after a task is done -func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) { - // delete task from working map - q.working.GetAndRemove(getSyncTaskID(pack.pos)) - // after descreasing working count, check whether flush queue is empty +func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc, isFlush bool) { + // delete flush task from working map + if isFlush { + q.working.GetAndRemove(getSyncTaskID(pack.pos)) + } + q.injectMut.Lock() - q.runningTasks-- // set postInjection function if injection is handled in task if postInjection != nil { q.postInjection = postInjection } - if q.postInjection != nil { + if isFlush && q.postInjection != nil { q.postInjection(pack) } + q.injectMut.Unlock() - // if flush queue is empty, drain all injection from injectCh - if q.runningTasks == 0 { - for i := 0; i < len(q.injectCh); i++ { - inject := <-q.injectCh - go q.handleInject(inject) - } + // after descreasing working count, check whether flush queue is empty + q.taskMut.Lock() + q.runningTasks-- + if q.runningTasks == 0 && len(q.injectCh) > 0 { + q.enqueueInjectTask() } - q.injectMut.Unlock() + q.taskMut.Unlock() } // enqueueInsertBuffer put insert buffer data into queue func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]*datapb.Binlog, flushed bool, dropped bool, pos *msgpb.MsgPosition) { - q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos) + q.getFlushTask(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos) } // enqueueDelBuffer put delete buffer data into queue func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDataBuf, pos *msgpb.MsgPosition) { - q.getFlushTaskRunner(pos).runFlushDel(task, deltaLogs) + q.getFlushTask(pos).runFlushDel(task, deltaLogs) +} + +// create inject task to drain all injection from injectCh +// WARN: RUN WITH TASK MUT +func (q *orderFlushQueue) enqueueInjectTask() { + q.runningTasks++ + t := newInjectTask(q.postTask, q.injectCh, q.tailCh) + q.tailCh = t.finishSignal + + log.Info("new injection task enqueue", + zap.Int64("segmentID", q.segmentID), + ) } // inject performs injection for current task queue // send into injectCh in there is running task // or perform injection logic here if there is no injection func (q *orderFlushQueue) inject(inject *taskInjection) { - q.injectMut.Lock() - defer q.injectMut.Unlock() - // check if there are running task(s) - // if true, just put injection into injectCh - // in case of task misses an injection, the injectCh shall be drained in `postTask` - if q.runningTasks > 0 { - q.injectCh <- inject - return - } - // otherwise just handle injection here - go q.handleInject(inject) -} + q.injectCh <- inject -func (q *orderFlushQueue) handleInject(inject *taskInjection) { - // notify one injection done - inject.injectOne() - ok := <-inject.injectOver - // apply injection - if ok { - q.injectMut.Lock() - defer q.injectMut.Unlock() - q.postInjection = inject.postInjection + q.taskMut.Lock() + defer q.taskMut.Unlock() + + if q.runningTasks == 0 { + q.enqueueInjectTask() } } -/* -// injectionHandler handles injection for empty flush queue -type injectHandler struct { - once sync.Once - wg sync.WaitGroup - done chan struct{} +// injectTask handles injection for empty flush queue +type injectTask struct { + startSignal, finishSignal chan struct{} + injectCh <-chan *taskInjection + postFunc taskPostFunc } -// newInjectHandler create injection handler for flush queue -func newInjectHandler(q *orderFlushQueue) *injectHandler { - h := &injectHandler{ - done: make(chan struct{}), +// newInjectTask create injection task to flush queue +func newInjectTask(postFunc taskPostFunc, injectCh chan *taskInjection, startSignal chan struct{}) *injectTask { + h := &injectTask{ + postFunc: postFunc, + injectCh: injectCh, + startSignal: startSignal, + finishSignal: make(chan struct{}), } - h.wg.Add(1) - go h.handleInjection(q) + + go h.waitFinish() return h } -func (h *injectHandler) handleInjection(q *orderFlushQueue) { - defer h.wg.Done() - for { - select { - case inject := <-q.injectCh: - q.tailMut.Lock() //Maybe double check - injectDone := make(chan struct{}) - q.tailCh = injectDone - q.tailMut.Unlock() - case <-h.done: - return +func (h *injectTask) waitFinish() { + <-h.startSignal + var postInjection postInjectionFunc + select { + case injection := <-h.injectCh: + // notify injected + injection.injectOne() + ok := <-injection.injectOver + if ok { + // apply postInjection func + postInjection = injection.postInjection } + default: } -} -func (h *injectHandler) close() { - h.once.Do(func() { - close(h.done) - h.wg.Wait() - }) + h.postFunc(nil, postInjection, false) + // notify next task + close(h.finishSignal) } -*/ type dropHandler struct { sync.Mutex @@ -650,11 +646,9 @@ func getSyncTaskID(pos *msgpb.MsgPosition) string { func (m *rendezvousFlushManager) close() { m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool { // assertion ok - queue.injectMut.Lock() - for i := 0; i < len(queue.injectCh); i++ { - go queue.handleInject(<-queue.injectCh) - } - queue.injectMut.Unlock() + queue.taskMut.Lock() + queue.enqueueInjectTask() + queue.taskMut.Unlock() return true }) m.waitForAllFlushQueue() diff --git a/internal/datanode/flush_task.go b/internal/datanode/flush_task.go index 2e728763e5..4579931eea 100644 --- a/internal/datanode/flush_task.go +++ b/internal/datanode/flush_task.go @@ -200,7 +200,7 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos } default: } - postFunc(pack, postInjection) + postFunc(pack, postInjection, true) // execution done, dequeue and make count -- notifyFunc(pack) diff --git a/internal/datanode/flush_task_test.go b/internal/datanode/flush_task_test.go index 0bdcc06895..15fc5fbad2 100644 --- a/internal/datanode/flush_task_test.go +++ b/internal/datanode/flush_task_test.go @@ -34,7 +34,7 @@ func TestFlushTaskRunner(t *testing.T) { task.init(func(*segmentFlushPack) { saveFlag = true - }, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal) + }, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) {}, signal) go func() { <-task.finishSignal @@ -70,7 +70,7 @@ func TestFlushTaskRunner_FailError(t *testing.T) { if pack.err != nil { errFlag = true } - }, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal) + }, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) {}, signal) go func() { <-task.finishSignal @@ -118,7 +118,7 @@ func TestFlushTaskRunner_Injection(t *testing.T) { task.init(func(pack *segmentFlushPack) { assert.EqualValues(t, 2, pack.segmentID) saveFlag = true - }, func(pack *segmentFlushPack, i postInjectionFunc) { + }, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) { if i != nil { i(pack) }