From 68bd06422862f9d329c108ee9f58081c98d282ad Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 7 Jan 2026 12:41:24 +0800 Subject: [PATCH] fix: Fix concurrent map access panic in CopySegmentTask (#46829) issue: #44358 The panic "concurrent map iteration and map write" was introduced in PR #44361. It occurred when QueryCopySegment RPC iterated segmentResults while copySingleSegment was updating it concurrently. - Deep copy segmentResults in Clone() to avoid shared map reference - Return map copy in GetSegmentResults() to prevent iteration conflict - Update tests to get task from manager after Update() operations This fix follows the same deep-copy pattern used in ImportTask and L0ImportTask. Signed-off-by: Wei Liu --- .../datanode/importv2/task_copy_segment.go | 20 +++++++++++++++---- .../importv2/task_copy_segment_test.go | 12 +++++------ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/internal/datanode/importv2/task_copy_segment.go b/internal/datanode/importv2/task_copy_segment.go index f826d58b12..e78ebfb648 100644 --- a/internal/datanode/importv2/task_copy_segment.go +++ b/internal/datanode/importv2/task_copy_segment.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/conc" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // Copy Segment Task for Snapshot Restore @@ -229,10 +230,16 @@ func (t *CopySegmentTask) Cancel() { t.cancel() } -// Clone creates a shallow copy of the task. +// Clone creates a copy of the task with deep-copied segmentResults. // Note: This shares references to manager, cm, and other components. -// Used for task state inspection without modifying the original. +// The segmentResults map is deep-copied to avoid concurrent map access. func (t *CopySegmentTask) Clone() Task { + // Deep copy segmentResults to avoid concurrent map access + results := make(map[int64]*datapb.CopySegmentResult) + for id, result := range t.segmentResults { + results[id] = typeutil.Clone(result) + } + return &CopySegmentTask{ ctx: t.ctx, cancel: t.cancel, @@ -243,7 +250,7 @@ func (t *CopySegmentTask) Clone() Task { state: t.state, reason: t.reason, slots: t.slots, - segmentResults: t.segmentResults, + segmentResults: results, req: t.req, manager: t.manager, cm: t.cm, @@ -253,7 +260,12 @@ func (t *CopySegmentTask) Clone() Task { // GetSegmentResults returns the copy results for all target segments. // This is called by DataCoord to retrieve binlog/index metadata after task completion. func (t *CopySegmentTask) GetSegmentResults() map[int64]*datapb.CopySegmentResult { - return t.segmentResults + // Return a copy to avoid concurrent map access during iteration + results := make(map[int64]*datapb.CopySegmentResult) + for id, result := range t.segmentResults { + results[id] = result + } + return results } // ============================================================================ diff --git a/internal/datanode/importv2/task_copy_segment_test.go b/internal/datanode/importv2/task_copy_segment_test.go index 55375616b2..9107d26bd3 100644 --- a/internal/datanode/importv2/task_copy_segment_test.go +++ b/internal/datanode/importv2/task_copy_segment_test.go @@ -244,8 +244,8 @@ func TestCopySegmentTaskExecute(t *testing.T) { _, err := futures[0].Await() assert.NoError(t, err) - // Verify segment results are updated - copyTask := task.(*CopySegmentTask) + // Verify segment results are updated (get updated task from manager) + copyTask := mockManager.Get(task.GetTaskID()).(*CopySegmentTask) segmentResults := copyTask.GetSegmentResults() assert.Equal(t, 1, len(segmentResults)) assert.NotNil(t, segmentResults[666]) @@ -336,8 +336,8 @@ func TestCopySegmentTaskExecute(t *testing.T) { assert.NoError(t, err) } - // Verify both segment results are updated - copyTask := task.(*CopySegmentTask) + // Verify both segment results are updated (get updated task from manager) + copyTask := mockManager.Get(task.GetTaskID()).(*CopySegmentTask) segmentResults := copyTask.GetSegmentResults() assert.Equal(t, 2, len(segmentResults)) @@ -994,8 +994,8 @@ func TestCopySegmentTaskEdgeCases(t *testing.T) { _, err := futures[0].Await() assert.NoError(t, err) - // Verify total rows - copyTask := task.(*CopySegmentTask) + // Verify total rows (get updated task from manager) + copyTask := mockManager.Get(task.GetTaskID()).(*CopySegmentTask) segmentResults := copyTask.GetSegmentResults() assert.Equal(t, int64(10000), segmentResults[666].ImportedRows) // 100 files * 100 rows each })