From a85b06d96555e7ede52db79730b0d832e869ca42 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 10 Oct 2025 14:29:57 +0800 Subject: [PATCH] fix: [2.6] Use eventually & fix task id appear in both executing&completed (#44698) (#44715) Cherry-pick from master pr: #44698 Related to #44620 This PR: - Use eventually instead of `time.Sleep` in accesslog writer unit test - Make sure compaction task results have only one state from executor API Signed-off-by: Congqi Xia --- internal/datanode/compactor/executor.go | 10 +++++++- internal/proxy/accesslog/writer_test.go | 31 +++++++++++++------------ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/internal/datanode/compactor/executor.go b/internal/datanode/compactor/executor.go index 14b41f7bf7..830fe71799 100644 --- a/internal/datanode/compactor/executor.go +++ b/internal/datanode/compactor/executor.go @@ -240,11 +240,12 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult { completed []int64 completedLevelZero []int64 ) + var executingResults []*datapb.CompactionPlanResult results := make([]*datapb.CompactionPlanResult, 0) // get executing results e.executing.Range(func(planID int64, task Compactor) bool { executing = append(executing, planID) - results = append(results, &datapb.CompactionPlanResult{ + executingResults = append(executingResults, &datapb.CompactionPlanResult{ State: datapb.CompactionTaskState_executing, PlanID: planID, }) @@ -262,6 +263,13 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult { return true }) + // quick fix for task id may appear in both executing and completed + // TODO: make sure task id only has one state + completedIDs := typeutil.NewSet(completed...) + results = append(results, lo.Filter(executingResults, func(result *datapb.CompactionPlanResult, _ int) bool { + return !completedIDs.Contain(result.GetPlanID()) + })...) + // remove level zero results lo.ForEach(completedLevelZero, func(planID int64, _ int) { e.completed.Remove(planID) diff --git a/internal/proxy/accesslog/writer_test.go b/internal/proxy/accesslog/writer_test.go index 534852e685..1a58266238 100644 --- a/internal/proxy/accesslog/writer_test.go +++ b/internal/proxy/accesslog/writer_test.go @@ -66,10 +66,10 @@ func TestRotateWriter_Basic(t *testing.T) { err = logger.Rotate() assert.NoError(t, err) - time.Sleep(time.Duration(1) * time.Second) - logfiles, err := logger.handler.listAll() - assert.NoError(t, err) - assert.Equal(t, 1, len(logfiles)) + assert.Eventually(t, func() bool { + logfiles, err := logger.handler.listAll() + return err == nil && len(logfiles) == 1 + }, time.Second*5, time.Millisecond*200) } func TestRotateWriter_TimeRotate(t *testing.T) { @@ -96,10 +96,10 @@ func TestRotateWriter_TimeRotate(t *testing.T) { assert.Equal(t, num, n) assert.NoError(t, err) - time.Sleep(time.Duration(4) * time.Second) - logfiles, err := logger.handler.listAll() - assert.NoError(t, err) - assert.GreaterOrEqual(t, len(logfiles), 1) + assert.Eventually(t, func() bool { + logfiles, err := logger.handler.listAll() + return err == nil && len(logfiles) >= 1 + }, time.Second*5, time.Millisecond*200) } func TestRotateWriter_SizeRotate(t *testing.T) { @@ -135,9 +135,10 @@ func TestRotateWriter_SizeRotate(t *testing.T) { // assert minio files time.Sleep(time.Duration(1) * time.Second) - remoteFiles, err := logger.handler.listAll() - assert.NoError(t, err) - assert.Equal(t, fileNum, len(remoteFiles)) + assert.Eventually(t, func() bool { + remoteFiles, err := logger.handler.listAll() + return err == nil && len(remoteFiles) == fileNum + }, time.Second*5, time.Microsecond*200) // assert local sealed files num localFields, err := logger.oldLogFiles() @@ -169,11 +170,11 @@ func TestRotateWriter_LocalRetention(t *testing.T) { logger.Rotate() logger.Write([]byte("Test")) logger.Rotate() - time.Sleep(time.Duration(1) * time.Second) - logFiles, err := logger.oldLogFiles() - assert.NoError(t, err) - assert.Equal(t, 1, len(logFiles)) + assert.Eventually(t, func() bool { + logFiles, err := logger.oldLogFiles() + return err == nil && len(logFiles) == 1 + }, time.Second*5, time.Millisecond*200) } func TestRotateWriter_BasicError(t *testing.T) {