fix: fix injection invalid bug by add inject task to handler inject when queue was empty (#31819)

relate: https://github.com/milvus-io/milvus/issues/31548

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2024-04-03 14:05:14 +08:00 committed by GitHub
parent a992334d0e
commit cf02c623ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 88 deletions

View File

@ -84,7 +84,7 @@ type notifyMetaFunc func(*segmentFlushPack)
type flushAndDropFunc func([]*segmentFlushPack)
// taskPostFunc clean up function after single flush task done
type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc)
type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc, isFlush bool)
// postInjectionFunc post injection pack process logic
type postInjectionFunc func(pack *segmentFlushPack)
@ -96,18 +96,20 @@ var _ flushManager = (*rendezvousFlushManager)(nil)
type orderFlushQueue struct {
sync.Once
segmentID UniqueID
injectCh chan *taskInjection
// MsgID => flushTask
working *typeutil.ConcurrentMap[string, *flushTaskRunner]
notifyFunc notifyMetaFunc
tailMut sync.Mutex
tailCh chan struct{}
// protect postInjection
injectMut sync.Mutex
runningTasks int32
injectCh chan *taskInjection
postInjection postInjectionFunc
// protect task create and close
taskMut sync.Mutex
runningTasks int32
tailCh chan struct{}
}
// newOrderFlushQueue creates an orderFlushQueue
@ -130,19 +132,16 @@ func (q *orderFlushQueue) init() {
})
}
func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskRunner {
func (q *orderFlushQueue) getFlushTask(pos *msgpb.MsgPosition) *flushTaskRunner {
t, loaded := q.working.GetOrInsert(getSyncTaskID(pos), newFlushTaskRunner(q.segmentID, q.injectCh))
// not loaded means the task runner is new, do initializtion
if !loaded {
// take over injection if task queue is handling it
q.injectMut.Lock()
q.taskMut.Lock()
defer q.taskMut.Unlock()
q.runningTasks++
q.injectMut.Unlock()
// add task to tail
q.tailMut.Lock()
t.init(q.notifyFunc, q.postTask, q.tailCh)
q.tailCh = t.finishSignal
q.tailMut.Unlock()
log.Info("new flush task runner created and initialized",
zap.Int64("segmentID", q.segmentID),
zap.String("pos message ID", string(pos.GetMsgID())),
@ -152,111 +151,108 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR
}
// postTask handles clean up work after a task is done
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
// delete task from working map
q.working.GetAndRemove(getSyncTaskID(pack.pos))
// after descreasing working count, check whether flush queue is empty
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc, isFlush bool) {
// delete flush task from working map
if isFlush {
q.working.GetAndRemove(getSyncTaskID(pack.pos))
}
q.injectMut.Lock()
q.runningTasks--
// set postInjection function if injection is handled in task
if postInjection != nil {
q.postInjection = postInjection
}
if q.postInjection != nil {
if isFlush && q.postInjection != nil {
q.postInjection(pack)
}
q.injectMut.Unlock()
// if flush queue is empty, drain all injection from injectCh
if q.runningTasks == 0 {
for i := 0; i < len(q.injectCh); i++ {
inject := <-q.injectCh
go q.handleInject(inject)
}
// after descreasing working count, check whether flush queue is empty
q.taskMut.Lock()
q.runningTasks--
if q.runningTasks == 0 && len(q.injectCh) > 0 {
q.enqueueInjectTask()
}
q.injectMut.Unlock()
q.taskMut.Unlock()
}
// enqueueInsertBuffer put insert buffer data into queue
func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]*datapb.Binlog, flushed bool, dropped bool, pos *msgpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos)
q.getFlushTask(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos)
}
// enqueueDelBuffer put delete buffer data into queue
func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDataBuf, pos *msgpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushDel(task, deltaLogs)
q.getFlushTask(pos).runFlushDel(task, deltaLogs)
}
// create inject task to drain all injection from injectCh
// WARN: RUN WITH TASK MUT
func (q *orderFlushQueue) enqueueInjectTask() {
q.runningTasks++
t := newInjectTask(q.postTask, q.injectCh, q.tailCh)
q.tailCh = t.finishSignal
log.Info("new injection task enqueue",
zap.Int64("segmentID", q.segmentID),
)
}
// inject performs injection for current task queue
// send into injectCh in there is running task
// or perform injection logic here if there is no injection
func (q *orderFlushQueue) inject(inject *taskInjection) {
q.injectMut.Lock()
defer q.injectMut.Unlock()
// check if there are running task(s)
// if true, just put injection into injectCh
// in case of task misses an injection, the injectCh shall be drained in `postTask`
if q.runningTasks > 0 {
q.injectCh <- inject
return
}
// otherwise just handle injection here
go q.handleInject(inject)
}
q.injectCh <- inject
func (q *orderFlushQueue) handleInject(inject *taskInjection) {
// notify one injection done
inject.injectOne()
ok := <-inject.injectOver
// apply injection
if ok {
q.injectMut.Lock()
defer q.injectMut.Unlock()
q.postInjection = inject.postInjection
q.taskMut.Lock()
defer q.taskMut.Unlock()
if q.runningTasks == 0 {
q.enqueueInjectTask()
}
}
/*
// injectionHandler handles injection for empty flush queue
type injectHandler struct {
once sync.Once
wg sync.WaitGroup
done chan struct{}
// injectTask handles injection for empty flush queue
type injectTask struct {
startSignal, finishSignal chan struct{}
injectCh <-chan *taskInjection
postFunc taskPostFunc
}
// newInjectHandler create injection handler for flush queue
func newInjectHandler(q *orderFlushQueue) *injectHandler {
h := &injectHandler{
done: make(chan struct{}),
// newInjectTask create injection task to flush queue
func newInjectTask(postFunc taskPostFunc, injectCh chan *taskInjection, startSignal chan struct{}) *injectTask {
h := &injectTask{
postFunc: postFunc,
injectCh: injectCh,
startSignal: startSignal,
finishSignal: make(chan struct{}),
}
h.wg.Add(1)
go h.handleInjection(q)
go h.waitFinish()
return h
}
func (h *injectHandler) handleInjection(q *orderFlushQueue) {
defer h.wg.Done()
for {
select {
case inject := <-q.injectCh:
q.tailMut.Lock() //Maybe double check
injectDone := make(chan struct{})
q.tailCh = injectDone
q.tailMut.Unlock()
case <-h.done:
return
func (h *injectTask) waitFinish() {
<-h.startSignal
var postInjection postInjectionFunc
select {
case injection := <-h.injectCh:
// notify injected
injection.injectOne()
ok := <-injection.injectOver
if ok {
// apply postInjection func
postInjection = injection.postInjection
}
default:
}
}
func (h *injectHandler) close() {
h.once.Do(func() {
close(h.done)
h.wg.Wait()
})
h.postFunc(nil, postInjection, false)
// notify next task
close(h.finishSignal)
}
*/
type dropHandler struct {
sync.Mutex
@ -650,11 +646,9 @@ func getSyncTaskID(pos *msgpb.MsgPosition) string {
func (m *rendezvousFlushManager) close() {
m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool {
// assertion ok
queue.injectMut.Lock()
for i := 0; i < len(queue.injectCh); i++ {
go queue.handleInject(<-queue.injectCh)
}
queue.injectMut.Unlock()
queue.taskMut.Lock()
queue.enqueueInjectTask()
queue.taskMut.Unlock()
return true
})
m.waitForAllFlushQueue()

View File

@ -200,7 +200,7 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos
}
default:
}
postFunc(pack, postInjection)
postFunc(pack, postInjection, true)
// execution done, dequeue and make count --
notifyFunc(pack)

View File

@ -34,7 +34,7 @@ func TestFlushTaskRunner(t *testing.T) {
task.init(func(*segmentFlushPack) {
saveFlag = true
}, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal)
}, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) {}, signal)
go func() {
<-task.finishSignal
@ -70,7 +70,7 @@ func TestFlushTaskRunner_FailError(t *testing.T) {
if pack.err != nil {
errFlag = true
}
}, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal)
}, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) {}, signal)
go func() {
<-task.finishSignal
@ -118,7 +118,7 @@ func TestFlushTaskRunner_Injection(t *testing.T) {
task.init(func(pack *segmentFlushPack) {
assert.EqualValues(t, 2, pack.segmentID)
saveFlag = true
}, func(pack *segmentFlushPack, i postInjectionFunc) {
}, func(pack *segmentFlushPack, i postInjectionFunc, isFlush bool) {
if i != nil {
i(pack)
}