fix: [2.6] Use eventually & fix task id appear in both executing&completed (#44698) (#44715)

Cherry-pick from master
pr: #44698
Related to #44620

This PR:
- Use eventually instead of `time.Sleep` in accesslog writer unit test
- Make sure compaction task results have only one state from executor
API

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-10-10 14:29:57 +08:00 committed by GitHub
parent 615544ee71
commit a85b06d965
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 16 deletions

View File

@ -240,11 +240,12 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult {
completed []int64
completedLevelZero []int64
)
var executingResults []*datapb.CompactionPlanResult
results := make([]*datapb.CompactionPlanResult, 0)
// get executing results
e.executing.Range(func(planID int64, task Compactor) bool {
executing = append(executing, planID)
results = append(results, &datapb.CompactionPlanResult{
executingResults = append(executingResults, &datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
PlanID: planID,
})
@ -262,6 +263,13 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult {
return true
})
// quick fix for task id may appear in both executing and completed
// TODO: make sure task id only has one state
completedIDs := typeutil.NewSet(completed...)
results = append(results, lo.Filter(executingResults, func(result *datapb.CompactionPlanResult, _ int) bool {
return !completedIDs.Contain(result.GetPlanID())
})...)
// remove level zero results
lo.ForEach(completedLevelZero, func(planID int64, _ int) {
e.completed.Remove(planID)

View File

@ -66,10 +66,10 @@ func TestRotateWriter_Basic(t *testing.T) {
err = logger.Rotate()
assert.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second)
logfiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.Equal(t, 1, len(logfiles))
assert.Eventually(t, func() bool {
logfiles, err := logger.handler.listAll()
return err == nil && len(logfiles) == 1
}, time.Second*5, time.Millisecond*200)
}
func TestRotateWriter_TimeRotate(t *testing.T) {
@ -96,10 +96,10 @@ func TestRotateWriter_TimeRotate(t *testing.T) {
assert.Equal(t, num, n)
assert.NoError(t, err)
time.Sleep(time.Duration(4) * time.Second)
logfiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.GreaterOrEqual(t, len(logfiles), 1)
assert.Eventually(t, func() bool {
logfiles, err := logger.handler.listAll()
return err == nil && len(logfiles) >= 1
}, time.Second*5, time.Millisecond*200)
}
func TestRotateWriter_SizeRotate(t *testing.T) {
@ -135,9 +135,10 @@ func TestRotateWriter_SizeRotate(t *testing.T) {
// assert minio files
time.Sleep(time.Duration(1) * time.Second)
remoteFiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.Equal(t, fileNum, len(remoteFiles))
assert.Eventually(t, func() bool {
remoteFiles, err := logger.handler.listAll()
return err == nil && len(remoteFiles) == fileNum
}, time.Second*5, time.Microsecond*200)
// assert local sealed files num
localFields, err := logger.oldLogFiles()
@ -169,11 +170,11 @@ func TestRotateWriter_LocalRetention(t *testing.T) {
logger.Rotate()
logger.Write([]byte("Test"))
logger.Rotate()
time.Sleep(time.Duration(1) * time.Second)
logFiles, err := logger.oldLogFiles()
assert.NoError(t, err)
assert.Equal(t, 1, len(logFiles))
assert.Eventually(t, func() bool {
logFiles, err := logger.oldLogFiles()
return err == nil && len(logFiles) == 1
}, time.Second*5, time.Millisecond*200)
}
func TestRotateWriter_BasicError(t *testing.T) {