diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 05a48694f4..3fcb986960 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -296,14 +296,21 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all return stat.GetTotalRows() }) - // Allocated IDs are used for rowID and the BEGINNING of the logID. - allocNum := totalRows + 1 + // Pre-allocate IDs for autoIDs and logIDs. + preAllocIDNum := (totalRows + 1) * paramtable.Get().DataCoordCfg.ImportPreAllocIDExpansionFactor.GetAsInt64() - idBegin, idEnd, err := alloc.AllocN(allocNum) + idBegin, idEnd, err := alloc.AllocN(preAllocIDNum) if err != nil { return nil, err } + log.Info("pre-allocate ids and ts for import task", WrapTaskLog(task, + zap.Int64("totalRows", totalRows), + zap.Int64("idBegin", idBegin), + zap.Int64("idEnd", idEnd), + zap.Uint64("ts", ts))..., + ) + importFiles := lo.Map(task.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { return fileStat.GetImportFile() }) diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 2564935ebc..a6076943e3 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -19,7 +19,6 @@ package importv2 import ( "context" "io" - "math" "time" "github.com/cockroachdb/errors" @@ -66,8 +65,8 @@ func NewImportTask(req *datapb.ImportRequest, if importutilv2.IsBackup(req.GetOptions()) { UnsetAutoID(req.GetSchema()) } - // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) + // Allocator for autoIDs and logIDs. + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), req.GetIDRange().GetEnd()) task := &ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 8f72832841..4fa7c67d77 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "math" "time" "github.com/cockroachdb/errors" @@ -61,8 +60,8 @@ func NewL0ImportTask(req *datapb.ImportRequest, cm storage.ChunkManager, ) Task { ctx, cancel := context.WithCancel(context.Background()) - // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) + // Allocator for autoIDs and logIDs. + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), req.GetIDRange().GetEnd()) task := &L0ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/services.go b/internal/datanode/services.go index e6e2076ed2..03147d88c2 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -461,6 +461,9 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( zap.Int64("collectionID", req.GetCollectionID()), zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.Strings("vchannels", req.GetVchannels()), + zap.Uint64("ts", req.GetTs()), + zap.Int64("idBegin", req.GetIDRange().GetBegin()), + zap.Int64("idEnd", req.GetIDRange().GetEnd()), zap.Any("segments", req.GetRequestSegments()), zap.Any("files", req.GetFiles())) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 198b9832e2..75269a9376 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3629,15 +3629,16 @@ type dataCoordConfig struct { CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` // import - FilesPerPreImportTask ParamItem `refreshable:"true"` - ImportTaskRetention ParamItem `refreshable:"true"` - MaxSizeInMBPerImportTask ParamItem `refreshable:"true"` - ImportScheduleInterval ParamItem `refreshable:"true"` - ImportCheckIntervalHigh ParamItem `refreshable:"true"` - ImportCheckIntervalLow ParamItem `refreshable:"true"` - MaxFilesPerImportReq ParamItem `refreshable:"true"` - MaxImportJobNum ParamItem `refreshable:"true"` - WaitForIndex ParamItem `refreshable:"true"` + FilesPerPreImportTask ParamItem `refreshable:"true"` + ImportTaskRetention ParamItem `refreshable:"true"` + MaxSizeInMBPerImportTask ParamItem `refreshable:"true"` + ImportScheduleInterval ParamItem `refreshable:"true"` + ImportCheckIntervalHigh ParamItem `refreshable:"true"` + ImportCheckIntervalLow ParamItem `refreshable:"true"` + MaxFilesPerImportReq ParamItem `refreshable:"true"` + MaxImportJobNum ParamItem `refreshable:"true"` + WaitForIndex ParamItem `refreshable:"true"` + ImportPreAllocIDExpansionFactor ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` @@ -4540,6 +4541,14 @@ if param targetVecIndexVersion is not set, the default value is -1, which means } p.WaitForIndex.Init(base.mgr) + p.ImportPreAllocIDExpansionFactor = ParamItem{ + Key: "dataCoord.import.preAllocateIDExpansionFactor", + Version: "2.5.13", + DefaultValue: "10", + Doc: `The expansion factor for pre-allocating IDs during import.`, + } + p.ImportPreAllocIDExpansionFactor.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataCoord.gracefulStopTimeout", Version: "2.3.7",