enhance: Ensure ImportV2 waits for the index to be built and refine some logic (#31629)

Feature Introduced:
1. Ensure ImportV2 waits for the index to be built (see the sketch below).

Enhancements Introduced:
1. Utilization of local time for the timeout ts instead of allocating a ts from rootcoord.
2. Enhanced input file length check for binlog import.
3. Removal of the duplicated manager in datanode.
4. Renaming of the executor to scheduler in datanode.
5. Utilization of a thread pool in the scheduler in datanode.

issue: https://github.com/milvus-io/milvus/issues/28521
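The headline change is visible in checkImportingJob below: an import job is only marked Completed once every imported segment has a finished index, provided dataCoord.import.waitForIndex is enabled. A minimal standalone sketch of that gate, with a map-backed indexView standing in for the real datacoord index metadata (names here are illustrative, not the actual types):

```go
package main

import "fmt"

// indexView is a hypothetical stand-in for datacoord's index metadata: it
// reports which of the given segments do not yet have a finished index.
type indexView struct {
	indexed map[int64]bool
}

func (v indexView) GetUnindexedSegments(segmentIDs []int64) []int64 {
	var unindexed []int64
	for _, id := range segmentIDs {
		if !v.indexed[id] {
			unindexed = append(unindexed, id)
		}
	}
	return unindexed
}

// checkImportingJob mirrors the new gate: when waitForIndex is enabled and any
// imported segment is still unindexed, the checker returns without completing
// the job and simply retries on its next periodic tick.
func checkImportingJob(jobID int64, segmentIDs []int64, waitForIndex bool, view indexView) bool {
	if unindexed := view.GetUnindexedSegments(segmentIDs); waitForIndex && len(unindexed) > 0 {
		fmt.Printf("job %d: waiting for %d segment(s) to finish index building\n", jobID, len(unindexed))
		return false
	}
	fmt.Printf("job %d: all imported segments indexed, marking job Completed\n", jobID)
	return true
}

func main() {
	view := indexView{indexed: map[int64]bool{100: true, 101: false}}
	checkImportingJob(1, []int64{100, 101}, true, view) // waits: segment 101 is unindexed
	view.indexed[101] = true
	checkImportingJob(1, []int64{100, 101}, true, view) // completes
}
```

Because the checker simply returns while segments are still unindexed, the job is re-evaluated on the next checker tick and no extra synchronization is needed.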

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-04-01 20:09:13 +08:00 committed by GitHub
parent 39337e09b8
commit 4e264003bf
21 changed files with 311 additions and 257 deletions

View File

@ -444,6 +444,7 @@ dataCoord:
filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task. filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state. taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request. maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
enableGarbageCollection: true enableGarbageCollection: true
gc: gc:

View File

@ -39,13 +39,12 @@ type ImportChecker interface {
} }
type importChecker struct { type importChecker struct {
meta *meta meta *meta
broker broker.Broker broker broker.Broker
cluster Cluster cluster Cluster
alloc allocator alloc allocator
sm Manager sm Manager
imeta ImportMeta imeta ImportMeta
buildIndexCh chan UniqueID
closeOnce sync.Once closeOnce sync.Once
closeChan chan struct{} closeChan chan struct{}
@ -57,17 +56,15 @@ func NewImportChecker(meta *meta,
alloc allocator, alloc allocator,
sm Manager, sm Manager,
imeta ImportMeta, imeta ImportMeta,
buildIndexCh chan UniqueID,
) ImportChecker { ) ImportChecker {
return &importChecker{ return &importChecker{
meta: meta, meta: meta,
broker: broker, broker: broker,
cluster: cluster, cluster: cluster,
alloc: alloc, alloc: alloc,
sm: sm, sm: sm,
imeta: imeta, imeta: imeta,
buildIndexCh: buildIndexCh, closeChan: make(chan struct{}),
closeChan: make(chan struct{}),
} }
} }
@ -241,6 +238,8 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
} }
func (c *importChecker) checkImportingJob(job ImportJob) { func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()),
zap.Int64("collectionID", job.GetCollectionID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, t := range tasks { for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed { if t.GetState() != datapb.ImportTaskStateV2_Completed {
@ -248,40 +247,35 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
} }
} }
unfinished := make([]int64, 0) segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
for _, task := range tasks { return t.(*importTask).GetSegmentIDs()
segmentIDs := task.(*importTask).GetSegmentIDs() })
for _, segmentID := range segmentIDs {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...)
continue
}
if segment.GetIsImporting() {
unfinished = append(unfinished, segmentID)
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Verify completion of index building for imported segments.
defer cancel() unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs)
err := c.sm.FlushImportSegments(ctx, job.GetCollectionID(), unfinished) if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 {
if err != nil { log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
log.Warn("flush imported segments failed", zap.Int64("jobID", job.GetJobID()),
zap.Int64("collectionID", job.GetCollectionID()), zap.Int64s("segments", unfinished), zap.Error(err))
return return
} }
unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID))
return false
}
return segment.GetIsImporting()
})
channels, err := c.meta.GetSegmentsChannels(unfinished) channels, err := c.meta.GetSegmentsChannels(unfinished)
if err != nil { if err != nil {
log.Warn("get segments channels failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) log.Warn("get segments channels failed", zap.Error(err))
return return
} }
for _, segmentID := range unfinished { for _, segmentID := range unfinished {
c.buildIndexCh <- segmentID // accelerate index building
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil { if channelCP == nil {
log.Warn("nil channel checkpoint", zap.Int64("jobID", job.GetJobID())) log.Warn("nil channel checkpoint")
return return
} }
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
@ -289,7 +283,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
op3 := UpdateIsImporting(segmentID, false) op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(op1, op2, op3) err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
if err != nil { if err != nil {
log.Warn("update import segment failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) log.Warn("update import segment failed", zap.Error(err))
return return
} }
} }
@ -297,8 +291,10 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil { if err != nil {
log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) log.Warn("failed to update job state to Completed", zap.Error(err))
return
} }
log.Info("import job completed")
} }
func (c *importChecker) tryFailingTasks(job ImportJob) { func (c *importChecker) tryFailingTasks(job ImportJob) {

View File

@ -65,9 +65,8 @@ func (s *ImportCheckerSuite) SetupTest() {
broker := broker2.NewMockBroker(s.T()) broker := broker2.NewMockBroker(s.T())
sm := NewMockManager(s.T()) sm := NewMockManager(s.T())
buildIndexCh := make(chan UniqueID, 1024)
checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta, buildIndexCh).(*importChecker) checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta).(*importChecker)
s.checker = checker s.checker = checker
job := &importJob{ job := &importJob{
@ -178,8 +177,6 @@ func (s *ImportCheckerSuite) TestCheckJob() {
s.Equal(true, segment.GetIsImporting()) s.Equal(true, segment.GetIsImporting())
} }
} }
sm := s.checker.sm.(*MockManager)
sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil)

View File

@ -25,6 +25,7 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
@ -47,6 +48,8 @@ type importScheduler struct {
alloc allocator alloc allocator
imeta ImportMeta imeta ImportMeta
buildIndexCh chan UniqueID
closeOnce sync.Once closeOnce sync.Once
closeChan chan struct{} closeChan chan struct{}
} }
@ -55,13 +58,15 @@ func NewImportScheduler(meta *meta,
cluster Cluster, cluster Cluster,
alloc allocator, alloc allocator,
imeta ImportMeta, imeta ImportMeta,
buildIndexCh chan UniqueID,
) ImportScheduler { ) ImportScheduler {
return &importScheduler{ return &importScheduler{
meta: meta, meta: meta,
cluster: cluster, cluster: cluster,
alloc: alloc, alloc: alloc,
imeta: imeta, imeta: imeta,
closeChan: make(chan struct{}), buildIndexCh: buildIndexCh,
closeChan: make(chan struct{}),
} }
} }
@ -159,7 +164,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
}(nodeID) }(nodeID)
} }
wg.Wait() wg.Wait()
log.Info("peek slots done", zap.Any("nodeSlots", nodeSlots)) log.Debug("peek slots done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots return nodeSlots
} }
@ -295,12 +300,17 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
return return
} }
op := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil)
err = s.meta.UpdateSegmentsInfo(op) op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = s.meta.UpdateSegmentsInfo(op1, op2)
if err != nil { if err != nil {
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...) log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
return return
} }
select {
case s.buildIndexCh <- info.GetSegmentID(): // accelerate index building:
default:
}
} }
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))

View File

@ -67,7 +67,8 @@ func (s *ImportSchedulerSuite) SetupTest() {
}) })
s.imeta, err = NewImportMeta(s.catalog) s.imeta, err = NewImportMeta(s.catalog)
s.NoError(err) s.NoError(err)
s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler) buildIndexCh := make(chan UniqueID, 1024)
s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta, buildIndexCh).(*importScheduler)
} }
func (s *ImportSchedulerSuite) TestProcessPreImport() { func (s *ImportSchedulerSuite) TestProcessPreImport() {

View File

@ -18,6 +18,7 @@ package datacoord
import ( import (
"context" "context"
"fmt"
"path" "path"
"sort" "sort"
"time" "time"
@ -349,7 +350,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i
if totalSegment != 0 { if totalSegment != 0 {
completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment) completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment)
} }
return importingProgress*0.8 + completedProgress*0.2, importedRows, totalRows return importingProgress*0.5 + completedProgress*0.5, importedRows, totalRows
} }
func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) { func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) {
@ -361,11 +362,11 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalp
case internalpb.ImportJobState_PreImporting: case internalpb.ImportJobState_PreImporting:
progress := getPreImportingProgress(jobID, imeta) progress := getPreImportingProgress(jobID, imeta)
return 10 + int64(progress*40), internalpb.ImportJobState_Importing, 0, 0, "" return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, ""
case internalpb.ImportJobState_Importing: case internalpb.ImportJobState_Importing:
progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta) progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta)
return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, importedRows, totalRows, "" return 10 + 30 + int64(progress*60), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
case internalpb.ImportJobState_Completed: case internalpb.ImportJobState_Completed:
totalRows := int64(0) totalRows := int64(0)
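With the job now also waiting for index building, the phase weights shift from 10/40/50 to 10/30/60, and the import phase itself is split 50/50 (previously 80/20) between row ingestion and segment finalization. A small sketch of the resulting arithmetic, covering only the two states touched in this hunk:

```go
package main

import "fmt"

// getJobProgress mirrors the new weighting: 10 for the pending phase, 30 for
// pre-import, and 60 for import, where the import share is split evenly
// between imported rows and finalized segments. Other job states are not
// modeled in this sketch.
func getJobProgress(state string, preImportRatio, importedRowsRatio, finalizedSegRatio float32) int64 {
	switch state {
	case "PreImporting":
		return 10 + int64(preImportRatio*30)
	case "Importing":
		importing := importedRowsRatio*0.5 + finalizedSegRatio*0.5
		return 10 + 30 + int64(importing*60)
	default:
		return 0
	}
}

func main() {
	fmt.Println(getJobProgress("PreImporting", 1.0, 0, 0))   // 40
	fmt.Println(getJobProgress("Importing", 1.0, 0.5, 0))    // 55
	fmt.Println(getJobProgress("Importing", 1.0, 1.0, 1.0))  // 100
}
```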
@ -428,9 +429,13 @@ func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error {
} }
func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) { func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) {
if len(importFile.GetPaths()) < 1 { if len(importFile.GetPaths()) == 0 {
return nil, merr.WrapErrImportFailed("no insert binlogs to import") return nil, merr.WrapErrImportFailed("no insert binlogs to import")
} }
if len(importFile.GetPaths()) > 2 {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+
"Valid paths length should be one or two, but got paths:%s", importFile.GetPaths()))
}
insertPrefix := importFile.GetPaths()[0] insertPrefix := importFile.GetPaths()[0]
segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false) segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false)
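The new length check bounds a binlog import to one insert-log prefix plus an optional delta-log prefix. A minimal sketch of the same validation, detached from the ChunkManager listing that follows it in the real function:

```go
package main

import (
	"errors"
	"fmt"
)

// validateBinlogImportPaths enforces the new length check: a binlog import
// takes exactly one insert-log prefix and, optionally, one delta-log prefix.
func validateBinlogImportPaths(paths []string) error {
	if len(paths) == 0 {
		return errors.New("no insert binlogs to import")
	}
	if len(paths) > 2 {
		return fmt.Errorf("too many input paths for binlog import, "+
			"valid length is one or two, but got %d: %v", len(paths), paths)
	}
	return nil
}

func main() {
	insertPrefix := "backup/bak1/data/insert_log"
	deltaPrefix := "backup/bak1/data/delta_log"
	fmt.Println(validateBinlogImportPaths([]string{insertPrefix}))                              // <nil>
	fmt.Println(validateBinlogImportPaths([]string{insertPrefix, deltaPrefix}))                 // <nil>
	fmt.Println(validateBinlogImportPaths([]string{insertPrefix, deltaPrefix, "dummy_prefix"})) // error
}
```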

View File

@ -322,38 +322,53 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
deltaPrefix = "mock-delta-binlog-prefix" deltaPrefix = "mock-delta-binlog-prefix"
) )
segmentInsertPaths := []string{ t.Run("normal case", func(t *testing.T) {
// segment 435978159261483008 segmentInsertPaths := []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008", // segment 435978159261483008
// segment 435978159261483009 "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009", // segment 435978159261483009
} "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009",
segmentDeltaPaths := []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008",
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009",
}
ctx := context.Background()
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
file := &internalpb.ImportFile{
Id: 1,
Paths: []string{insertPrefix, deltaPrefix},
}
files, err := ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.NoError(t, err)
assert.Equal(t, 2, len(files))
for _, f := range files {
assert.Equal(t, 2, len(f.GetPaths()))
for _, p := range f.GetPaths() {
segmentID := path.Base(p)
assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
} }
}
segmentDeltaPaths := []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008",
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009",
}
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
file := &internalpb.ImportFile{
Id: 1,
Paths: []string{insertPrefix, deltaPrefix},
}
files, err := ListBinlogsAndGroupBySegment(context.Background(), cm, file)
assert.NoError(t, err)
assert.Equal(t, 2, len(files))
for _, f := range files {
assert.Equal(t, 2, len(f.GetPaths()))
for _, p := range f.GetPaths() {
segmentID := path.Base(p)
assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
}
}
})
t.Run("invalid input", func(t *testing.T) {
file := &internalpb.ImportFile{
Paths: []string{},
}
_, err := ListBinlogsAndGroupBySegment(context.Background(), nil, file)
assert.Error(t, err)
t.Logf("%s", err)
file.Paths = []string{insertPrefix, deltaPrefix, "dummy_prefix"}
_, err = ListBinlogsAndGroupBySegment(context.Background(), nil, file)
assert.Error(t, err)
t.Logf("%s", err)
})
} }
func TestImportUtil_GetImportProgress(t *testing.T) { func TestImportUtil_GetImportProgress(t *testing.T) {
@ -517,7 +532,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
assert.NoError(t, err) assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40), progress) assert.Equal(t, int64(10+30), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason) assert.Equal(t, "", reason)
@ -525,7 +540,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
assert.NoError(t, err) assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40+40*0.5), progress) assert.Equal(t, int64(10+30+30*0.5), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason) assert.Equal(t, "", reason)
@ -547,7 +562,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100)) err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100))
assert.NoError(t, err) assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress) assert.Equal(t, int64(float32(10+30+30+30*2/6)), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason) assert.Equal(t, "", reason)

View File

@ -25,6 +25,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -885,3 +886,22 @@ func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []U
return ret return ret
} }
func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64) []int64 {
indexes := m.GetIndexesForCollection(collectionID, "")
if len(indexes) == 0 {
// doesn't have index
return nil
}
indexed := make([]int64, 0, len(segmentIDs))
segIndexStates := m.getSegmentsIndexStates(collectionID, segmentIDs)
for segmentID, states := range segIndexStates {
indexStates := lo.Filter(lo.Values(states), func(state *indexpb.SegmentIndexState, _ int) bool {
return state.GetState() == commonpb.IndexState_Finished
})
if len(indexStates) == len(indexes) {
indexed = append(indexed, segmentID)
}
}
return lo.Without(segmentIDs, indexed...)
}
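GetUnindexedSegments counts a segment as indexed only when every index declared on the collection has reached the Finished state, and returns everything else. A self-contained sketch of that counting logic, with plain maps standing in for the segment index-state metadata:

```go
package main

import "fmt"

type indexState int

const (
	indexStateInProgress indexState = iota
	indexStateFinished
)

// getUnindexedSegments returns the segments for which not every declared index
// has finished building. If the collection has no indexes at all, nothing is
// reported as unindexed, matching the early return in the real method.
func getUnindexedSegments(indexCount int, segStates map[int64][]indexState, segmentIDs []int64) []int64 {
	if indexCount == 0 {
		return nil
	}
	var unindexed []int64
	for _, segID := range segmentIDs {
		finished := 0
		for _, st := range segStates[segID] {
			if st == indexStateFinished {
				finished++
			}
		}
		if finished != indexCount {
			unindexed = append(unindexed, segID)
		}
	}
	return unindexed
}

func main() {
	states := map[int64][]indexState{
		100: {indexStateFinished, indexStateFinished},   // both indexes built
		101: {indexStateFinished, indexStateInProgress}, // one index still building
	}
	fmt.Println(getUnindexedSegments(2, states, []int64{100, 101, 102})) // [101 102]
}
```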

View File

@ -1328,3 +1328,19 @@ func TestRemoveSegmentIndex(t *testing.T) {
assert.Equal(t, len(m.buildID2SegmentIndex), 0) assert.Equal(t, len(m.buildID2SegmentIndex), 0)
}) })
} }
func TestIndexMeta_GetUnindexedSegments(t *testing.T) {
m := createMetaTable(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)})
// normal case
segmentIDs := make([]int64, 0, 11)
for i := 0; i <= 10; i++ {
segmentIDs = append(segmentIDs, segID+int64(i))
}
unindexed := m.indexMeta.GetUnindexedSegments(collID, segmentIDs)
assert.Equal(t, 8, len(unindexed))
// no index
unindexed = m.indexMeta.GetUnindexedSegments(collID+1, segmentIDs)
assert.Equal(t, 0, len(unindexed))
}

View File

@ -375,7 +375,7 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c
PartitionID: partitionID, PartitionID: partitionID,
InsertChannel: channelName, InsertChannel: channelName,
NumOfRows: 0, NumOfRows: 0,
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Importing,
MaxRowNum: 0, MaxRowNum: 0,
Level: datapb.SegmentLevel_L1, Level: datapb.SegmentLevel_L1,
LastExpireTime: math.MaxUint64, LastExpireTime: math.MaxUint64,

View File

@ -384,8 +384,8 @@ func (s *Server) initDataCoord() error {
if err != nil { if err != nil {
return err return err
} }
s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh)
s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta, s.buildIndexCh) s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta)
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

View File

@ -1656,17 +1656,14 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
var timeoutTs uint64 = math.MaxUint64 var timeoutTs uint64 = math.MaxUint64
timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions()) timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions())
if err == nil { if err == nil {
// Specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m".
dur, err := time.ParseDuration(timeoutStr) dur, err := time.ParseDuration(timeoutStr)
if err != nil { if err != nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err))) resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err)))
return resp, nil return resp, nil
} }
ts, err := s.allocator.allocTimestamp(ctx) curTs := tsoutil.GetCurrentTime()
if err != nil { timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur)
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc ts failed, err=%w", err)))
return resp, nil
}
timeoutTs = tsoutil.AddPhysicalDurationOnTs(ts, dur)
} }
files := in.GetFiles() files := in.GetFiles()
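Deriving the timeout from the local clock removes the rootcoord round trip the old code needed just to obtain a TSO. A standalone sketch of the same computation, under the assumption that the hybrid timestamp packs physical milliseconds above an 18-bit logical counter (the layout tsoutil appears to use; treat the constant as illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// Assumed hybrid-timestamp layout: physical milliseconds in the high bits,
// an 18-bit logical counter in the low bits.
const logicalBits = 18

// composeTS builds a timestamp from the local wall clock, the idea behind
// tsoutil.GetCurrentTime in this change.
func composeTS(t time.Time) uint64 {
	return uint64(t.UnixMilli()) << logicalBits
}

// addPhysicalDuration advances only the physical part, mirroring what
// tsoutil.AddPhysicalDurationOnTs(curTs, dur) is used for here.
func addPhysicalDuration(ts uint64, d time.Duration) uint64 {
	return ts + uint64(d.Milliseconds())<<logicalBits
}

func main() {
	// A user-supplied timeout such as "300s", "1.5h" or "1h45m".
	dur, err := time.ParseDuration("1h45m")
	if err != nil {
		panic(err)
	}
	curTs := composeTS(time.Now())
	timeoutTs := addPhysicalDuration(curTs, dur)
	fmt.Printf("current ts: %d, timeout ts: %d\n", curTs, timeoutTs)
}
```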

View File

@ -94,7 +94,8 @@ type DataNode struct {
syncMgr syncmgr.SyncManager syncMgr syncmgr.SyncManager
writeBufferManager writebuffer.BufferManager writeBufferManager writebuffer.BufferManager
importManager *importv2.Manager importTaskMgr importv2.TaskManager
importScheduler importv2.Scheduler
clearSignal chan string // vchannel name clearSignal chan string // vchannel name
segmentCache *Cache segmentCache *Cache
@ -290,8 +291,8 @@ func (node *DataNode) Init() error {
node.writeBufferManager = writebuffer.NewManager(syncMgr) node.writeBufferManager = writebuffer.NewManager(syncMgr)
node.importManager = importv2.NewManager(node.syncMgr, node.chunkManager) node.importTaskMgr = importv2.NewTaskManager()
node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager)
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)
log.Info("init datanode done", zap.String("Address", node.address)) log.Info("init datanode done", zap.String("Address", node.address))
@ -386,7 +387,7 @@ func (node *DataNode) Start() error {
go node.compactionExecutor.start(node.ctx) go node.compactionExecutor.start(node.ctx)
go node.importManager.Start() go node.importScheduler.Start()
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID, node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID,
@ -459,8 +460,8 @@ func (node *DataNode) Stop() error {
node.channelCheckpointUpdater.close() node.channelCheckpointUpdater.close()
} }
if node.importManager != nil { if node.importScheduler != nil {
node.importManager.Close() node.importScheduler.Close()
} }
node.cancel() node.cancel()

View File

@ -1,36 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2
import (
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
)
type Manager struct {
TaskManager
Executor
}
func NewManager(syncMgr syncmgr.SyncManager, cm storage.ChunkManager) *Manager {
tm := NewTaskManager()
e := NewExecutor(tm, syncMgr, cm)
return &Manager{
TaskManager: tm,
Executor: e,
}
}

View File

@ -33,17 +33,16 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
) )
type Executor interface { type Scheduler interface {
Start() Start()
Slots() int64 Slots() int64
Close() Close()
} }
type executor struct { type scheduler struct {
manager TaskManager manager TaskManager
syncMgr syncmgr.SyncManager syncMgr syncmgr.SyncManager
cm storage.ChunkManager cm storage.ChunkManager
@ -54,13 +53,12 @@ type executor struct {
closeChan chan struct{} closeChan chan struct{}
} }
func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Executor { func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler {
pool := conc.NewPool[any]( pool := conc.NewPool[any](
hardware.GetCPUNum()*2, paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(),
conc.WithPreAlloc(false), conc.WithPreAlloc(true),
conc.WithDisablePurge(false),
) )
return &executor{ return &scheduler{
manager: manager, manager: manager,
syncMgr: syncMgr, syncMgr: syncMgr,
cm: cm, cm: cm,
@ -69,8 +67,8 @@ func NewExecutor(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.Ch
} }
} }
func (e *executor) Start() { func (s *scheduler) Start() {
log.Info("start import executor") log.Info("start import scheduler")
var ( var (
exeTicker = time.NewTicker(1 * time.Second) exeTicker = time.NewTicker(1 * time.Second)
logTicker = time.NewTicker(10 * time.Minute) logTicker = time.NewTicker(10 * time.Minute)
@ -79,39 +77,46 @@ func (e *executor) Start() {
defer logTicker.Stop() defer logTicker.Stop()
for { for {
select { select {
case <-e.closeChan: case <-s.closeChan:
log.Info("import executor exited") log.Info("import scheduler exited")
return return
case <-exeTicker.C: case <-exeTicker.C:
tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
wg := &sync.WaitGroup{} futures := make(map[int64][]*conc.Future[any])
for _, task := range tasks { for _, task := range tasks {
wg.Add(1) switch task.GetType() {
go func(task Task) { case PreImportTaskType:
defer wg.Done() fs := s.PreImport(task)
switch task.GetType() { futures[task.GetTaskID()] = fs
case PreImportTaskType: tryFreeFutures(futures)
e.PreImport(task) case ImportTaskType:
case ImportTaskType: fs := s.Import(task)
e.Import(task) futures[task.GetTaskID()] = fs
} tryFreeFutures(futures)
}(task) }
}
for taskID, fs := range futures {
err := conc.AwaitAll(fs...)
if err != nil {
return
}
s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
log.Info("preimport/import done", zap.Int64("taskID", taskID))
} }
wg.Wait()
case <-logTicker.C: case <-logTicker.C:
LogStats(e.manager) LogStats(s.manager)
} }
} }
} }
func (e *executor) Slots() int64 { func (s *scheduler) Slots() int64 {
tasks := e.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks)) return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks))
} }
func (e *executor) Close() { func (s *scheduler) Close() {
e.closeOnce.Do(func() { s.closeOnce.Do(func() {
close(e.closeChan) close(s.closeChan)
}) })
} }
@ -126,33 +131,46 @@ func WrapLogFields(task Task, fields ...zap.Field) []zap.Field {
return res return res
} }
func (e *executor) handleErr(task Task, err error, msg string) { func tryFreeFutures(futures map[int64][]*conc.Future[any]) {
log.Warn(msg, WrapLogFields(task, zap.Error(err))...) for k, fs := range futures {
e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool {
if f.Done() {
_, err := f.Await()
return err != nil
}
return true
})
futures[k] = fs
}
} }
func (e *executor) PreImport(task Task) { func (s *scheduler) handleErr(task Task, err error, msg string) {
log.Warn(msg, WrapLogFields(task, zap.Error(err))...)
s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
}
func (s *scheduler) PreImport(task Task) []*conc.Future[any] {
bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt()
log.Info("start to preimport", WrapLogFields(task, log.Info("start to preimport", WrapLogFields(task,
zap.Int("bufferSize", bufferSize), zap.Int("bufferSize", bufferSize),
zap.Any("schema", task.GetSchema()))...) zap.Any("schema", task.GetSchema()))...)
e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
files := lo.Map(task.(*PreImportTask).GetFileStats(), files := lo.Map(task.(*PreImportTask).GetFileStats(),
func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
return fileStat.GetImportFile() return fileStat.GetImportFile()
}) })
fn := func(i int, file *internalpb.ImportFile) error { fn := func(i int, file *internalpb.ImportFile) error {
reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
if err != nil { if err != nil {
e.handleErr(task, err, "new reader failed") s.handleErr(task, err, "new reader failed")
return err return err
} }
defer reader.Close() defer reader.Close()
start := time.Now() start := time.Now()
err = e.readFileStat(reader, task, i) err = s.readFileStat(reader, task, i)
if err != nil { if err != nil {
e.handleErr(task, err, "preimport failed") s.handleErr(task, err, "preimport failed")
return err return err
} }
log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
@ -164,23 +182,16 @@ func (e *executor) PreImport(task Task) {
for i, file := range files { for i, file := range files {
i := i i := i
file := file file := file
f := e.pool.Submit(func() (any, error) { f := s.pool.Submit(func() (any, error) {
err := fn(i, file) err := fn(i, file)
return err, err return err, err
}) })
futures = append(futures, f) futures = append(futures, f)
} }
err := conc.AwaitAll(futures...) return futures
if err != nil {
return
}
e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
log.Info("executor preimport done",
WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...)
} }
func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
fileSize, err := reader.Size() fileSize, err := reader.Size()
if err != nil { if err != nil {
return err return err
@ -225,30 +236,30 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i
TotalMemorySize: int64(totalSize), TotalMemorySize: int64(totalSize),
HashedStats: hashedStats, HashedStats: hashedStats,
} }
e.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat))
return nil return nil
} }
func (e *executor) Import(task Task) { func (s *scheduler) Import(task Task) []*conc.Future[any] {
bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt() bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt()
log.Info("start to import", WrapLogFields(task, log.Info("start to import", WrapLogFields(task,
zap.Int("bufferSize", bufferSize), zap.Int("bufferSize", bufferSize),
zap.Any("schema", task.GetSchema()))...) zap.Any("schema", task.GetSchema()))...)
e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
req := task.(*ImportTask).req req := task.(*ImportTask).req
fn := func(file *internalpb.ImportFile) error { fn := func(file *internalpb.ImportFile) error {
reader, err := importutilv2.NewReader(task.GetCtx(), e.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
if err != nil { if err != nil {
e.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String()))
return err return err
} }
defer reader.Close() defer reader.Close()
start := time.Now() start := time.Now()
err = e.importFile(reader, task) err = s.importFile(reader, task)
if err != nil { if err != nil {
e.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String()))
return err return err
} }
log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
@ -259,24 +270,18 @@ func (e *executor) Import(task Task) {
futures := make([]*conc.Future[any], 0, len(req.GetFiles())) futures := make([]*conc.Future[any], 0, len(req.GetFiles()))
for _, file := range req.GetFiles() { for _, file := range req.GetFiles() {
file := file file := file
f := e.pool.Submit(func() (any, error) { f := s.pool.Submit(func() (any, error) {
err := fn(file) err := fn(file)
return err, err return err, err
}) })
futures = append(futures, f) futures = append(futures, f)
} }
err := conc.AwaitAll(futures...) return futures
if err != nil {
return
}
e.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
log.Info("import done", WrapLogFields(task)...)
} }
func (e *executor) importFile(reader importutilv2.Reader, task Task) error { func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error {
iTask := task.(*ImportTask) iTask := task.(*ImportTask)
futures := make([]*conc.Future[error], 0) syncFutures := make([]*conc.Future[error], 0)
syncTasks := make([]syncmgr.Task, 0) syncTasks := make([]syncmgr.Task, 0)
for { for {
data, err := reader.Read() data, err := reader.Read()
@ -294,14 +299,14 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error {
if err != nil { if err != nil {
return err return err
} }
fs, sts, err := e.Sync(iTask, hashedData) fs, sts, err := s.Sync(iTask, hashedData)
if err != nil { if err != nil {
return err return err
} }
futures = append(futures, fs...) syncFutures = append(syncFutures, fs...)
syncTasks = append(syncTasks, sts...) syncTasks = append(syncTasks, sts...)
} }
err := conc.AwaitAll(futures...) err := conc.AwaitAll(syncFutures...)
if err != nil { if err != nil {
return err return err
} }
@ -310,13 +315,13 @@ func (e *executor) importFile(reader importutilv2.Reader, task Task) error {
if err != nil { if err != nil {
return err return err
} }
e.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
} }
return nil return nil
} }
func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) { func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) {
log.Info("start to sync import data", WrapLogFields(task)...) log.Info("start to sync import data", WrapLogFields(task)...)
futures := make([]*conc.Future[error], 0) futures := make([]*conc.Future[error], 0)
syncTasks := make([]syncmgr.Task, 0) syncTasks := make([]syncmgr.Task, 0)
@ -335,7 +340,7 @@ func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future
return nil, nil, err return nil, nil, err
} }
segmentImportedSizes[segmentID] += size segmentImportedSizes[segmentID] += size
future := e.syncMgr.SyncData(task.GetCtx(), syncTask) future := s.syncMgr.SyncData(task.GetCtx(), syncTask)
futures = append(futures, future) futures = append(futures, future)
syncTasks = append(syncTasks, syncTask) syncTasks = append(syncTasks, syncTask)
} }

View File

@ -65,24 +65,24 @@ type mockReader struct {
io.Seeker io.Seeker
} }
type ExecutorSuite struct { type SchedulerSuite struct {
suite.Suite suite.Suite
numRows int numRows int
schema *schemapb.CollectionSchema schema *schemapb.CollectionSchema
cm storage.ChunkManager cm storage.ChunkManager
reader *importutilv2.MockReader reader *importutilv2.MockReader
syncMgr *syncmgr.MockSyncManager syncMgr *syncmgr.MockSyncManager
manager TaskManager manager TaskManager
executor *executor scheduler *scheduler
} }
func (s *ExecutorSuite) SetupSuite() { func (s *SchedulerSuite) SetupSuite() {
paramtable.Init() paramtable.Init()
} }
func (s *ExecutorSuite) SetupTest() { func (s *SchedulerSuite) SetupTest() {
s.numRows = 100 s.numRows = 100
s.schema = &schemapb.CollectionSchema{ s.schema = &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{ Fields: []*schemapb.FieldSchema{
@ -116,7 +116,7 @@ func (s *ExecutorSuite) SetupTest() {
s.manager = NewTaskManager() s.manager = NewTaskManager()
s.syncMgr = syncmgr.NewMockSyncManager(s.T()) s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.executor = NewExecutor(s.manager, s.syncMgr, nil).(*executor) s.scheduler = NewScheduler(s.manager, s.syncMgr, nil).(*scheduler)
} }
func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData {
@ -226,7 +226,7 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
return insertData return insertData
} }
func (s *ExecutorSuite) TestExecutor_Slots() { func (s *SchedulerSuite) TestScheduler_Slots() {
preimportReq := &datapb.PreImportRequest{ preimportReq := &datapb.PreImportRequest{
JobID: 1, JobID: 1,
TaskID: 2, TaskID: 2,
@ -239,11 +239,11 @@ func (s *ExecutorSuite) TestExecutor_Slots() {
preimportTask := NewPreImportTask(preimportReq) preimportTask := NewPreImportTask(preimportReq)
s.manager.Add(preimportTask) s.manager.Add(preimportTask)
slots := s.executor.Slots() slots := s.scheduler.Slots()
s.Equal(paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()-1, slots) s.Equal(paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()-1, slots)
} }
func (s *ExecutorSuite) TestExecutor_Start_Preimport() { func (s *SchedulerSuite) TestScheduler_Start_Preimport() {
content := &sampleContent{ content := &sampleContent{
Rows: make([]sampleRow, 0), Rows: make([]sampleRow, 0),
} }
@ -262,7 +262,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() {
ioReader := strings.NewReader(string(bytes)) ioReader := strings.NewReader(string(bytes))
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.executor.cm = cm s.scheduler.cm = cm
preimportReq := &datapb.PreImportRequest{ preimportReq := &datapb.PreImportRequest{
JobID: 1, JobID: 1,
@ -276,14 +276,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() {
preimportTask := NewPreImportTask(preimportReq) preimportTask := NewPreImportTask(preimportReq)
s.manager.Add(preimportTask) s.manager.Add(preimportTask)
go s.executor.Start() go s.scheduler.Start()
defer s.executor.Close() defer s.scheduler.Close()
s.Eventually(func() bool { s.Eventually(func() bool {
return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed
}, 10*time.Second, 100*time.Millisecond) }, 10*time.Second, 100*time.Millisecond)
} }
func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() {
content := &sampleContent{ content := &sampleContent{
Rows: make([]sampleRow, 0), Rows: make([]sampleRow, 0),
} }
@ -316,7 +316,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() {
ioReader := strings.NewReader(string(bytes)) ioReader := strings.NewReader(string(bytes))
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.executor.cm = cm s.scheduler.cm = cm
preimportReq := &datapb.PreImportRequest{ preimportReq := &datapb.PreImportRequest{
JobID: 1, JobID: 1,
@ -330,14 +330,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() {
preimportTask := NewPreImportTask(preimportReq) preimportTask := NewPreImportTask(preimportReq)
s.manager.Add(preimportTask) s.manager.Add(preimportTask)
go s.executor.Start() go s.scheduler.Start()
defer s.executor.Close() defer s.scheduler.Close()
s.Eventually(func() bool { s.Eventually(func() bool {
return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed return s.manager.Get(preimportTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed
}, 10*time.Second, 100*time.Millisecond) }, 10*time.Second, 100*time.Millisecond)
} }
func (s *ExecutorSuite) TestExecutor_Start_Import() { func (s *SchedulerSuite) TestScheduler_Start_Import() {
content := &sampleContent{ content := &sampleContent{
Rows: make([]sampleRow, 0), Rows: make([]sampleRow, 0),
} }
@ -355,7 +355,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() {
cm := mocks.NewChunkManager(s.T()) cm := mocks.NewChunkManager(s.T())
ioReader := strings.NewReader(string(bytes)) ioReader := strings.NewReader(string(bytes))
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.executor.cm = cm s.scheduler.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
future := conc.Go(func() (error, error) { future := conc.Go(func() (error, error) {
@ -391,14 +391,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import() {
importTask := NewImportTask(importReq) importTask := NewImportTask(importReq)
s.manager.Add(importTask) s.manager.Add(importTask)
go s.executor.Start() go s.scheduler.Start()
defer s.executor.Close() defer s.scheduler.Close()
s.Eventually(func() bool { s.Eventually(func() bool {
return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Completed
}, 10*time.Second, 100*time.Millisecond) }, 10*time.Second, 100*time.Millisecond)
} }
func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() { func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
content := &sampleContent{ content := &sampleContent{
Rows: make([]sampleRow, 0), Rows: make([]sampleRow, 0),
} }
@ -416,7 +416,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() {
cm := mocks.NewChunkManager(s.T()) cm := mocks.NewChunkManager(s.T())
ioReader := strings.NewReader(string(bytes)) ioReader := strings.NewReader(string(bytes))
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.executor.cm = cm s.scheduler.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
future := conc.Go(func() (error, error) { future := conc.Go(func() (error, error) {
@ -452,14 +452,14 @@ func (s *ExecutorSuite) TestExecutor_Start_Import_Failed() {
importTask := NewImportTask(importReq) importTask := NewImportTask(importReq)
s.manager.Add(importTask) s.manager.Add(importTask)
go s.executor.Start() go s.scheduler.Start()
defer s.executor.Close() defer s.scheduler.Close()
s.Eventually(func() bool { s.Eventually(func() bool {
return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed return s.manager.Get(importTask.GetTaskID()).GetState() == datapb.ImportTaskStateV2_Failed
}, 10*time.Second, 100*time.Millisecond) }, 10*time.Second, 100*time.Millisecond)
} }
func (s *ExecutorSuite) TestExecutor_ReadFileStat() { func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
importFile := &internalpb.ImportFile{ importFile := &internalpb.ImportFile{
Paths: []string{"dummy.json"}, Paths: []string{"dummy.json"},
} }
@ -489,11 +489,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
} }
preimportTask := NewPreImportTask(preimportReq) preimportTask := NewPreImportTask(preimportReq)
s.manager.Add(preimportTask) s.manager.Add(preimportTask)
err := s.executor.readFileStat(s.reader, preimportTask, 0) err := s.scheduler.readFileStat(s.reader, preimportTask, 0)
s.NoError(err) s.NoError(err)
} }
func (s *ExecutorSuite) TestExecutor_ImportFile() { func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
future := conc.Go(func() (error, error) { future := conc.Go(func() (error, error) {
return nil, nil return nil, nil
@ -540,10 +540,10 @@ func (s *ExecutorSuite) TestExecutor_ImportFile() {
} }
importTask := NewImportTask(importReq) importTask := NewImportTask(importReq)
s.manager.Add(importTask) s.manager.Add(importTask)
err := s.executor.importFile(s.reader, importTask) err := s.scheduler.importFile(s.reader, importTask)
s.NoError(err) s.NoError(err)
} }
func TestExecutor(t *testing.T) { func TestScheduler(t *testing.T) {
suite.Run(t, new(ExecutorSuite)) suite.Run(t, new(SchedulerSuite))
} }

View File

@ -408,7 +408,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques
} }
task := importv2.NewPreImportTask(req) task := importv2.NewPreImportTask(req)
node.importManager.Add(task) node.importTaskMgr.Add(task)
log.Info("datanode added preimport task") log.Info("datanode added preimport task")
return merr.Success(), nil return merr.Success(), nil
@ -427,7 +427,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) (
return merr.Status(err), nil return merr.Status(err), nil
} }
task := importv2.NewImportTask(req) task := importv2.NewImportTask(req)
node.importManager.Add(task) node.importTaskMgr.Add(task)
log.Info("datanode added import task") log.Info("datanode added import task")
return merr.Success(), nil return merr.Success(), nil
@ -441,7 +441,7 @@ func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreIm
return &datapb.QueryPreImportResponse{Status: merr.Status(err)}, nil return &datapb.QueryPreImportResponse{Status: merr.Status(err)}, nil
} }
status := merr.Success() status := merr.Success()
task := node.importManager.Get(req.GetTaskID()) task := node.importTaskMgr.Get(req.GetTaskID())
if task == nil || task.GetType() != importv2.PreImportTaskType { if task == nil || task.GetType() != importv2.PreImportTaskType {
status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.PreImportTaskType)) status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.PreImportTaskType))
} }
@ -470,12 +470,12 @@ func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRe
if req.GetQuerySlot() { if req.GetQuerySlot() {
return &datapb.QueryImportResponse{ return &datapb.QueryImportResponse{
Status: status, Status: status,
Slots: node.importManager.Slots(), Slots: node.importScheduler.Slots(),
}, nil }, nil
} }
// query import // query import
task := node.importManager.Get(req.GetTaskID()) task := node.importTaskMgr.Get(req.GetTaskID())
if task == nil || task.GetType() != importv2.ImportTaskType { if task == nil || task.GetType() != importv2.ImportTaskType {
status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.ImportTaskType)) status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.ImportTaskType))
} }
@ -498,7 +498,7 @@ func (node *DataNode) DropImport(ctx context.Context, req *datapb.DropImportRequ
return merr.Status(err), nil return merr.Status(err), nil
} }
node.importManager.Remove(req.GetTaskID()) node.importTaskMgr.Remove(req.GetTaskID())
log.Info("datanode drop import done") log.Info("datanode drop import done")

View File

@ -70,9 +70,13 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
if tsStart != 0 || tsEnd != math.MaxUint64 { if tsStart != 0 || tsEnd != math.MaxUint64 {
r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd)) r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd))
} }
if len(paths) < 1 { if len(paths) == 0 {
return merr.WrapErrImportFailed("no insert binlogs to import") return merr.WrapErrImportFailed("no insert binlogs to import")
} }
if len(paths) > 2 {
return merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+
"Valid paths length should be one or two, but got paths:%s", paths))
}
insertLogs, err := listInsertLogs(r.ctx, r.cm, paths[0]) insertLogs, err := listInsertLogs(r.ctx, r.cm, paths[0])
if err != nil { if err != nil {
return err return err

View File

@ -16,6 +16,8 @@
package conc package conc
import "go.uber.org/atomic"
type future interface { type future interface {
wait() wait()
OK() bool OK() bool
@ -29,11 +31,13 @@ type Future[T any] struct {
ch chan struct{} ch chan struct{}
value T value T
err error err error
done *atomic.Bool
} }
func newFuture[T any]() *Future[T] { func newFuture[T any]() *Future[T] {
return &Future[T]{ return &Future[T]{
ch: make(chan struct{}), ch: make(chan struct{}),
done: atomic.NewBool(false),
} }
} }
@ -55,6 +59,11 @@ func (future *Future[T]) Value() T {
return future.value return future.value
} }
// Done indicates if the fn has finished.
func (future *Future[T]) Done() bool {
return future.done.Load()
}
// False if error occurred, // False if error occurred,
// true otherwise. // true otherwise.
func (future *Future[T]) OK() bool { func (future *Future[T]) OK() bool {
@ -86,6 +95,7 @@ func Go[T any](fn func() (T, error)) *Future[T] {
go func() { go func() {
future.value, future.err = fn() future.value, future.err = fn()
close(future.ch) close(future.ch)
future.done.Store(true)
}() }()
return future return future
} }
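Done gives callers a non-blocking way to ask whether the wrapped fn has returned, which the datanode scheduler uses to prune finished futures between ticks, while Await still blocks for the result. A small usage sketch against the conc package, assuming the import path used elsewhere in this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	f := conc.Go(func() (int, error) {
		time.Sleep(50 * time.Millisecond)
		return 42, nil
	})

	// Non-blocking poll: the fn is still sleeping, so Done reports false.
	fmt.Println("done yet?", f.Done())

	// Await blocks until the fn returns and yields its result.
	v, err := f.Await()
	fmt.Println("value:", v, "err:", err)
}
```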

View File

@ -2577,6 +2577,7 @@ type dataCoordConfig struct {
ImportCheckIntervalHigh ParamItem `refreshable:"true"` ImportCheckIntervalHigh ParamItem `refreshable:"true"`
ImportCheckIntervalLow ParamItem `refreshable:"true"` ImportCheckIntervalLow ParamItem `refreshable:"true"`
MaxFilesPerImportReq ParamItem `refreshable:"true"` MaxFilesPerImportReq ParamItem `refreshable:"true"`
WaitForIndex ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"`
} }
@ -3122,6 +3123,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
} }
p.MaxFilesPerImportReq.Init(base.mgr) p.MaxFilesPerImportReq.Init(base.mgr)
p.WaitForIndex = ParamItem{
Key: "dataCoord.import.waitForIndex",
Version: "2.4.0",
Doc: "Indicates whether the import operation waits for the completion of index building.",
DefaultValue: "true",
PanicIfEmpty: false,
Export: true,
}
p.WaitForIndex.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{ p.GracefulStopTimeout = ParamItem{
Key: "dataCoord.gracefulStopTimeout", Key: "dataCoord.gracefulStopTimeout",
Version: "2.3.7", Version: "2.3.7",

View File

@ -384,6 +384,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second)) assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second)) assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt()) assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())
assert.Equal(t, true, Params.WaitForIndex.GetAsBool())
params.Save("datacoord.gracefulStopTimeout", "100") params.Save("datacoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))