From 59d19384dc96b77d071eecfe0a1c20ff57fa0ffd Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 24 Nov 2021 18:41:16 +0800 Subject: [PATCH] Fix flush manager injection logic with multiple segments (#12260) Signed-off-by: Congqi Xia --- internal/datanode/compactor.go | 14 ++---- internal/datanode/compactor_test.go | 5 +- internal/datanode/flush_manager.go | 20 +++++--- internal/datanode/flush_manager_test.go | 63 +++++++++++++++---------- internal/datanode/flush_task.go | 42 +++++++++++++++-- internal/datanode/flush_task_test.go | 23 ++++----- 6 files changed, 107 insertions(+), 60 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 8189b569da..b179e37917 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -296,17 +296,13 @@ func (t *compactionTask) compact() error { } // Inject to stop flush - ti := taskInjection{ - injected: make(chan struct{}), - injectOver: make(chan bool), - postInjection: func(pack *segmentFlushPack) { - pack.segmentID = targetSegID - }, - } + ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) { + pack.segmentID = targetSegID + }) defer close(ti.injectOver) t.injectFlush(ti, segIDs...) - <-ti.injected + <-ti.Injected() var ( iItr = make([]iterator, 0) @@ -449,7 +445,7 @@ func (t *compactionTask) compact() error { } } - ti.injectOver <- true + ti.injectDone(true) log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("num of binlog paths", len(cpaths.inPaths)), zap.Any("num of stats paths", len(cpaths.statsPaths)), diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 1e40596c00..129668456a 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -552,10 +552,11 @@ func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, return nil } -func (mfm *mockFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) { +func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) { go func() { time.Sleep(time.Second * time.Duration(mfm.sleepSeconds)) - injection.injected <- struct{}{} + //injection.injected <- struct{}{} + close(injection.injected) <-injection.injectOver }() } diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 42ee4b5bb2..7d97485d90 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -41,7 +41,7 @@ type flushManager interface { // notify flush manager del buffer data flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error // injectFlush injects compaction or other blocking task before flush sync - injectFlush(injection taskInjection, segments ...UniqueID) + injectFlush(injection *taskInjection, segments ...UniqueID) // close handles resource clean up close() } @@ -73,7 +73,7 @@ var _ flushManager = (*rendezvousFlushManager)(nil) type orderFlushQueue struct { sync.Once segmentID UniqueID - injectCh chan taskInjection + injectCh chan *taskInjection // MsgID => flushTask working sync.Map @@ -93,7 +93,7 @@ func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue { q := &orderFlushQueue{ segmentID: segID, notifyFunc: f, - injectCh: make(chan taskInjection, 100), + injectCh: make(chan *taskInjection, 100), } return q } @@ -159,7 +159,7 @@ func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDa // inject performs injection for current task queue // send into injectCh in there is running task // or perform injection logic here if there is no injection -func (q *orderFlushQueue) inject(inject taskInjection) { +func (q *orderFlushQueue) inject(inject *taskInjection) { q.injectCh <- inject } @@ -187,8 +187,13 @@ func (h *injectHandler) handleInjection(q *orderFlushQueue) { injectDone := make(chan struct{}) q.tailCh = injectDone q.tailMut.Unlock() - inject.injected <- struct{}{} - <-inject.injectOver + // notify one injection done + inject.injectOne() + ok := <-inject.injectOver + // apply injection + if ok { + q.postInjection = inject.postInjection + } close(injectDone) case <-h.done: return @@ -346,7 +351,8 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique } // injectFlush inject process before task finishes -func (m *rendezvousFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) { +func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) { + go injection.waitForInjected() for _, segmentID := range segments { m.getFlushQueue(segmentID).inject(injection) } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index a8629a5560..f691c40715 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -177,23 +177,20 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { var counter atomic.Int64 finish := sync.WaitGroup{} finish.Add(size) - packs := make([]*segmentFlushPack, 0, size+1) + var packMut sync.Mutex + packs := make([]*segmentFlushPack, 0, size+3) m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) { + packMut.Lock() packs = append(packs, pack) + packMut.Unlock() counter.Inc() finish.Done() }) - injected := make(chan struct{}) - injectOver := make(chan bool) - m.injectFlush(taskInjection{ - injected: injected, - injectOver: injectOver, - postInjection: func(*segmentFlushPack) { - }, - }, 1) - <-injected - injectOver <- true + ti := newTaskInjection(1, func(*segmentFlushPack) {}) + m.injectFlush(ti, 1) + <-ti.injected + ti.injectDone(true) ids := make([][]byte, 0, size) for i := 0; i < size; i++ { @@ -218,44 +215,60 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { assert.EqualValues(t, size, counter.Load()) - finish.Add(1) + finish.Add(2) id := make([]byte, 10) rand.Read(id) + id2 := make([]byte, 10) + rand.Read(id2) + rand.Read(id) m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{ MsgID: id, }) + m.flushBufferData(nil, 3, true, false, &internalpb.MsgPosition{ + MsgID: id2, + }) - m.injectFlush(taskInjection{ - injected: injected, - injectOver: injectOver, - postInjection: func(pack *segmentFlushPack) { - pack.segmentID = 3 - }, - }, 2) + ti = newTaskInjection(2, func(pack *segmentFlushPack) { + pack.segmentID = 4 + }) + m.injectFlush(ti, 2, 3) go func() { - <-injected - injectOver <- true + <-ti.injected + ti.injectDone(true) }() m.flushDelData(nil, 2, &internalpb.MsgPosition{ MsgID: id, }) + m.flushDelData(nil, 3, &internalpb.MsgPosition{ + MsgID: id2, + }) finish.Wait() - assert.EqualValues(t, size+1, counter.Load()) - assert.EqualValues(t, 3, packs[size].segmentID) + assert.EqualValues(t, size+2, counter.Load()) + assert.EqualValues(t, 4, packs[size].segmentID) finish.Add(1) rand.Read(id) + m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{ MsgID: id, }) + ti = newTaskInjection(1, func(pack *segmentFlushPack) { + pack.segmentID = 5 + }) + go func() { + <-ti.injected + ti.injectDone(false) // inject fail, segment id shall not be changed to 5 + }() + m.injectFlush(ti, 2) + m.flushDelData(nil, 2, &internalpb.MsgPosition{ MsgID: id, }) finish.Wait() - assert.EqualValues(t, size+2, counter.Load()) - assert.EqualValues(t, 3, packs[size+1].segmentID) + assert.EqualValues(t, size+3, counter.Load()) + assert.EqualValues(t, 4, packs[size+1].segmentID) } diff --git a/internal/datanode/flush_task.go b/internal/datanode/flush_task.go index 0245549414..b347c3d1df 100644 --- a/internal/datanode/flush_task.go +++ b/internal/datanode/flush_task.go @@ -54,7 +54,7 @@ type flushTaskRunner struct { startSignal <-chan struct{} finishSignal chan struct{} - injectSignal <-chan taskInjection + injectSignal <-chan *taskInjection segmentID UniqueID insertLogs map[UniqueID]string @@ -71,9 +71,45 @@ type flushTaskRunner struct { type taskInjection struct { injected chan struct{} // channel to notify injected injectOver chan bool // indicates injection over + wg sync.WaitGroup postInjection func(pack *segmentFlushPack) } +func newTaskInjection(segmentCnt int, pf func(pack *segmentFlushPack)) *taskInjection { + ti := &taskInjection{ + injected: make(chan struct{}), + injectOver: make(chan bool, segmentCnt), + postInjection: pf, + } + ti.wg.Add(segmentCnt) + return ti +} + +// Injected returns a chan, which will be closed after pre set segments counts a injected +func (ti *taskInjection) Injected() <-chan struct{} { + return ti.injected +} + +func (ti *taskInjection) waitForInjected() { + ti.wg.Wait() + close(ti.injected) +} + +func (ti *taskInjection) injectOne() { + ti.wg.Done() +} + +func (ti *taskInjection) injectDone(success bool) { + if !success { + close(ti.injectOver) + return + } + + for i := 0; i < cap(ti.injectOver); i++ { + ti.injectOver <- true + } +} + // init initializes flushTaskRunner with provided actions and signal func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <-chan struct{}) { t.initOnce.Do(func() { @@ -136,7 +172,7 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos select { case injection := <-t.injectSignal: // notify injected - injection.injected <- struct{}{} + injection.injectOne() ok := <-injection.injectOver if ok { // apply postInjection func @@ -172,7 +208,7 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack { } // newFlushTaskRunner create a usable task runner -func newFlushTaskRunner(segmentID UniqueID, injectCh <-chan taskInjection) *flushTaskRunner { +func newFlushTaskRunner(segmentID UniqueID, injectCh <-chan *taskInjection) *flushTaskRunner { t := &flushTaskRunner{ WaitGroup: sync.WaitGroup{}, segmentID: segmentID, diff --git a/internal/datanode/flush_task_test.go b/internal/datanode/flush_task_test.go index ddf431c2b5..8b9552e02e 100644 --- a/internal/datanode/flush_task_test.go +++ b/internal/datanode/flush_task_test.go @@ -95,7 +95,7 @@ func TestFlushTaskRunner_FailError(t *testing.T) { } func TestFlushTaskRunner_Injection(t *testing.T) { - injectCh := make(chan taskInjection, 1) + injectCh := make(chan *taskInjection, 1) task := newFlushTaskRunner(1, injectCh) signal := make(chan struct{}) @@ -103,21 +103,16 @@ func TestFlushTaskRunner_Injection(t *testing.T) { nextFlag := false processed := make(chan struct{}) - injected := make(chan struct{}) - injectOver := make(chan bool) - - injectCh <- taskInjection{ - injected: injected, - injectOver: injectOver, - postInjection: func(pack *segmentFlushPack) { - t.Log("task injection executed") - pack.segmentID = 2 - }, - } + ti := newTaskInjection(1, func(pack *segmentFlushPack) { + t.Log("task injection executed") + pack.segmentID = 2 + }) + go ti.waitForInjected() + injectCh <- ti go func() { - <-injected - injectOver <- true + <-ti.injected + ti.injectDone(true) }() task.init(func(pack *segmentFlushPack) {