From e6d4cde3979264dc9ce9aa2c96071ef1e9a2165d Mon Sep 17 00:00:00 2001
From: groot
Date: Fri, 27 Oct 2023 15:14:13 +0800
Subject: [PATCH] Fix duplicated segment id in bulkinsert task state (#27915)

Signed-off-by: yhmo
---
 internal/rootcoord/import_manager.go      | 24 +++++++++++++-----
 internal/rootcoord/import_manager_test.go | 31 +++++++++++++++++++++++
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go
index d358291a1b..f7f4085a25 100644
--- a/internal/rootcoord/import_manager.go
+++ b/internal/rootcoord/import_manager.go
@@ -586,12 +586,7 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
 			// Meta persist should be done before memory objs change.
 			toPersistImportTaskInfo = cloneImportTaskInfo(v)
 			toPersistImportTaskInfo.State.StateCode = ir.GetState()
-			// if is started state, append the new created segment id
-			if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted {
-				toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...)
-			} else {
-				toPersistImportTaskInfo.State.Segments = ir.GetSegments()
-			}
+			toPersistImportTaskInfo.State.Segments = mergeArray(toPersistImportTaskInfo.State.Segments, ir.GetSegments())
 			toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
 			toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
 			for _, kv := range ir.GetInfos() {
@@ -1087,3 +1082,20 @@ func cloneImportTaskInfo(taskInfo *datapb.ImportTaskInfo) *datapb.ImportTaskInfo
 	}
 	return cloned
 }
+
+// mergeArray returns the union of arr1 and arr2 with duplicates removed.
+// Values keep first-seen order so the persisted segment list is deterministic.
+func mergeArray(arr1 []int64, arr2 []int64) []int64 {
+	seen := make(map[int64]struct{}, len(arr1)+len(arr2))
+	result := make([]int64, 0, len(arr1)+len(arr2))
+	for _, arr := range [][]int64{arr1, arr2} {
+		for _, v := range arr {
+			if _, ok := seen[v]; ok {
+				continue
+			}
+			seen[v] = struct{}{}
+			result = append(result, v)
+		}
+	}
+	return result
+}
diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go
index 3b841c221d..fe94b2db66 100644
--- a/internal/rootcoord/import_manager_test.go
+++ b/internal/rootcoord/import_manager_test.go
@@ -18,6 +18,7 @@ package rootcoord
 
 import (
 	"context"
+	"sort"
 	"strings"
 	"sync"
 	"testing"
@@ -1101,3 +1102,33 @@ func TestImportManager_isRowbased(t *testing.T) {
 	assert.NoError(t, err)
 	assert.False(t, rb)
 }
+
+func TestImportManager_mergeArray(t *testing.T) {
+	converter := func(arr []int64) []int {
+		res := make([]int, 0, len(arr))
+		for _, v := range arr {
+			res = append(res, int(v))
+		}
+		sort.Ints(res)
+		return res
+	}
+
+	arr1 := []int64{1, 2, 3}
+	arr2 := []int64{2, 4, 6}
+	res := converter(mergeArray(arr1, arr2))
+	assert.Equal(t, []int{1, 2, 3, 4, 6}, res)
+
+	res = converter(mergeArray(arr1, nil))
+	assert.Equal(t, []int{1, 2, 3}, res)
+
+	res = converter(mergeArray(nil, arr2))
+	assert.Equal(t, []int{2, 4, 6}, res)
+
+	res = converter(mergeArray(nil, nil))
+	assert.Equal(t, []int{}, res)
+
+	arr1 = []int64{1, 2, 3}
+	arr2 = []int64{6, 5, 4}
+	res = converter(mergeArray(arr1, arr2))
+	assert.Equal(t, []int{1, 2, 3, 4, 5, 6}, res)
+}