From 6fda1f69c8b430d1dd6a6783102cd532a8f8c4b7 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 4 Jun 2025 19:58:31 +0800 Subject: [PATCH] fix: Fix duplicate autoID between import and insert (#42519) Remove the unlimited logID mechanism and switch to redundantly allocating a large number of IDs. issue: https://github.com/milvus-io/milvus/issues/42518 Signed-off-by: bigsheeper --- internal/datacoord/import_util.go | 13 +++++++--- internal/datanode/importv2/task_import.go | 5 ++-- internal/datanode/importv2/task_l0_import.go | 5 ++-- internal/datanode/services.go | 3 +++ pkg/util/paramtable/component_param.go | 27 +++++++++++++------- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 91b5283ba8..a4fd30b6ec 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -314,14 +314,21 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all return stat.GetTotalRows() }) - // Allocated IDs are used for rowID and the BEGINNING of the logID. - allocNum := totalRows + 1 + // Pre-allocate IDs for autoIDs and logIDs. + preAllocIDNum := (totalRows + 1) * paramtable.Get().DataCoordCfg.ImportPreAllocIDExpansionFactor.GetAsInt64() - idBegin, idEnd, err := alloc.AllocN(allocNum) + idBegin, idEnd, err := alloc.AllocN(preAllocIDNum) if err != nil { return nil, err } + log.Info("pre-allocate ids and ts for import task", WrapTaskLog(task, + zap.Int64("totalRows", totalRows), + zap.Int64("idBegin", idBegin), + zap.Int64("idEnd", idEnd), + zap.Uint64("ts", ts))..., + ) + importFiles := lo.Map(task.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { return fileStat.GetImportFile() }) diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 0eea6cd274..38ad3faf13 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -19,7 +19,6 @@ package importv2 import ( "context" "io" - "math" "time" "github.com/cockroachdb/errors" @@ -66,8 +65,8 @@ func NewImportTask(req *datapb.ImportRequest, if importutilv2.IsBackup(req.GetOptions()) { UnsetAutoID(req.GetSchema()) } - // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) + // Allocator for autoIDs and logIDs. + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), req.GetIDRange().GetEnd()) task := &ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 8f72832841..4fa7c67d77 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "math" "time" "github.com/cockroachdb/errors" @@ -61,8 +60,8 @@ func NewL0ImportTask(req *datapb.ImportRequest, cm storage.ChunkManager, ) Task { ctx, cancel := context.WithCancel(context.Background()) - // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) + // Allocator for autoIDs and logIDs. + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), req.GetIDRange().GetEnd()) task := &L0ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 54d7ed55e6..9144641bd6 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -465,6 +465,9 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( zap.Int64("collectionID", req.GetCollectionID()), zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.Strings("vchannels", req.GetVchannels()), + zap.Uint64("ts", req.GetTs()), + zap.Int64("idBegin", req.GetIDRange().GetBegin()), + zap.Int64("idEnd", req.GetIDRange().GetEnd()), zap.Any("segments", req.GetRequestSegments()), zap.Any("files", req.GetFiles())) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 250d47155c..e3388010ff 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3967,15 +3967,16 @@ type dataCoordConfig struct { CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` // import - FilesPerPreImportTask ParamItem `refreshable:"true"` - ImportTaskRetention ParamItem `refreshable:"true"` - MaxSizeInMBPerImportTask ParamItem `refreshable:"true"` - ImportScheduleInterval ParamItem `refreshable:"true"` - ImportCheckIntervalHigh ParamItem `refreshable:"true"` - ImportCheckIntervalLow ParamItem `refreshable:"true"` - MaxFilesPerImportReq ParamItem `refreshable:"true"` - MaxImportJobNum ParamItem `refreshable:"true"` - WaitForIndex ParamItem `refreshable:"true"` + FilesPerPreImportTask ParamItem `refreshable:"true"` + ImportTaskRetention ParamItem `refreshable:"true"` + MaxSizeInMBPerImportTask ParamItem `refreshable:"true"` + ImportScheduleInterval ParamItem `refreshable:"true"` + ImportCheckIntervalHigh ParamItem `refreshable:"true"` + ImportCheckIntervalLow ParamItem `refreshable:"true"` + MaxFilesPerImportReq ParamItem `refreshable:"true"` + MaxImportJobNum ParamItem `refreshable:"true"` + WaitForIndex ParamItem `refreshable:"true"` + ImportPreAllocIDExpansionFactor ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` @@ -4879,6 +4880,14 @@ if param targetVecIndexVersion is not set, the default value is -1, which means } p.WaitForIndex.Init(base.mgr) + p.ImportPreAllocIDExpansionFactor = ParamItem{ + Key: "dataCoord.import.preAllocateIDExpansionFactor", + Version: "2.5.13", + DefaultValue: "10", + Doc: `The expansion factor for pre-allocating IDs during import.`, + } + p.ImportPreAllocIDExpansionFactor.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataCoord.gracefulStopTimeout", Version: "2.3.7",