mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix Flush hang after SyncSegments timeout (#24692)
Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
072cbf0353
commit
9e7ef944f7
@ -59,11 +59,11 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
|
||||
c.executing.Delete(task.getPlanID())
|
||||
}
|
||||
|
||||
func (c *compactionExecutor) injectDone(planID UniqueID) {
|
||||
func (c *compactionExecutor) injectDone(planID UniqueID, success bool) {
|
||||
c.completed.Delete(planID)
|
||||
task, loaded := c.completedCompactor.LoadAndDelete(planID)
|
||||
if loaded {
|
||||
task.(compactor).injectDone()
|
||||
task.(compactor).injectDone(success)
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string
|
||||
// remove all completed plans for vChannelName
|
||||
c.completed.Range(func(key interface{}, value interface{}) bool {
|
||||
if value.(*datapb.CompactionResult).GetChannel() == vChannelName {
|
||||
c.injectDone(key.(UniqueID))
|
||||
c.injectDone(key.(UniqueID), true)
|
||||
log.Info("remove compaction results for dropped channel",
|
||||
zap.String("channel", vChannelName),
|
||||
zap.Int64("planID", key.(UniqueID)))
|
||||
|
||||
@ -20,8 +20,9 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
func TestCompactionExecutor(t *testing.T) {
|
||||
@ -41,6 +42,20 @@ func TestCompactionExecutor(t *testing.T) {
|
||||
ex.stopTask(UniqueID(1))
|
||||
})
|
||||
|
||||
t.Run("Test injectDone", func(t *testing.T) {
|
||||
ex := newCompactionExecutor()
|
||||
mc := newMockCompactor(true)
|
||||
ex.completed.Store(UniqueID(1), &datapb.CompactionResult{})
|
||||
ex.completedCompactor.Store(UniqueID(1), mc)
|
||||
|
||||
ex.injectDone(UniqueID(1), false)
|
||||
|
||||
_, ok := ex.completed.Load(UniqueID(1))
|
||||
assert.False(t, ok)
|
||||
_, ok = ex.completedCompactor.Load(UniqueID(1))
|
||||
assert.False(t, ok)
|
||||
})
|
||||
|
||||
t.Run("Test start", func(t *testing.T) {
|
||||
ex := newCompactionExecutor()
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
@ -153,7 +168,7 @@ func (mc *mockCompactor) compact() (*datapb.CompactionResult, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mc *mockCompactor) injectDone() {
|
||||
func (mc *mockCompactor) injectDone(success bool) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -57,7 +57,7 @@ type iterator = storage.Iterator
|
||||
type compactor interface {
|
||||
complete()
|
||||
compact() (*datapb.CompactionResult, error)
|
||||
injectDone()
|
||||
injectDone(success bool)
|
||||
stop()
|
||||
getPlanID() UniqueID
|
||||
getCollection() UniqueID
|
||||
@ -123,7 +123,7 @@ func (t *compactionTask) complete() {
|
||||
func (t *compactionTask) stop() {
|
||||
t.cancel()
|
||||
<-t.done
|
||||
t.injectDone()
|
||||
t.injectDone(true)
|
||||
}
|
||||
|
||||
func (t *compactionTask) getPlanID() UniqueID {
|
||||
@ -689,10 +689,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
||||
return pack, nil
|
||||
}
|
||||
|
||||
func (t *compactionTask) injectDone() {
|
||||
func (t *compactionTask) injectDone(success bool) {
|
||||
if t.inject != nil {
|
||||
uninjectStart := time.Now()
|
||||
t.inject.injectDone(true)
|
||||
t.inject.injectDone(success)
|
||||
uninjectEnd := time.Now()
|
||||
log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
|
||||
}
|
||||
|
||||
@ -767,7 +767,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
||||
assert.NotEmpty(t, result.Field2StatslogPaths)
|
||||
|
||||
assert.Equal(t, 0, mockfm.injectCount())
|
||||
task.injectDone()
|
||||
task.injectDone(true)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 1, mockfm.injectCount())
|
||||
}
|
||||
@ -855,7 +855,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
||||
assert.NotEmpty(t, result.Field2StatslogPaths)
|
||||
|
||||
assert.Equal(t, 0, mockfm.injectCount())
|
||||
task.injectDone()
|
||||
task.injectDone(true)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 1, mockfm.injectCount())
|
||||
})
|
||||
|
||||
@ -954,9 +954,10 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
||||
if err = channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
|
||||
log.Ctx(ctx).Warn("fail to merge flushed segments", zap.Error(err))
|
||||
status.Reason = err.Error()
|
||||
node.compactionExecutor.injectDone(req.GetPlanID(), false)
|
||||
return status, nil
|
||||
}
|
||||
node.compactionExecutor.injectDone(req.GetPlanID())
|
||||
node.compactionExecutor.injectDone(req.GetPlanID(), true)
|
||||
status.ErrorCode = commonpb.ErrorCode_Success
|
||||
return status, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user