diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0bd08c6200..4ed1660602 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -444,6 +444,7 @@ dataCoord: filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task. taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state. maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request. + waitForIndex: true # Indicates whether the import operation waits for the completion of index building. enableGarbageCollection: true gc: diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 5b33593fa9..a1c86cc560 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -39,13 +39,12 @@ type ImportChecker interface { } type importChecker struct { - meta *meta - broker broker.Broker - cluster Cluster - alloc allocator - sm Manager - imeta ImportMeta - buildIndexCh chan UniqueID + meta *meta + broker broker.Broker + cluster Cluster + alloc allocator + sm Manager + imeta ImportMeta closeOnce sync.Once closeChan chan struct{} @@ -57,17 +56,15 @@ func NewImportChecker(meta *meta, alloc allocator, sm Manager, imeta ImportMeta, - buildIndexCh chan UniqueID, ) ImportChecker { return &importChecker{ - meta: meta, - broker: broker, - cluster: cluster, - alloc: alloc, - sm: sm, - imeta: imeta, - buildIndexCh: buildIndexCh, - closeChan: make(chan struct{}), + meta: meta, + broker: broker, + cluster: cluster, + alloc: alloc, + sm: sm, + imeta: imeta, + closeChan: make(chan struct{}), } } @@ -241,6 +238,8 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { } func (c *importChecker) checkImportingJob(job ImportJob) { + log := log.With(zap.Int64("jobID", job.GetJobID()), + zap.Int64("collectionID", job.GetCollectionID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID())) for _, t := range tasks { if t.GetState() != datapb.ImportTaskStateV2_Completed { @@ -248,40 +247,35 @@ func (c *importChecker) checkImportingJob(job ImportJob) { } } - unfinished := make([]int64, 0) - for _, task := range tasks { - segmentIDs := task.(*importTask).GetSegmentIDs() - for _, segmentID := range segmentIDs { - segment := c.meta.GetSegment(segmentID) - if segment == nil { - log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...) - continue - } - if segment.GetIsImporting() { - unfinished = append(unfinished, segmentID) - } - } - } + segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { + return t.(*importTask).GetSegmentIDs() + }) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - err := c.sm.FlushImportSegments(ctx, job.GetCollectionID(), unfinished) - if err != nil { - log.Warn("flush imported segments failed", zap.Int64("jobID", job.GetJobID()), - zap.Int64("collectionID", job.GetCollectionID()), zap.Int64s("segments", unfinished), zap.Error(err)) + // Verify completion of index building for imported segments. 
+ unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs) + if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 { + log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed)) return } + unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool { + segment := c.meta.GetSegment(segmentID) + if segment == nil { + log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID)) + return false + } + return segment.GetIsImporting() + }) + channels, err := c.meta.GetSegmentsChannels(unfinished) if err != nil { - log.Warn("get segments channels failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("get segments channels failed", zap.Error(err)) return } for _, segmentID := range unfinished { - c.buildIndexCh <- segmentID // accelerate index building channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) if channelCP == nil { - log.Warn("nil channel checkpoint", zap.Int64("jobID", job.GetJobID())) + log.Warn("nil channel checkpoint") return } op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) @@ -289,7 +283,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) { op3 := UpdateIsImporting(segmentID, false) err = c.meta.UpdateSegmentsInfo(op1, op2, op3) if err != nil { - log.Warn("update import segment failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("update import segment failed", zap.Error(err)) return } } @@ -297,8 +291,10 @@ func (c *importChecker) checkImportingJob(job ImportJob) { completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { - log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + log.Warn("failed to update job state to Completed", zap.Error(err)) + return } + log.Info("import job completed") } func (c *importChecker) tryFailingTasks(job ImportJob) { diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 0451cfe68a..8c90918a3a 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -65,9 +65,8 @@ func (s *ImportCheckerSuite) SetupTest() { broker := broker2.NewMockBroker(s.T()) sm := NewMockManager(s.T()) - buildIndexCh := make(chan UniqueID, 1024) - checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta, buildIndexCh).(*importChecker) + checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta).(*importChecker) s.checker = checker job := &importJob{ @@ -178,8 +177,6 @@ func (s *ImportCheckerSuite) TestCheckJob() { s.Equal(true, segment.GetIsImporting()) } } - sm := s.checker.sm.(*MockManager) - sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index f26ded5f48..68086666fd 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + 
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -47,6 +48,8 @@ type importScheduler struct { alloc allocator imeta ImportMeta + buildIndexCh chan UniqueID + closeOnce sync.Once closeChan chan struct{} } @@ -55,13 +58,15 @@ func NewImportScheduler(meta *meta, cluster Cluster, alloc allocator, imeta ImportMeta, + buildIndexCh chan UniqueID, ) ImportScheduler { return &importScheduler{ - meta: meta, - cluster: cluster, - alloc: alloc, - imeta: imeta, - closeChan: make(chan struct{}), + meta: meta, + cluster: cluster, + alloc: alloc, + imeta: imeta, + buildIndexCh: buildIndexCh, + closeChan: make(chan struct{}), } } @@ -159,7 +164,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 { }(nodeID) } wg.Wait() - log.Info("peek slots done", zap.Any("nodeSlots", nodeSlots)) + log.Debug("peek slots done", zap.Any("nodeSlots", nodeSlots)) return nodeSlots } @@ -295,12 +300,17 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) return } - op := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) - err = s.meta.UpdateSegmentsInfo(op) + op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) + op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) + err = s.meta.UpdateSegmentsInfo(op1, op2) if err != nil { log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...) return } + select { + case s.buildIndexCh <- info.GetSegmentID(): // accelerate index building: + default: + } } completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 20356d5a37..0f9acf578f 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -67,7 +67,8 @@ func (s *ImportSchedulerSuite) SetupTest() { }) s.imeta, err = NewImportMeta(s.catalog) s.NoError(err) - s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler) + buildIndexCh := make(chan UniqueID, 1024) + s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta, buildIndexCh).(*importScheduler) } func (s *ImportSchedulerSuite) TestProcessPreImport() { diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 6193a5fe2c..d94350768d 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "fmt" "path" "sort" "time" @@ -349,7 +350,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i if totalSegment != 0 { completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment) } - return importingProgress*0.8 + completedProgress*0.2, importedRows, totalRows + return importingProgress*0.5 + completedProgress*0.5, importedRows, totalRows } func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) { @@ -361,11 +362,11 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalp case 
internalpb.ImportJobState_PreImporting: progress := getPreImportingProgress(jobID, imeta) - return 10 + int64(progress*40), internalpb.ImportJobState_Importing, 0, 0, "" + return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, "" case internalpb.ImportJobState_Importing: progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta) - return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, importedRows, totalRows, "" + return 10 + 30 + int64(progress*60), internalpb.ImportJobState_Importing, importedRows, totalRows, "" case internalpb.ImportJobState_Completed: totalRows := int64(0) @@ -428,9 +429,13 @@ func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error { } func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) { - if len(importFile.GetPaths()) < 1 { + if len(importFile.GetPaths()) == 0 { return nil, merr.WrapErrImportFailed("no insert binlogs to import") } + if len(importFile.GetPaths()) > 2 { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+ + "Valid paths length should be one or two, but got paths:%s", importFile.GetPaths())) + } insertPrefix := importFile.GetPaths()[0] segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false) diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 52af580393..a5c7f266c3 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -322,38 +322,53 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) { deltaPrefix = "mock-delta-binlog-prefix" ) - segmentInsertPaths := []string{ - // segment 435978159261483008 - "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008", - // segment 435978159261483009 - "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009", - } - - segmentDeltaPaths := []string{ - "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008", - "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009", - } - - ctx := context.Background() - cm := mocks2.NewChunkManager(t) - cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) - cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil) - - file := &internalpb.ImportFile{ - Id: 1, - Paths: []string{insertPrefix, deltaPrefix}, - } - - files, err := ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.NoError(t, err) - assert.Equal(t, 2, len(files)) - for _, f := range files { - assert.Equal(t, 2, len(f.GetPaths())) - for _, p := range f.GetPaths() { - segmentID := path.Base(p) - assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009") + t.Run("normal case", func(t *testing.T) { + segmentInsertPaths := []string{ + // segment 435978159261483008 + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008", + // segment 435978159261483009 + "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009", } - } + + segmentDeltaPaths := []string{ + "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008", + "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009", + } + + cm := mocks2.NewChunkManager(t) + 
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) + cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil) + + file := &internalpb.ImportFile{ + Id: 1, + Paths: []string{insertPrefix, deltaPrefix}, + } + + files, err := ListBinlogsAndGroupBySegment(context.Background(), cm, file) + assert.NoError(t, err) + assert.Equal(t, 2, len(files)) + for _, f := range files { + assert.Equal(t, 2, len(f.GetPaths())) + for _, p := range f.GetPaths() { + segmentID := path.Base(p) + assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009") + } + } + }) + + t.Run("invalid input", func(t *testing.T) { + file := &internalpb.ImportFile{ + Paths: []string{}, + } + _, err := ListBinlogsAndGroupBySegment(context.Background(), nil, file) + assert.Error(t, err) + t.Logf("%s", err) + + file.Paths = []string{insertPrefix, deltaPrefix, "dummy_prefix"} + _, err = ListBinlogsAndGroupBySegment(context.Background(), nil, file) + assert.Error(t, err) + t.Logf("%s", err) + }) } func TestImportUtil_GetImportProgress(t *testing.T) { @@ -517,7 +532,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(10+40), progress) + assert.Equal(t, int64(10+30), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -525,7 +540,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(10+40+40*0.5), progress) + assert.Equal(t, int64(10+30+30*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -547,7 +562,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100)) assert.NoError(t, err) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) - assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress) + assert.Equal(t, int64(float32(10+30+30+30*2/6)), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index c36701f9b1..6519ec5106 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -25,6 +25,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -885,3 +886,22 @@ func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []U return ret } + +func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64) []int64 { + indexes := m.GetIndexesForCollection(collectionID, "") + if len(indexes) == 0 { + // doesn't have index + return nil + } + indexed := make([]int64, 0, len(segmentIDs)) + segIndexStates := m.getSegmentsIndexStates(collectionID, segmentIDs) + for segmentID, states := range segIndexStates { + indexStates := lo.Filter(lo.Values(states), func(state *indexpb.SegmentIndexState, _ int) bool { + return state.GetState() == commonpb.IndexState_Finished + }) + if 
len(indexStates) == len(indexes) { + indexed = append(indexed, segmentID) + } + } + return lo.Without(segmentIDs, indexed...) +} diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 022add231d..f6018c54de 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -1328,3 +1328,19 @@ func TestRemoveSegmentIndex(t *testing.T) { assert.Equal(t, len(m.buildID2SegmentIndex), 0) }) } + +func TestIndexMeta_GetUnindexedSegments(t *testing.T) { + m := createMetaTable(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}) + + // normal case + segmentIDs := make([]int64, 0, 11) + for i := 0; i <= 10; i++ { + segmentIDs = append(segmentIDs, segID+int64(i)) + } + unindexed := m.indexMeta.GetUnindexedSegments(collID, segmentIDs) + assert.Equal(t, 8, len(unindexed)) + + // no index + unindexed = m.indexMeta.GetUnindexedSegments(collID+1, segmentIDs) + assert.Equal(t, 0, len(unindexed)) +} diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 1673e9fefd..5106c8a489 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -375,7 +375,7 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c PartitionID: partitionID, InsertChannel: channelName, NumOfRows: 0, - State: commonpb.SegmentState_Flushed, + State: commonpb.SegmentState_Importing, MaxRowNum: 0, Level: datapb.SegmentLevel_L1, LastExpireTime: math.MaxUint64, diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 6ee0eb1f1b..85dadc4520 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -384,8 +384,8 @@ func (s *Server) initDataCoord() error { if err != nil { return err } - s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) - s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta, s.buildIndexCh) + s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh) + s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta) s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index dce783af52..505ed3ef6b 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1656,17 +1656,14 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter var timeoutTs uint64 = math.MaxUint64 timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions()) if err == nil { + // Specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". 
dur, err := time.ParseDuration(timeoutStr) if err != nil { resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err))) return resp, nil } - ts, err := s.allocator.allocTimestamp(ctx) - if err != nil { - resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc ts failed, err=%w", err))) - return resp, nil - } - timeoutTs = tsoutil.AddPhysicalDurationOnTs(ts, dur) + curTs := tsoutil.GetCurrentTime() + timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) } files := in.GetFiles() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 02570074d9..50b75f0ddb 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -94,7 +94,8 @@ type DataNode struct { syncMgr syncmgr.SyncManager writeBufferManager writebuffer.BufferManager - importManager *importv2.Manager + importTaskMgr importv2.TaskManager + importScheduler importv2.Scheduler clearSignal chan string // vchannel name segmentCache *Cache @@ -290,8 +291,8 @@ func (node *DataNode) Init() error { node.writeBufferManager = writebuffer.NewManager(syncMgr) - node.importManager = importv2.NewManager(node.syncMgr, node.chunkManager) - + node.importTaskMgr = importv2.NewTaskManager() + node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) log.Info("init datanode done", zap.String("Address", node.address)) @@ -386,7 +387,7 @@ func (node *DataNode) Start() error { go node.compactionExecutor.start(node.ctx) - go node.importManager.Start() + go node.importScheduler.Start() if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID, @@ -459,8 +460,8 @@ func (node *DataNode) Stop() error { node.channelCheckpointUpdater.close() } - if node.importManager != nil { - node.importManager.Close() + if node.importScheduler != nil { + node.importScheduler.Close() } node.cancel() diff --git a/internal/datanode/importv2/manager.go b/internal/datanode/importv2/manager.go deleted file mode 100644 index d76fe110cf..0000000000 --- a/internal/datanode/importv2/manager.go +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package importv2 - -import ( - "github.com/milvus-io/milvus/internal/datanode/syncmgr" - "github.com/milvus-io/milvus/internal/storage" -) - -type Manager struct { - TaskManager - Executor -} - -func NewManager(syncMgr syncmgr.SyncManager, cm storage.ChunkManager) *Manager { - tm := NewTaskManager() - e := NewExecutor(tm, syncMgr, cm) - return &Manager{ - TaskManager: tm, - Executor: e, - } -} diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/scheduler.go similarity index 69% rename from internal/datanode/importv2/executor.go rename to internal/datanode/importv2/scheduler.go index 999ffbdfb4..d681344888 100644 --- a/internal/datanode/importv2/executor.go +++ b/internal/datanode/importv2/scheduler.go @@ -33,17 +33,16 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/paramtable" ) -type Executor interface { +type Scheduler interface { Start() Slots() int64 Close() } -type executor struct { +type scheduler struct { manager TaskManager syncMgr syncmgr.SyncManager cm storage.ChunkManager @@ -54,13 +53,12 @@ type executor struct { closeChan chan struct{} } -func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Executor { +func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler { pool := conc.NewPool[any]( - hardware.GetCPUNum()*2, - conc.WithPreAlloc(false), - conc.WithDisablePurge(false), + paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(), + conc.WithPreAlloc(true), ) - return &executor{ + return &scheduler{ manager: manager, syncMgr: syncMgr, cm: cm, @@ -69,8 +67,8 @@ func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.Ch } } -func (e *executor) Start() { - log.Info("start import executor") +func (s *scheduler) Start() { + log.Info("start import scheduler") var ( exeTicker = time.NewTicker(1 * time.Second) logTicker = time.NewTicker(10 * time.Minute) @@ -79,39 +77,46 @@ func (e *executor) Start() { defer logTicker.Stop() for { select { - case <-e.closeChan: - log.Info("import executor exited") + case <-s.closeChan: + log.Info("import scheduler exited") return case <-exeTicker.C: - tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) - wg := &sync.WaitGroup{} + tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) + futures := make(map[int64][]*conc.Future[any]) for _, task := range tasks { - wg.Add(1) - go func(task Task) { - defer wg.Done() - switch task.GetType() { - case PreImportTaskType: - e.PreImport(task) - case ImportTaskType: - e.Import(task) - } - }(task) + switch task.GetType() { + case PreImportTaskType: + fs := s.PreImport(task) + futures[task.GetTaskID()] = fs + tryFreeFutures(futures) + case ImportTaskType: + fs := s.Import(task) + futures[task.GetTaskID()] = fs + tryFreeFutures(futures) + } + } + for taskID, fs := range futures { + err := conc.AwaitAll(fs...) 
+ if err != nil { + return + } + s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed)) + log.Info("preimport/import done", zap.Int64("taskID", taskID)) } - wg.Wait() case <-logTicker.C: - LogStats(e.manager) + LogStats(s.manager) } } } -func (e *executor) Slots() int64 { - tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) +func (s *scheduler) Slots() int64 { + tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks)) } -func (e *executor) Close() { - e.closeOnce.Do(func() { - close(e.closeChan) +func (s *scheduler) Close() { + s.closeOnce.Do(func() { + close(s.closeChan) }) } @@ -126,33 +131,46 @@ func WrapLogFields(task Task, fields ...zap.Field) []zap.Field { return res } -func (e *executor) handleErr(task Task, err error, msg string) { - log.Warn(msg, WrapLogFields(task, zap.Error(err))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) +func tryFreeFutures(futures map[int64][]*conc.Future[any]) { + for k, fs := range futures { + fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool { + if f.Done() { + _, err := f.Await() + return err != nil + } + return true + }) + futures[k] = fs + } } -func (e *executor) PreImport(task Task) { +func (s *scheduler) handleErr(task Task, err error, msg string) { + log.Warn(msg, WrapLogFields(task, zap.Error(err))...) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) +} + +func (s *scheduler) PreImport(task Task) []*conc.Future[any] { bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() log.Info("start to preimport", WrapLogFields(task, zap.Int("bufferSize", bufferSize), zap.Any("schema", task.GetSchema()))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) files := lo.Map(task.(*PreImportTask).GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { return fileStat.GetImportFile() }) fn := func(i int, file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) + reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) if err != nil { - e.handleErr(task, err, "new reader failed") + s.handleErr(task, err, "new reader failed") return err } defer reader.Close() start := time.Now() - err = e.readFileStat(reader, task, i) + err = s.readFileStat(reader, task, i) if err != nil { - e.handleErr(task, err, "preimport failed") + s.handleErr(task, err, "preimport failed") return err } log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), @@ -164,23 +182,16 @@ func (e *executor) PreImport(task Task) { for i, file := range files { i := i file := file - f := e.pool.Submit(func() (any, error) { + f := s.pool.Submit(func() (any, error) { err := fn(i, file) return err, err }) futures = append(futures, f) } - err := conc.AwaitAll(futures...) 
- if err != nil { - return - } - - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) - log.Info("executor preimport done", - WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...) + return futures } -func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { +func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { fileSize, err := reader.Size() if err != nil { return err @@ -225,30 +236,30 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i TotalMemorySize: int64(totalSize), HashedStats: hashedStats, } - e.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) + s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) return nil } -func (e *executor) Import(task Task) { +func (s *scheduler) Import(task Task) []*conc.Future[any] { bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() log.Info("start to import", WrapLogFields(task, zap.Int("bufferSize", bufferSize), zap.Any("schema", task.GetSchema()))...) - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) req := task.(*ImportTask).req fn := func(file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) + reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) if err != nil { - e.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) + s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) return err } defer reader.Close() start := time.Now() - err = e.importFile(reader, task) + err = s.importFile(reader, task) if err != nil { - e.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) + s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) return err } log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), @@ -259,24 +270,18 @@ func (e *executor) Import(task Task) { futures := make([]*conc.Future[any], 0, len(req.GetFiles())) for _, file := range req.GetFiles() { file := file - f := e.pool.Submit(func() (any, error) { + f := s.pool.Submit(func() (any, error) { err := fn(file) return err, err }) futures = append(futures, f) } - err := conc.AwaitAll(futures...) - if err != nil { - return - } - - e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) - log.Info("import done", WrapLogFields(task)...) + return futures } -func (e *executor) importFile(reader importutilv2.Reader, task Task) error { +func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error { iTask := task.(*ImportTask) - futures := make([]*conc.Future[error], 0) + syncFutures := make([]*conc.Future[error], 0) syncTasks := make([]syncmgr.Task, 0) for { data, err := reader.Read() @@ -294,14 +299,14 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error { if err != nil { return err } - fs, sts, err := e.Sync(iTask, hashedData) + fs, sts, err := s.Sync(iTask, hashedData) if err != nil { return err } - futures = append(futures, fs...) + syncFutures = append(syncFutures, fs...) syncTasks = append(syncTasks, sts...) } - err := conc.AwaitAll(futures...) + err := conc.AwaitAll(syncFutures...) 
if err != nil { return err } @@ -310,13 +315,13 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error { if err != nil { return err } - e.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) + s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) } return nil } -func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { +func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { log.Info("start to sync import data", WrapLogFields(task)...) futures := make([]*conc.Future[error], 0) syncTasks := make([]syncmgr.Task, 0) @@ -335,7 +340,7 @@ func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future return nil, nil, err } segmentImportedSizes[segmentID] += size - future := e.syncMgr.SyncData(task.GetCtx(), syncTask) + future := s.syncMgr.SyncData(task.GetCtx(), syncTask) futures = append(futures, future) syncTasks = append(syncTasks, syncTask) } diff --git a/internal/datanode/importv2/executor_test.go b/internal/datanode/importv2/scheduler_test.go similarity index 92% rename from internal/datanode/importv2/executor_test.go rename to internal/datanode/importv2/scheduler_test.go index d4739d28a2..10c93fe559 100644 --- a/internal/datanode/importv2/executor_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -65,24 +65,24 @@ type mockReader struct { io.Seeker } -type ExecutorSuite struct { +type SchedulerSuite struct { suite.Suite numRows int schema *schemapb.CollectionSchema - cm storage.ChunkManager - reader *importutilv2.MockReader - syncMgr *syncmgr.MockSyncManager - manager TaskManager - executor *executor + cm storage.ChunkManager + reader *importutilv2.MockReader + syncMgr *syncmgr.MockSyncManager + manager TaskManager + scheduler *scheduler } -func (s *ExecutorSuite) SetupSuite() { +func (s *SchedulerSuite) SetupSuite() { paramtable.Init() } -func (s *ExecutorSuite) SetupTest() { +func (s *SchedulerSuite) SetupTest() { s.numRows = 100 s.schema = &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ @@ -116,7 +116,7 @@ func (s *ExecutorSuite) SetupTest() { s.manager = NewTaskManager() s.syncMgr = syncmgr.NewMockSyncManager(s.T()) - s.executor = NewExecutor(s.manager, s.syncMgr, nil).(*executor) + s.scheduler = NewScheduler(s.manager, s.syncMgr, nil).(*scheduler) } func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { @@ -226,7 +226,7 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount return insertData } -func (s *ExecutorSuite) TestExecutor_Slots() { +func (s *SchedulerSuite) TestScheduler_Slots() { preimportReq := &datapb.PreImportRequest{ JobID: 1, TaskID: 2, @@ -239,11 +239,11 @@ func (s *ExecutorSuite) TestExecutor_Slots() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - slots := s.executor.Slots() + slots := s.scheduler.Slots() s.Equal(paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()-1, slots) } -func (s *ExecutorSuite) TestExecutor_Start_Preimport() { +func (s *SchedulerSuite) TestScheduler_Start_Preimport() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -262,7 +262,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) 
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -276,14 +276,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { +func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -316,7 +316,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -330,14 +330,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Import() { +func (s *SchedulerSuite) TestScheduler_Start_Import() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -355,7 +355,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { @@ -391,14 +391,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() { importTask := NewImportTask(importReq) s.manager.Add(importTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { +func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { content := &sampleContent{ Rows: make([]sampleRow, 0), } @@ -416,7 +416,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.executor.cm = cm + s.scheduler.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { @@ -452,14 +452,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { importTask := NewImportTask(importReq) s.manager.Add(importTask) - go s.executor.Start() - defer s.executor.Close() + go s.scheduler.Start() + defer 
s.scheduler.Close() s.Eventually(func() bool { return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed }, 10*time.Second, 100*time.Millisecond) } -func (s *ExecutorSuite) TestExecutor_ReadFileStat() { +func (s *SchedulerSuite) TestScheduler_ReadFileStat() { importFile := &internalpb.ImportFile{ Paths: []string{"dummy.json"}, } @@ -489,11 +489,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { } preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - err := s.executor.readFileStat(s.reader, preimportTask, 0) + err := s.scheduler.readFileStat(s.reader, preimportTask, 0) s.NoError(err) } -func (s *ExecutorSuite) TestExecutor_ImportFile() { +func (s *SchedulerSuite) TestScheduler_ImportFile() { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { future := conc.Go(func() (error, error) { return nil, nil @@ -540,10 +540,10 @@ func (s *ExecutorSuite) TestExecutor_ImportFile() { } importTask := NewImportTask(importReq) s.manager.Add(importTask) - err := s.executor.importFile(s.reader, importTask) + err := s.scheduler.importFile(s.reader, importTask) s.NoError(err) } -func TestExecutor(t *testing.T) { - suite.Run(t, new(ExecutorSuite)) +func TestScheduler(t *testing.T) { + suite.Run(t, new(SchedulerSuite)) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5b880e60ff..13868c946a 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -408,7 +408,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques } task := importv2.NewPreImportTask(req) - node.importManager.Add(task) + node.importTaskMgr.Add(task) log.Info("datanode added preimport task") return merr.Success(), nil @@ -427,7 +427,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( return merr.Status(err), nil } task := importv2.NewImportTask(req) - node.importManager.Add(task) + node.importTaskMgr.Add(task) log.Info("datanode added import task") return merr.Success(), nil @@ -441,7 +441,7 @@ func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreIm return &datapb.QueryPreImportResponse{Status: merr.Status(err)}, nil } status := merr.Success() - task := node.importManager.Get(req.GetTaskID()) + task := node.importTaskMgr.Get(req.GetTaskID()) if task == nil || task.GetType() != importv2.PreImportTaskType { status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.PreImportTaskType)) } @@ -470,12 +470,12 @@ func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRe if req.GetQuerySlot() { return &datapb.QueryImportResponse{ Status: status, - Slots: node.importManager.Slots(), + Slots: node.importScheduler.Slots(), }, nil } // query import - task := node.importManager.Get(req.GetTaskID()) + task := node.importTaskMgr.Get(req.GetTaskID()) if task == nil || task.GetType() != importv2.ImportTaskType { status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.ImportTaskType)) } @@ -498,7 +498,7 @@ func (node *DataNode) DropImport(ctx context.Context, req *datapb.DropImportRequ return merr.Status(err), nil } - node.importManager.Remove(req.GetTaskID()) + node.importTaskMgr.Remove(req.GetTaskID()) log.Info("datanode drop import done") diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 69cc763601..d85cf790d3 100644 --- 
a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -70,9 +70,13 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error { if tsStart != 0 || tsEnd != math.MaxUint64 { r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd)) } - if len(paths) < 1 { + if len(paths) == 0 { return merr.WrapErrImportFailed("no insert binlogs to import") } + if len(paths) > 2 { + return merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+ + "Valid paths length should be one or two, but got paths:%s", paths)) + } insertLogs, err := listInsertLogs(r.ctx, r.cm, paths[0]) if err != nil { return err diff --git a/pkg/util/conc/future.go b/pkg/util/conc/future.go index 94c974317e..fb76a22165 100644 --- a/pkg/util/conc/future.go +++ b/pkg/util/conc/future.go @@ -16,6 +16,8 @@ package conc +import "go.uber.org/atomic" + type future interface { wait() OK() bool @@ -29,11 +31,13 @@ type Future[T any] struct { ch chan struct{} value T err error + done *atomic.Bool } func newFuture[T any]() *Future[T] { return &Future[T]{ - ch: make(chan struct{}), + ch: make(chan struct{}), + done: atomic.NewBool(false), } } @@ -55,6 +59,11 @@ func (future *Future[T]) Value() T { return future.value } +// Done indicates if the fn has finished. +func (future *Future[T]) Done() bool { + return future.done.Load() +} + // False if error occurred, // true otherwise. func (future *Future[T]) OK() bool { @@ -86,6 +95,7 @@ func Go[T any](fn func() (T, error)) *Future[T] { go func() { future.value, future.err = fn() close(future.ch) + future.done.Store(true) }() return future } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d6055caac6..f8d24185ec 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2577,6 +2577,7 @@ type dataCoordConfig struct { ImportCheckIntervalHigh ParamItem `refreshable:"true"` ImportCheckIntervalLow ParamItem `refreshable:"true"` MaxFilesPerImportReq ParamItem `refreshable:"true"` + WaitForIndex ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` } @@ -3122,6 +3123,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.MaxFilesPerImportReq.Init(base.mgr) + p.WaitForIndex = ParamItem{ + Key: "dataCoord.import.waitForIndex", + Version: "2.4.0", + Doc: "Indicates whether the import operation waits for the completion of index building.", + DefaultValue: "true", + PanicIfEmpty: false, + Export: true, + } + p.WaitForIndex.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataCoord.gracefulStopTimeout", Version: "2.3.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 87b3166330..9aad84b749 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -384,6 +384,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second)) assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second)) assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt()) + assert.Equal(t, true, Params.WaitForIndex.GetAsBool()) params.Save("datacoord.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
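
A minimal, standalone Go sketch (standard library only) of the two concurrency patterns this patch leans on: the completion flag behind conc.Future's new Done() accessor, which tryFreeFutures polls to drop finished futures without blocking, and the non-blocking send on buildIndexCh added to processInProgressImport so a full channel never stalls the import loop. The names future, goFuture and notifyIndexBuilder below are illustrative stand-ins, not APIs from pkg/util/conc or datacoord, and the sketch sets the flag before closing the channel purely for a clean ordering in this example.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// future mimics the idea of a result handle with a pollable completion flag.
type future struct {
	ch   chan struct{}
	err  error
	done atomic.Bool
}

// goFuture runs fn asynchronously and returns a handle to its completion.
func goFuture(fn func() error) *future {
	f := &future{ch: make(chan struct{})}
	go func() {
		f.err = fn()
		f.done.Store(true) // mark completion before unblocking waiters
		close(f.ch)
	}()
	return f
}

// Done reports whether fn has finished, without blocking.
func (f *future) Done() bool { return f.done.Load() }

// Await blocks until fn has finished and returns its error.
func (f *future) Await() error {
	<-f.ch
	return f.err
}

// notifyIndexBuilder performs a best-effort, non-blocking handoff of a
// flushed segment ID to the index builder; if the channel is full the hint
// is dropped and index building is picked up by the regular path later.
func notifyIndexBuilder(buildIndexCh chan int64, segmentID int64) {
	select {
	case buildIndexCh <- segmentID:
	default: // channel full; do not block the caller
	}
}

func main() {
	f := goFuture(func() error {
		time.Sleep(50 * time.Millisecond)
		return nil
	})
	for !f.Done() { // poll without blocking, as tryFreeFutures does
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("task finished, err =", f.Await())

	buildIndexCh := make(chan int64, 1)
	notifyIndexBuilder(buildIndexCh, 100)
	notifyIndexBuilder(buildIndexCh, 101) // dropped: channel already full
	fmt.Println("segments queued for index building:", len(buildIndexCh))
}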