From 4e264003bf39f6b0f54945fea21e38c8b696112f Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 1 Apr 2024 20:09:13 +0800
Subject: [PATCH] enhance: Ensure ImportV2 waits for the index to be built and refine some logic (#31629)

Feature Introduced:

1. Ensure ImportV2 waits for the index to be built.

Enhancements Introduced:

1. Use local time for the timeout ts instead of allocating a ts from rootcoord.
2. Enhance the check on the number of input files for binlog import.
3. Remove the duplicated manager in datanode.
4. Rename the executor to scheduler in datanode.
5. Use a thread pool in the scheduler in datanode.

issue: https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper
---
 configs/milvus.yaml | 1 +
 internal/datacoord/import_checker.go | 78 +++++----
 internal/datacoord/import_checker_test.go | 5 +-
 internal/datacoord/import_scheduler.go | 26 ++-
 internal/datacoord/import_scheduler_test.go | 3 +-
 internal/datacoord/import_util.go | 13 +-
 internal/datacoord/import_util_test.go | 83 ++++++----
 internal/datacoord/index_meta.go | 20 +++
 internal/datacoord/index_meta_test.go | 16 ++
 internal/datacoord/segment_manager.go | 2 +-
 internal/datacoord/server.go | 4 +-
 internal/datacoord/services.go | 9 +-
 internal/datanode/data_node.go | 13 +-
 internal/datanode/importv2/manager.go | 36 -----
 .../importv2/{executor.go => scheduler.go} | 151 +++++++++---------
 .../{executor_test.go => scheduler_test.go} | 66 ++++----
 internal/datanode/services.go | 12 +-
 internal/util/importutilv2/binlog/reader.go | 6 +-
 pkg/util/conc/future.go | 12 +-
 pkg/util/paramtable/component_param.go | 11 ++
 pkg/util/paramtable/component_param_test.go | 1 +
 21 files changed, 311 insertions(+), 257 deletions(-)
 delete mode 100644 internal/datanode/importv2/manager.go
 rename internal/datanode/importv2/{executor.go => scheduler.go} (69%)
 rename internal/datanode/importv2/{executor_test.go => scheduler_test.go} (92%)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 0bd08c6200..4ed1660602 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -444,6 +444,7 @@ dataCoord:
     filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
     taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
     maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
+    waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
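    # Illustrative note, not part of this patch: the same switch is exposed through the
    # config key dataCoord.import.waitForIndex (see pkg/util/paramtable below). A deployment
    # that prefers the previous behavior, where an import job completes as soon as its
    # segments are flushed rather than waiting for index building, could for example set:
    #   dataCoord:
    #     import:
    #       waitForIndex: false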
enableGarbageCollection: true gc: diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 5b33593fa9..a1c86cc560 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -39,13 +39,12 @@ type ImportChecker interface { } type importChecker struct { - meta *meta - broker broker.Broker - cluster Cluster - alloc allocator - sm Manager - imeta ImportMeta - buildIndexCh chan UniqueID + meta *meta + broker broker.Broker + cluster Cluster + alloc allocator + sm Manager + imeta ImportMeta closeOnce sync.Once closeChan chan struct{} @@ -57,17 +56,15 @@ func NewImportChecker(meta *meta, alloc allocator, sm Manager, imeta ImportMeta, - buildIndexCh chan UniqueID, ) ImportChecker { return &importChecker{ - meta: meta, - broker: broker, - cluster: cluster, - alloc: alloc, - sm: sm, - imeta: imeta, - buildIndexCh: buildIndexCh, - closeChan: make(chan struct{}), + meta: meta, + broker: broker, + cluster: cluster, + alloc: alloc, + sm: sm, + imeta: imeta, + closeChan: make(chan struct{}), } } @@ -241,6 +238,8 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { } func (c *importChecker) checkImportingJob(job ImportJob) { + log := log.With(zap.Int64("jobID", job.GetJobID()), + zap.Int64("collectionID", job.GetCollectionID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID())) for _, t := range tasks { if t.GetState() != datapb.ImportTaskStateV2_Completed { @@ -248,40 +247,35 @@ func (c *importChecker) checkImportingJob(job ImportJob) { } } - unfinished := make([]int64, 0) - for _, task := range tasks { - segmentIDs := task.(*importTask).GetSegmentIDs() - for _, segmentID := range segmentIDs { - segment := c.meta.GetSegment(segmentID) - if segment == nil { - log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...) - continue - } - if segment.GetIsImporting() { - unfinished = append(unfinished, segmentID) - } - } - } + segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { + return t.(*importTask).GetSegmentIDs() + }) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - err := c.sm.FlushImportSegments(ctx, job.GetCollectionID(), unfinished) - if err != nil { - log.Warn("flush imported segments failed", zap.Int64("jobID", job.GetJobID()), - zap.Int64("collectionID", job.GetCollectionID()), zap.Int64s("segments", unfinished), zap.Error(err)) + // Verify completion of index building for imported segments. 
+ unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs) + if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 { + log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed)) return } + unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool { + segment := c.meta.GetSegment(segmentID) + if segment == nil { + log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID)) + return false + } + return segment.GetIsImporting() + }) + channels, err := c.meta.GetSegmentsChannels(unfinished) if err != nil { - log.Warn("get segments channels failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("get segments channels failed", zap.Error(err)) return } for _, segmentID := range unfinished { - c.buildIndexCh <- segmentID // accelerate index building channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) if channelCP == nil { - log.Warn("nil channel checkpoint", zap.Int64("jobID", job.GetJobID())) + log.Warn("nil channel checkpoint") return } op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) @@ -289,7 +283,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) { op3 := UpdateIsImporting(segmentID, false) err = c.meta.UpdateSegmentsInfo(op1, op2, op3) if err != nil { - log.Warn("update import segment failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("update import segment failed", zap.Error(err)) return } } @@ -297,8 +291,10 @@ func (c *importChecker) checkImportingJob(job ImportJob) { completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { - log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("failed to update job state to Completed", zap.Error(err)) + return } + log.Info("import job completed") } func (c *importChecker) tryFailingTasks(job ImportJob) { diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 0451cfe68a..8c90918a3a 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -65,9 +65,8 @@ func (s *ImportCheckerSuite) SetupTest() { broker := broker2.NewMockBroker(s.T()) sm := NewMockManager(s.T()) - buildIndexCh := make(chan UniqueID, 1024) - checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta, buildIndexCh).(*importChecker) + checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta).(*importChecker) s.checker = checker job := &importJob{ @@ -178,8 +177,6 @@ func (s *ImportCheckerSuite) TestCheckJob() { s.Equal(true, segment.GetIsImporting()) } } - sm := s.checker.sm.(*MockManager) - sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index f26ded5f48..68086666fd 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + 
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -47,6 +48,8 @@ type importScheduler struct { alloc allocator imeta ImportMeta + buildIndexCh chan UniqueID + closeOnce sync.Once closeChan chan struct{} } @@ -55,13 +58,15 @@ func NewImportScheduler(meta *meta, cluster Cluster, alloc allocator, imeta ImportMeta, + buildIndexCh chan UniqueID, ) ImportScheduler { return &importScheduler{ - meta: meta, - cluster: cluster, - alloc: alloc, - imeta: imeta, - closeChan: make(chan struct{}), + meta: meta, + cluster: cluster, + alloc: alloc, + imeta: imeta, + buildIndexCh: buildIndexCh, + closeChan: make(chan struct{}), } } @@ -159,7 +164,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 { }(nodeID) } wg.Wait() - log.Info("peek slots done", zap.Any("nodeSlots", nodeSlots)) + log.Debug("peek slots done", zap.Any("nodeSlots", nodeSlots)) return nodeSlots } @@ -295,12 +300,17 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) return } - op := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) - err = s.meta.UpdateSegmentsInfo(op) + op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) + op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) + err = s.meta.UpdateSegmentsInfo(op1, op2) if err != nil { log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...) return } + select { + case s.buildIndexCh <- info.GetSegmentID(): // accelerate index building: + default: + } } completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 20356d5a37..0f9acf578f 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -67,7 +67,8 @@ func (s *ImportSchedulerSuite) SetupTest() { }) s.imeta, err = NewImportMeta(s.catalog) s.NoError(err) - s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler) + buildIndexCh := make(chan UniqueID, 1024) + s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta, buildIndexCh).(*importScheduler) } func (s *ImportSchedulerSuite) TestProcessPreImport() { diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 6193a5fe2c..d94350768d 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "fmt" "path" "sort" "time" @@ -349,7 +350,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i if totalSegment != 0 { completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment) } - return importingProgress*0.8 + completedProgress*0.2, importedRows, totalRows + return importingProgress*0.5 + completedProgress*0.5, importedRows, totalRows } func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) { @@ -361,11 +362,11 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalp case 
internalpb.ImportJobState_PreImporting: progress := getPreImportingProgress(jobID, imeta) - return 10 + int64(progress*40), internalpb.ImportJobState_Importing, 0, 0, "" + return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, "" case internalpb.ImportJobState_Importing: progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta) - return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, importedRows, totalRows, "" + return 10 + 30 + int64(progress*60), internalpb.ImportJobState_Importing, importedRows, totalRows, "" case internalpb.ImportJobState_Completed: totalRows := int64(0) @@ -428,9 +429,13 @@ func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error { } func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) { - if len(importFile.GetPaths()) < 1 { + if len(importFile.GetPaths()) == 0 { return nil, merr.WrapErrImportFailed("no insert binlogs to import") } + if len(importFile.GetPaths()) > 2 { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+ + "Valid paths length should be one or two, but got paths:%s", importFile.GetPaths())) + } insertPrefix := importFile.GetPaths()[0] segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false) diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 52af580393..a5c7f266c3 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -322,38 +322,53 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) { deltaPrefix = "mock-delta-binlog-prefix" ) - segmentInsertPaths := []string{ - // segment 435978159261483008 - "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008", - // segment 435978159261483009 - "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009", - } - - segmentDeltaPaths := []string{ - "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008", - "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009", - } - - ctx := context.Background() - cm := mocks2.NewChunkManager(t) - cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) - cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil) - - file := &internalpb.ImportFile{ - Id: 1, - Paths: []string{insertPrefix, deltaPrefix}, - } - - files, err := ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.NoError(t, err) - assert.Equal(t, 2, len(files)) - for _, f := range files { - assert.Equal(t, 2, len(f.GetPaths())) - for _, p := range f.GetPaths() { - segmentID := path.Base(p) - assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009") + t.Run("normal case", func(t *testing.T) { + segmentInsertPaths := []string{ + // segment 435978159261483008 + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008", + // segment 435978159261483009 + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009", } - } + + segmentDeltaPaths := []string{ + "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008", + "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009", + } + + cm := mocks2.NewChunkManager(t) + 
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) + cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil) + + file := &internalpb.ImportFile{ + Id: 1, + Paths: []string{insertPrefix, deltaPrefix}, + } + + files, err := ListBinlogsAndGroupBySegment(context.Background(), cm, file) + assert.NoError(t, err) + assert.Equal(t, 2, len(files)) + for _, f := range files { + assert.Equal(t, 2, len(f.GetPaths())) + for _, p := range f.GetPaths() { + segmentID := path.Base(p) + assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009") + } + } + }) + + t.Run("invalid input", func(t *testing.T) { + file := &internalpb.ImportFile{ + Paths: []string{}, + } + _, err := ListBinlogsAndGroupBySegment(context.Background(), nil, file) + assert.Error(t, err) + t.Logf("%s", err) + + file.Paths = []string{insertPrefix, deltaPrefix, "dummy_prefix"} + _, err = ListBinlogsAndGroupBySegment(context.Background(), nil, file) + assert.Error(t, err) + t.Logf("%s", err) + }) } func TestImportUtil_GetImportProgress(t *testing.T) { @@ -517,7 +532,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(10+40), progress) + assert.Equal(t, int64(10+30), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -525,7 +540,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(10+40+40*0.5), progress) + assert.Equal(t, int64(10+30+30*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -547,7 +562,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress) + assert.Equal(t, int64(float32(10+30+30+30*2/6)), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index c36701f9b1..6519ec5106 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -25,6 +25,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -885,3 +886,22 @@ func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []U return ret } + +func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64) []int64 { + indexes := m.GetIndexesForCollection(collectionID, "") + if len(indexes) == 0 { + // doesn't have index + return nil + } + indexed := make([]int64, 0, len(segmentIDs)) + segIndexStates := m.getSegmentsIndexStates(collectionID, segmentIDs) + for segmentID, states := range segIndexStates { + indexStates := lo.Filter(lo.Values(states), func(state *indexpb.SegmentIndexState, _ int) bool { + return state.GetState() == commonpb.IndexState_Finished + }) + if 
len(indexStates) == len(indexes) { + indexed = append(indexed, segmentID) + } + } + return lo.Without(segmentIDs, indexed...) +} diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 022add231d..f6018c54de 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -1328,3 +1328,19 @@ func TestRemoveSegmentIndex(t *testing.T) { assert.Equal(t, len(m.buildID2SegmentIndex), 0) }) } + +func TestIndexMeta_GetUnindexedSegments(t *testing.T) { + m := createMetaTable(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}) + + // normal case + segmentIDs := make([]int64, 0, 11) + for i := 0; i <= 10; i++ { + segmentIDs = append(segmentIDs, segID+int64(i)) + } + unindexed := m.indexMeta.GetUnindexedSegments(collID, segmentIDs) + assert.Equal(t, 8, len(unindexed)) + + // no index + unindexed = m.indexMeta.GetUnindexedSegments(collID+1, segmentIDs) + assert.Equal(t, 0, len(unindexed)) +} diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 1673e9fefd..5106c8a489 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -375,7 +375,7 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c PartitionID: partitionID, InsertChannel: channelName, NumOfRows: 0, - State: commonpb.SegmentState_Flushed, + State: commonpb.SegmentState_Importing, MaxRowNum: 0, Level: datapb.SegmentLevel_L1, LastExpireTime: math.MaxUint64, diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 6ee0eb1f1b..85dadc4520 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -384,8 +384,8 @@ func (s *Server) initDataCoord() error { if err != nil { return err } - s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) - s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta, s.buildIndexCh) + s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh) + s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta) s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index dce783af52..505ed3ef6b 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1656,17 +1656,14 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter var timeoutTs uint64 = math.MaxUint64 timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions()) if err == nil { + // Specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". 
dur, err := time.ParseDuration(timeoutStr) if err != nil { resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err))) return resp, nil } - ts, err := s.allocator.allocTimestamp(ctx) - if err != nil { - resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc ts failed, err=%w", err))) - return resp, nil - } - timeoutTs = tsoutil.AddPhysicalDurationOnTs(ts, dur) + curTs := tsoutil.GetCurrentTime() + timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) } files := in.GetFiles() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 02570074d9..50b75f0ddb 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -94,7 +94,8 @@ type DataNode struct { syncMgr syncmgr.SyncManager writeBufferManager writebuffer.BufferManager - importManager *importv2.Manager + importTaskMgr importv2.TaskManager + importScheduler importv2.Scheduler clearSignal chan string // vchannel name segmentCache *Cache @@ -290,8 +291,8 @@ func (node *DataNode) Init() error { node.writeBufferManager = writebuffer.NewManager(syncMgr) - node.importManager = importv2.NewManager(node.syncMgr, node.chunkManager) - + node.importTaskMgr = importv2.NewTaskManager() + node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) log.Info("init datanode done", zap.String("Address", node.address)) @@ -386,7 +387,7 @@ func (node *DataNode) Start() error { go node.compactionExecutor.start(node.ctx) - go node.importManager.Start() + go node.importScheduler.Start() if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID, @@ -459,8 +460,8 @@ func (node *DataNode) Stop() error { node.channelCheckpointUpdater.close() } - if node.importManager != nil { - node.importManager.Close() + if node.importScheduler != nil { + node.importScheduler.Close() } node.cancel() diff --git a/internal/datanode/importv2/manager.go b/internal/datanode/importv2/manager.go deleted file mode 100644 index d76fe110cf..0000000000 --- a/internal/datanode/importv2/manager.go +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package importv2 - -import ( - "github.com/milvus-io/milvus/internal/datanode/syncmgr" - "github.com/milvus-io/milvus/internal/storage" -) - -type Manager struct { - TaskManager - Executor -} - -func NewManager(syncMgr syncmgr.SyncManager, cm storage.ChunkManager) *Manager { - tm := NewTaskManager() - e := NewExecutor(tm, syncMgr, cm) - return &Manager{ - TaskManager: tm, - Executor: e, - } -} diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/scheduler.go similarity index 69% rename from internal/datanode/importv2/executor.go rename to internal/datanode/importv2/scheduler.go index 999ffbdfb4..d681344888 100644 --- a/internal/datanode/importv2/executor.go +++ b/internal/datanode/importv2/scheduler.go @@ -33,17 +33,16 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/paramtable" ) -type Executor interface { +type Scheduler interface { Start() Slots() int64 Close() } -type executor struct { +type scheduler struct { manager TaskManager syncMgr syncmgr.SyncManager cm storage.ChunkManager @@ -54,13 +53,12 @@ type executor struct { closeChan chan struct{} } -func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Executor { +func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler { pool := conc.NewPool[any]( - hardware.GetCPUNum()*2, - conc.WithPreAlloc(false), - conc.WithDisablePurge(false), + paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(), + conc.WithPreAlloc(true), ) - return &executor{ + return &scheduler{ manager: manager, syncMgr: syncMgr, cm: cm, @@ -69,8 +67,8 @@ func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.Ch } } -func (e *executor) Start() { - log.Info("start import executor") +func (s *scheduler) Start() { + log.Info("start import scheduler") var ( exeTicker = time.NewTicker(1 * time.Second) logTicker = time.NewTicker(10 * time.Minute) @@ -79,39 +77,46 @@ func (e *executor) Start() { defer logTicker.Stop() for { select { - case <-e.closeChan: - log.Info("import executor exited") + case <-s.closeChan: + log.Info("import scheduler exited") return case <-exeTicker.C: - tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) - wg := &sync.WaitGroup{} + tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) + futures := make(map[int64][]*conc.Future[any]) for _, task := range tasks { - wg.Add(1) - go func(task Task) { - defer wg.Done() - switch task.GetType() { - case PreImportTaskType: - e.PreImport(task) - case ImportTaskType: - e.Import(task) - } - }(task) + switch task.GetType() { + case PreImportTaskType: + fs := s.PreImport(task) + futures[task.GetTaskID()] = fs + tryFreeFutures(futures) + case ImportTaskType: + fs := s.Import(task) + futures[task.GetTaskID()] = fs + tryFreeFutures(futures) + } + } + for taskID, fs := range futures { + err := conc.AwaitAll(fs...) 
+ if err != nil { + return + } + s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed)) + log.Info("preimport/import done", zap.Int64("taskID", taskID)) } - wg.Wait() case <-logTicker.C: - LogStats(e.manager) + LogStats(s.manager) } } } -func (e *executor) Slots() int64 { - tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) +func (s *scheduler) Slots() int64 { + tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks)) } -func (e *executor) Close() { - e.closeOnce.Do(func() { - close(e.closeChan) +func (s *scheduler) Close() { + s.closeOnce.Do(func() { + close(s.closeChan) }) } @@ -126,33 +131,46 @@ func WrapLogFields(task Task, fields ...zap.Field) []zap.Field { return res } -func (e *executor) handleErr(task Task, err error, msg string) { - log.Warn(msg, WrapLogFields(task, zap.Error(err))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) +func tryFreeFutures(futures map[int64][]*conc.Future[any]) { + for k, fs := range futures { + fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool { + if f.Done() { + _, err := f.Await() + return err != nil + } + return true + }) + futures[k] = fs + } } -func (e *executor) PreImport(task Task) { +func (s *scheduler) handleErr(task Task, err error, msg string) { + log.Warn(msg, WrapLogFields(task, zap.Error(err))...) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) +} + +func (s *scheduler) PreImport(task Task) []*conc.Future[any] { bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() log.Info("start to preimport", WrapLogFields(task, zap.Int("bufferSize", bufferSize), zap.Any("schema", task.GetSchema()))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) files := lo.Map(task.(*PreImportTask).GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { return fileStat.GetImportFile() }) fn := func(i int, file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) + reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) if err != nil { - e.handleErr(task, err, "new reader failed") + s.handleErr(task, err, "new reader failed") return err } defer reader.Close() start := time.Now() - err = e.readFileStat(reader, task, i) + err = s.readFileStat(reader, task, i) if err != nil { - e.handleErr(task, err, "preimport failed") + s.handleErr(task, err, "preimport failed") return err } log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), @@ -164,23 +182,16 @@ func (e *executor) PreImport(task Task) { for i, file := range files { i := i file := file - f := e.pool.Submit(func() (any, error) { + f := s.pool.Submit(func() (any, error) { err := fn(i, file) return err, err }) futures = append(futures, f) } - err := conc.AwaitAll(futures...) 
- if err != nil { - return - } - - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) - log.Info("executor preimport done", - WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...) + return futures } -func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { +func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { fileSize, err := reader.Size() if err != nil { return err @@ -225,30 +236,30 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i TotalMemorySize: int64(totalSize), HashedStats: hashedStats, } - e.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) + s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) return nil } -func (e *executor) Import(task Task) { +func (s *scheduler) Import(task Task) []*conc.Future[any] { bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() log.Info("start to import", WrapLogFields(task, zap.Int("bufferSize", bufferSize), zap.Any("schema", task.GetSchema()))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) req := task.(*ImportTask).req fn := func(file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) + reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) if err != nil { - e.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) + s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) return err } defer reader.Close() start := time.Now() - err = e.importFile(reader, task) + err = s.importFile(reader, task) if err != nil { - e.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) + s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) return err } log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), @@ -259,24 +270,18 @@ func (e *executor) Import(task Task) { futures := make([]*conc.Future[any], 0, len(req.GetFiles())) for _, file := range req.GetFiles() { file := file - f := e.pool.Submit(func() (any, error) { + f := s.pool.Submit(func() (any, error) { err := fn(file) return err, err }) futures = append(futures, f) } - err := conc.AwaitAll(futures...) - if err != nil { - return - } - - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) - log.Info("import done", WrapLogFields(task)...) + return futures } -func (e *executor) importFile(reader importutilv2.Reader, task Task) error { +func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error { iTask := task.(*ImportTask) - futures := make([]*conc.Future[error], 0) + syncFutures := make([]*conc.Future[error], 0) syncTasks := make([]syncmgr.Task, 0) for { data, err := reader.Read() @@ -294,14 +299,14 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error { if err != nil { return err } - fs, sts, err := e.Sync(iTask, hashedData) + fs, sts, err := s.Sync(iTask, hashedData) if err != nil { return err } - futures = append(futures, fs...) + syncFutures = append(syncFutures, fs...) syncTasks = append(syncTasks, sts...) } - err := conc.AwaitAll(futures...) + err := conc.AwaitAll(syncFutures...) 
if err != nil { return err } @@ -310,13 +315,13 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error { if err != nil { return err } - e.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) + s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) } return nil } -func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { +func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { log.Info("start to sync import data", WrapLogFields(task)...) futures := make([]*conc.Future[error], 0) syncTasks := make([]syncmgr.Task, 0) @@ -335,7 +340,7 @@ func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future return nil, nil, err } segmentImportedSizes[segmentID] += size - future := e.syncMgr.SyncData(task.GetCtx(), syncTask) + future := s.syncMgr.SyncData(task.GetCtx(), syncTask) futures = append(futures, future) syncTasks = append(syncTasks, syncTask) } diff --git a/internal/datanode/importv2/executor_test.go b/internal/datanode/importv2/scheduler_test.go similarity index 92% rename from internal/datanode/importv2/executor_test.go rename to internal/datanode/importv2/scheduler_test.go index d4739d28a2..10c93fe559 100644 --- a/internal/datanode/importv2/executor_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -65,24 +65,24 @@ type mockReader struct { io.Seeker } -type ExecutorSuite struct { +type SchedulerSuite struct { suite.Suite numRows int schema *schemapb.CollectionSchema - cm storage.ChunkManager - reader *importutilv2.MockReader - syncMgr *syncmgr.MockSyncManager - manager TaskManager - executor *executor + cm storage.ChunkManager + reader *importutilv2.MockReader + syncMgr *syncmgr.MockSyncManager + manager TaskManager + scheduler *scheduler } -func (s *ExecutorSuite) SetupSuite() { +func (s *SchedulerSuite) SetupSuite() { paramtable.Init() } -func (s *ExecutorSuite) SetupTest() { +func (s *SchedulerSuite) SetupTest() { s.numRows = 100 s.schema = &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ @@ -116,7 +116,7 @@ func (s *ExecutorSuite) SetupTest() { s.manager = NewTaskManager() s.syncMgr = syncmgr.NewMockSyncManager(s.T()) - s.executor = NewExecutor(s.manager, s.syncMgr, nil).(*executor) + s.scheduler = NewScheduler(s.manager, s.syncMgr, nil).(*scheduler) } func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { @@ -226,7 +226,7 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount return insertData } -func (s *ExecutorSuite) TestExecutor_Slots() { +func (s *SchedulerSuite) TestScheduler_Slots() { preimportReq := &datapb.PreImportRequest{ JobID: 1, TaskID: 2, @@ -239,11 +239,11 @@ func (s *ExecutorSuite) TestExecutor_Slots() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - slots := s.executor.Slots() + slots := s.scheduler.Slots() s.Equal(paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()-1, slots) } -func (s *ExecutorSuite) TestExecutor_Start_Preimport() { +func (s *SchedulerSuite) TestScheduler_Start_Preimport() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -262,7 +262,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) 
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -276,14 +276,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { +func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -316,7 +316,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -330,14 +330,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Import() { +func (s *SchedulerSuite) TestScheduler_Start_Import() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -355,7 +355,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { @@ -391,14 +391,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() { importTask := NewImportTask(importReq) s.manager.Add(importTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { +func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -416,7 +416,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { @@ -452,14 +452,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { importTask := NewImportTask(importReq) s.manager.Add(importTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer 
s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_ReadFileStat() { +func (s *SchedulerSuite) TestScheduler_ReadFileStat() { importFile := &internalpb.ImportFile{ Paths: []string{"dummy.json"}, } @@ -489,11 +489,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { } preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - err := s.executor.readFileStat(s.reader, preimportTask, 0) + err := s.scheduler.readFileStat(s.reader, preimportTask, 0) s.NoError(err) } -func (s *ExecutorSuite) TestExecutor_ImportFile() { +func (s *SchedulerSuite) TestScheduler_ImportFile() { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { return nil, nil @@ -540,10 +540,10 @@ func (s *ExecutorSuite) TestExecutor_ImportFile() { } importTask := NewImportTask(importReq) s.manager.Add(importTask) - err := s.executor.importFile(s.reader, importTask) + err := s.scheduler.importFile(s.reader, importTask) s.NoError(err) } -func TestExecutor(t *testing.T) { - suite.Run(t, new(ExecutorSuite)) +func TestScheduler(t *testing.T) { + suite.Run(t, new(SchedulerSuite)) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5b880e60ff..13868c946a 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -408,7 +408,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques } task := importv2.NewPreImportTask(req) - node.importManager.Add(task) + node.importTaskMgr.Add(task) log.Info("datanode added preimport task") return merr.Success(), nil @@ -427,7 +427,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( return merr.Status(err), nil } task := importv2.NewImportTask(req) - node.importManager.Add(task) + node.importTaskMgr.Add(task) log.Info("datanode added import task") return merr.Success(), nil @@ -441,7 +441,7 @@ func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreIm return &datapb.QueryPreImportResponse{Status: merr.Status(err)}, nil } status := merr.Success() - task := node.importManager.Get(req.GetTaskID()) + task := node.importTaskMgr.Get(req.GetTaskID()) if task == nil || task.GetType() != importv2.PreImportTaskType { status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.PreImportTaskType)) } @@ -470,12 +470,12 @@ func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRe if req.GetQuerySlot() { return &datapb.QueryImportResponse{ Status: status, - Slots: node.importManager.Slots(), + Slots: node.importScheduler.Slots(), }, nil } // query import - task := node.importManager.Get(req.GetTaskID()) + task := node.importTaskMgr.Get(req.GetTaskID()) if task == nil || task.GetType() != importv2.ImportTaskType { status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.ImportTaskType)) } @@ -498,7 +498,7 @@ func (node *DataNode) DropImport(ctx context.Context, req *datapb.DropImportRequ return merr.Status(err), nil } - node.importManager.Remove(req.GetTaskID()) + node.importTaskMgr.Remove(req.GetTaskID()) log.Info("datanode drop import done") diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 69cc763601..d85cf790d3 100644 --- 
a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -70,9 +70,13 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error { if tsStart != 0 || tsEnd != math.MaxUint64 { r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd)) } - if len(paths) < 1 { + if len(paths) == 0 { return merr.WrapErrImportFailed("no insert binlogs to import") } + if len(paths) > 2 { + return merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+ + "Valid paths length should be one or two, but got paths:%s", paths)) + } insertLogs, err := listInsertLogs(r.ctx, r.cm, paths[0]) if err != nil { return err diff --git a/pkg/util/conc/future.go b/pkg/util/conc/future.go index 94c974317e..fb76a22165 100644 --- a/pkg/util/conc/future.go +++ b/pkg/util/conc/future.go @@ -16,6 +16,8 @@ package conc +import "go.uber.org/atomic" + type future interface { wait() OK() bool @@ -29,11 +31,13 @@ type Future[T any] struct { ch chan struct{} value T err error + done *atomic.Bool } func newFuture[T any]() *Future[T] { return &Future[T]{ - ch: make(chan struct{}), + ch: make(chan struct{}), + done: atomic.NewBool(false), } } @@ -55,6 +59,11 @@ func (future *Future[T]) Value() T { return future.value } +// Done indicates if the fn has finished. +func (future *Future[T]) Done() bool { + return future.done.Load() +} + // False if error occurred, // true otherwise. func (future *Future[T]) OK() bool { @@ -86,6 +95,7 @@ func Go[T any](fn func() (T, error)) *Future[T] { go func() { future.value, future.err = fn() close(future.ch) + future.done.Store(true) }() return future } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d6055caac6..f8d24185ec 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2577,6 +2577,7 @@ type dataCoordConfig struct { ImportCheckIntervalHigh ParamItem `refreshable:"true"` ImportCheckIntervalLow ParamItem `refreshable:"true"` MaxFilesPerImportReq ParamItem `refreshable:"true"` + WaitForIndex ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` } @@ -3122,6 +3123,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.MaxFilesPerImportReq.Init(base.mgr) + p.WaitForIndex = ParamItem{ + Key: "dataCoord.import.waitForIndex", + Version: "2.4.0", + Doc: "Indicates whether the import operation waits for the completion of index building.", + DefaultValue: "true", + PanicIfEmpty: false, + Export: true, + } + p.WaitForIndex.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataCoord.gracefulStopTimeout", Version: "2.3.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 87b3166330..9aad84b749 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -384,6 +384,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second)) assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second)) assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt()) + assert.Equal(t, true, Params.WaitForIndex.GetAsBool()) params.Save("datacoord.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
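
The Future.Done accessor added to pkg/util/conc is what lets the datanode import scheduler prune futures that have already finished (tryFreeFutures) instead of blocking on every submitted task. Below is a minimal, hypothetical sketch of that pattern, built only on the conc APIs visible in this patch (conc.Go, Future.Done, Future.Await, conc.AwaitAll); the toy tasks and sleep durations are invented for illustration and are not part of the change.

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Launch a few tasks; each future finishes on its own schedule.
	futures := make([]*conc.Future[any], 0, 4)
	for i := 0; i < 4; i++ {
		i := i
		futures = append(futures, conc.Go(func() (any, error) {
			time.Sleep(time.Duration(100*i) * time.Millisecond)
			return i, nil
		}))
	}

	// Between scheduling rounds, drop futures that have already finished without
	// error and keep only failed or still-running ones. This is the same pruning
	// idea as tryFreeFutures in the datanode import scheduler.
	time.Sleep(250 * time.Millisecond)
	remaining := make([]*conc.Future[any], 0, len(futures))
	for _, f := range futures {
		if f.Done() {
			if _, err := f.Await(); err == nil {
				continue // completed successfully, safe to stop tracking
			}
		}
		remaining = append(remaining, f)
	}
	fmt.Println("futures still tracked:", len(remaining))

	// Block on whatever is left before reporting the whole task as done.
	if err := conc.AwaitAll(remaining...); err != nil {
		fmt.Println("task failed:", err)
	}
}

Pruning on Done() keeps the tracked future set small across scheduler ticks, while AwaitAll still provides the final completion and error barrier for each task.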