From 04e595d61bba10ba5193ce5fb67df570e7b9c7dc Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Wed, 11 Jan 2023 18:57:43 +0800 Subject: [PATCH] BulkInsert completes when segments are flushed, without checking indexes (#21604) /kind improvement Signed-off-by: Yuchen Gao --- internal/datacoord/segment_manager.go | 12 +- internal/rootcoord/broker.go | 5 + internal/rootcoord/import_helper.go | 12 + internal/rootcoord/import_manager.go | 192 +++++--------- internal/rootcoord/import_manager_test.go | 308 +++++++--------------- internal/rootcoord/root_coord.go | 42 ++- internal/rootcoord/root_coord_test.go | 33 +-- 7 files changed, 223 insertions(+), 381 deletions(-) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index e9932c5b38..2d75664baf 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -434,6 +434,10 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu ret = append(ret, id) continue } + if info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { + continue + } if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil { return nil, err } @@ -524,7 +528,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { continue } channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info) - if info.State == commonpb.SegmentState_Sealed { + if info.State == commonpb.SegmentState_Sealed || + info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { continue } // change shouldSeal to segment seal policy logic @@ -541,7 +547,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { for _, policy := range s.channelSealPolicies { vs := policy(channel, segmentInfos, ts) for _, info := range vs { - if info.State == commonpb.SegmentState_Sealed { + if info.State == commonpb.SegmentState_Sealed || + info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { continue } if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil { diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index a2f096e5d2..06f182904c 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -44,6 +44,7 @@ type Broker interface { Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) UnsetIsImportingState(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) MarkSegmentsDropped(context.Context, *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) + GetSegmentStates(context.Context, *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) @@ -203,6 +204,10 @@ func (b *ServerBroker) MarkSegmentsDropped(ctx context.Context, req *datapb.Mark return b.s.dataCoord.MarkSegmentsDropped(ctx, req) } +func (b *ServerBroker) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return b.s.dataCoord.GetSegmentStates(ctx, req) +} + func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error { 
rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{ CollectionID: collID, diff --git a/internal/rootcoord/import_helper.go b/internal/rootcoord/import_helper.go index 0f43b260c3..6349865793 100644 --- a/internal/rootcoord/import_helper.go +++ b/internal/rootcoord/import_helper.go @@ -15,6 +15,7 @@ type GetCollectionNameFunc func(collID, partitionID UniqueID) (string, string, e type IDAllocator func(count uint32) (UniqueID, UniqueID, error) type ImportFunc func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) type MarkSegmentsDroppedFunc func(ctx context.Context, segIDs []int64) (*commonpb.Status, error) +type GetSegmentStatesFunc func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) type DescribeIndexFunc func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) type GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) type UnsetIsImportingStateFunc func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) @@ -24,6 +25,7 @@ type ImportFactory interface { NewIDAllocator() IDAllocator NewImportFunc() ImportFunc NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc + NewGetSegmentStatesFunc() GetSegmentStatesFunc NewDescribeIndexFunc() DescribeIndexFunc NewGetSegmentIndexStateFunc() GetSegmentIndexStateFunc NewUnsetIsImportingStateFunc() UnsetIsImportingStateFunc @@ -49,6 +51,10 @@ func (f ImportFactoryImpl) NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc return MarkSegmentsDroppedWithCore(f.c) } +func (f ImportFactoryImpl) NewGetSegmentStatesFunc() GetSegmentStatesFunc { + return GetSegmentStatesWithCore(f.c) +} + func (f ImportFactoryImpl) NewDescribeIndexFunc() DescribeIndexFunc { return DescribeIndexWithCore(f.c) } @@ -102,6 +108,12 @@ func MarkSegmentsDroppedWithCore(c *Core) MarkSegmentsDroppedFunc { } } +func GetSegmentStatesWithCore(c *Core) GetSegmentStatesFunc { + return func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return c.broker.GetSegmentStates(ctx, req) + } +} + func DescribeIndexWithCore(c *Core) DescribeIndexFunc { return func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { return c.broker.DescribeIndex(ctx, colID) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 511bda0dd8..e8c05e342c 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -32,16 +32,15 @@ import ( "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/typeutil" - + "github.com/samber/lo" "go.uber.org/zap" ) const ( - MaxPendingCount = 32 + MaxPendingCount = 65536 // TODO: Make this configurable. delimiter = "/" ) @@ -54,10 +53,11 @@ var checkPendingTasksInterval = 60 * 1000 // default 5*60*1000 milliseconds (5 minutes) var cleanUpLoopInterval = 5 * 60 * 1000 -// flipTaskStateInterval is the default interval to loop through tasks and check if their states needs to be -// flipped/updated, for example, from `ImportPersisted` to `ImportCompleted`. 
-// default 15 * 1000 milliseconds (15 seconds) -var flipTaskStateInterval = 15 * 1000 +// flipPersistedTaskInterval is the default interval to loop through tasks and check if their states need to be +// flipped/updated from `ImportPersisted` to `ImportCompleted`. +// default 2 * 1000 milliseconds (2 seconds) +// TODO: Make this configurable. +var flipPersistedTaskInterval = 2 * 1000 // importManager manager for import tasks type importManager struct { @@ -79,8 +79,7 @@ type importManager struct { callImportService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error) callMarkSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) - callDescribeIndex func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) - callGetSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) + callGetSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) callUnsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) } @@ -89,9 +88,8 @@ func newImportManager(ctx context.Context, client kv.TxnKV, idAlloc func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error), importService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error), markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error), + getSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error), getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error), - describeIndex func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error), - getSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error), unsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)) *importManager { mgr := &importManager{ ctx: ctx, @@ -106,9 +104,8 @@ func newImportManager(ctx context.Context, client kv.TxnKV, idAllocator: idAlloc, callImportService: importService, callMarkSegmentsDropped: markSegmentsDropped, + callGetSegmentStates: getSegmentStates, getCollectionName: getCollectionName, - callDescribeIndex: describeIndex, - callGetSegmentIndexState: getSegmentIndexState, callUnsetIsImportingState: unsetIsImportingState, } return mgr @@ -149,17 +146,17 @@ func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) { // flipTaskStateLoop periodically calls `loadAndFlipPersistedTasks` to check if states of the tasks need to be updated.
func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { defer wg.Done() - ticker := time.NewTicker(time.Duration(flipTaskStateInterval) * time.Millisecond) - defer ticker.Stop() + flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond) + defer flipPersistedTicker.Stop() for { select { case <-m.ctx.Done(): log.Info("import manager context done, exit check flipTaskStateLoop") return - case <-ticker.C: - log.Info("start trying to flip task state") - if err := m.flipTaskState(m.ctx); err != nil { - log.Error("failed to flip task state", zap.Error(err)) + case <-flipPersistedTicker.C: + log.Debug("start trying to flip ImportPersisted task") + if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil { + log.Error("failed to flip ImportPersisted task", zap.Error(err)) } } } @@ -269,137 +266,105 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { return nil } -// flipTaskState checks every import task and flips their import state if eligible. -func (m *importManager) flipTaskState(ctx context.Context) error { +// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to +// `ImportCompleted` if eligible. +func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { var importTasks []*datapb.ImportTaskInfo var err error if importTasks, err = m.loadFromTaskStore(false); err != nil { log.Error("failed to load from task store", zap.Error(err)) return err } + for _, task := range importTasks { + // Checking if ImportPersisted --> ImportCompleted ready. if task.GetState().GetStateCode() == commonpb.ImportState_ImportPersisted { log.Info("ImportPersisted task found, checking if it is eligible to become ImportCompleted", zap.Int64("task ID", task.GetId())) - - // TODO: if collection or partition has been dropped before the task complete, - // we need to set the task to failed, because the checkIndexingDone() cannot know - // whether the collection has been dropped.
+ importTask := m.getTaskState(task.GetId()) // if this method fails, skip this task and try again in the next round - m.flipTaskIndexState(ctx, task.GetId()) + if err = m.flipTaskFlushedState(ctx, importTask, task.GetDatanodeId()); err != nil { + log.Error("failed to flip task flushed state", + zap.Int64("task ID", task.GetId()), + zap.Error(err)) + } } } return nil } -func (m *importManager) flipTaskIndexState(ctx context.Context, taskID int64) error { - resp := m.getTaskState(taskID) - ok, err := m.checkIndexingDone(ctx, resp.GetCollectionId(), resp.GetSegmentIds()) +func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *milvuspb.GetImportStateResponse, dataNodeID int64) error { + ok, err := m.checkFlushDone(ctx, importTask.GetSegmentIds()) if err != nil { - log.Error("an error occurred while checking index state of segments", - zap.Int64("task ID", taskID), + log.Error("an error occurred while checking flush state of segments", + zap.Int64("task ID", importTask.GetId()), zap.Error(err)) - // Failed to check indexing state of segments return err } if ok { - if err := m.setImportTaskState(resp.GetId(), commonpb.ImportState_ImportCompleted); err != nil { - log.Error("failed to set import task state", - zap.Int64("task ID", resp.GetId()), - zap.Any("target state", commonpb.ImportState_ImportCompleted), - zap.Error(err)) - // Failed to update task's state - return err - } - log.Info("indexes are successfully built and the import task has complete!", - zap.Int64("task ID", resp.GetId())) - log.Info("now start unsetting isImporting state of segments", - zap.Int64("task ID", resp.GetId()), - zap.Int64s("segment IDs", resp.GetSegmentIds())) - // Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state. + // All segments are flushed. DataNode becomes available. + func() { + m.busyNodesLock.Lock() + defer m.busyNodesLock.Unlock() + delete(m.busyNodes, dataNodeID) + log.Info("a DataNode is no longer busy after processing task", + zap.Int64("dataNode ID", dataNodeID), + zap.Int64("task ID", importTask.GetId())) + + }() + // Unset isImporting flag. if m.callUnsetIsImportingState == nil { log.Error("callUnsetIsImportingState function of importManager is nil") return fmt.Errorf("failed to unset isImporting state: unset isImporting state method of import manager is nil") } - status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{ - SegmentIds: resp.GetSegmentIds(), + _, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{ + SegmentIds: importTask.GetSegmentIds(), }) + if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil { + log.Error("failed to set import task state", + zap.Int64("task ID", importTask.GetId()), + zap.Any("target state", commonpb.ImportState_ImportCompleted), + zap.Error(err)) + return err + } if err != nil { log.Error("failed to unset importing state of all segments (could be partial failure)", zap.Error(err)) return err } - if status.GetErrorCode() != commonpb.ErrorCode_Success { - log.Error("failed to unset importing state of all segments (could be partial failure)", - zap.Error(errors.New(status.GetReason()))) - return errors.New(status.GetReason()) + // Start working on new bulk insert tasks.
+ if err = m.sendOutTasks(m.ctx); err != nil { + log.Error("failed to send out import tasks to DataNodes", + zap.Int64("task ID", importTask.GetId())) + } } - return nil } -// checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`. -// It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise. -func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) { - if m.callDescribeIndex == nil { - log.Error("callDescribeIndex function of importManager is nil") - return false, fmt.Errorf("failed to describe index: describe index method of import manager is nil") - } - - // Check if collection has indexed fields. - var descIdxResp *indexpb.DescribeIndexResponse - var err error - if descIdxResp, err = m.callDescribeIndex(ctx, collID); err != nil { - log.Error("failed to describe index", - zap.Int64("collection ID", collID), - zap.Error(err)) +// checkFlushDone checks if flush is done on given segments. +func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (bool, error) { + resp, err := m.callGetSegmentStates(ctx, &datapb.GetSegmentStatesRequest{ + SegmentIDs: segIDs, + }) + if err != nil { + log.Error("failed to get import task segment states", + zap.Int64s("segment IDs", segIDs)) return false, err } - if descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success && - descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_IndexNotExist { - log.Error("failed to describe index", - zap.Int64("collection ID", collID), - zap.String("reason", descIdxResp.GetStatus().GetReason())) - return false, errors.New(descIdxResp.GetStatus().GetReason()) + getSegmentStates := func(segment *datapb.SegmentStateInfo, _ int) string { return segment.GetState().String() } - log.Info("index info retrieved for collection", - zap.Int64("collection ID", collID), - zap.Any("index info", descIdxResp.GetIndexInfos())) - if descIdxResp.GetStatus().GetErrorCode() == commonpb.ErrorCode_IndexNotExist || - len(descIdxResp.GetIndexInfos()) == 0 { - log.Info("index doesn't exist for collection", - zap.Int64("collection ID", collID)) - return true, nil - } - indexedSegmentCount := len(allSegmentIDs) - for _, indexInfo := range descIdxResp.GetIndexInfos() { - states, err := m.callGetSegmentIndexState(ctx, collID, indexInfo.GetIndexName(), allSegmentIDs) - if err != nil { - log.Error("failed to get index state in checkIndexingDone", zap.Error(err)) - return false, err - } - - // Count the # of segments with finished index. - ct := 0 - for _, s := range states { - if s.State == commonpb.IndexState_Finished { - ct++ - } - } - - if ct < indexedSegmentCount { - indexedSegmentCount = ct + log.Debug("checking import segment states", + zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates))) + for _, states := range resp.GetStates() { + // Flushed segments could get compacted, so only return false if there are still importing segments.
+ if states.GetState() == commonpb.SegmentState_Importing || + states.GetState() == commonpb.SegmentState_Sealed { + return false, nil } } - - log.Info("segment indexing state checked", - zap.Int64s("segments checked", allSegmentIDs), - zap.Int("# of segments with complete index", indexedSegmentCount), - zap.Int64("collection ID", collID), - ) - return len(allSegmentIDs) == indexedSegmentCount, nil + return true, nil } func (m *importManager) isRowbased(files []string) (bool, error) { @@ -635,17 +600,6 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im if err != nil { return nil, err } - - // if is ImportState_ImportPersisted, and index is FLAT, set the task to be complated immediately - // this method is called from importWrapper.reportPersisted() to rootCoord.ReportImport(), - // if flipTaskIndexState failed, the outer caller(importWrapper) will retry 3 times - if ir.GetState() == commonpb.ImportState_ImportPersisted { - err = m.flipTaskIndexState(m.ctx, updatedInfo.GetId()) - if err != nil { - return nil, err - } - } - return updatedInfo, nil } diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index 837d84437f..bd8eef25b5 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -30,7 +30,6 @@ import ( memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -68,13 +67,23 @@ func TestImportManager_NewImportManager(t *testing.T) { }, CreateTs: time.Now().Unix() - 100, } + ti3 := &datapb.ImportTaskInfo{ + Id: 300, + State: &datapb.ImportTaskState{ + StateCode: commonpb.ImportState_ImportCompleted, + }, + CreateTs: time.Now().Unix() - 100, + } taskInfo1, err := proto.Marshal(ti1) assert.NoError(t, err) taskInfo2, err := proto.Marshal(ti2) assert.NoError(t, err) + taskInfo3, err := proto.Marshal(ti3) + assert.NoError(t, err) mockKv.Save(BuildImportTaskKey(1), "value") mockKv.Save(BuildImportTaskKey(100), string(taskInfo1)) mockKv.Save(BuildImportTaskKey(200), string(taskInfo2)) + mockKv.Save(BuildImportTaskKey(300), string(taskInfo3)) mockCallImportServiceErr := false callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { @@ -96,13 +105,20 @@ func TestImportManager_NewImportManager(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } var wg sync.WaitGroup wg.Add(1) t.Run("working task expired", func(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) // there are 3 tasks read from store: one pending, one persisted, one completed.
@@ -134,7 +150,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) var wgLoop sync.WaitGroup @@ -153,7 +169,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) assert.Panics(t, func() { mgr.init(context.TODO()) @@ -170,7 +186,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) }) @@ -184,7 +200,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) func() { @@ -204,7 +220,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(ctx) var wgLoop sync.WaitGroup @@ -262,7 +278,7 @@ func TestImportManager_TestSetImportTaskState(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil) assert.NotNil(t, mgr) _, err := mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -355,9 +371,16 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) _, 
err = mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -406,13 +429,23 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, CreateTs: time.Now().Unix() - 100, } + ti3 := &datapb.ImportTaskInfo{ + Id: 300, + State: &datapb.ImportTaskState{ + StateCode: commonpb.ImportState_ImportCompleted, + Segments: []int64{204, 205, 206}, + }, + CreateTs: time.Now().Unix() - 100, + } taskInfo1, err := proto.Marshal(ti1) assert.NoError(t, err) taskInfo2, err := proto.Marshal(ti2) assert.NoError(t, err) - mockKv.Save(BuildImportTaskKey(1), "value") + taskInfo3, err := proto.Marshal(ti3) + assert.NoError(t, err) mockKv.Save(BuildImportTaskKey(100), string(taskInfo1)) mockKv.Save(BuildImportTaskKey(200), string(taskInfo2)) + mockKv.Save(BuildImportTaskKey(300), string(taskInfo3)) mockCallImportServiceErr := false callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { @@ -434,40 +467,21 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } - callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - IndexInfos: []*indexpb.IndexInfo{ - {}, - }, - }, nil - } - callGetSegmentIndexState := func(ctx context.Context, collID UniqueID, indexName string, - segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { - return []*indexpb.SegmentIndexState{ - { - SegmentID: 200, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 201, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 202, - State: commonpb.IndexState_Finished, - }, }, nil } + callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil } - flipTaskStateInterval = 50 + flipPersistedTaskInterval = 20 var wg sync.WaitGroup wg.Add(1) t.Run("normal case", func(t *testing.T) { @@ -475,13 +489,13 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) mgr.flipTaskStateLoop(&wgLoop) wgLoop.Wait() - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) }) wg.Add(1) @@ -489,15 +503,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup 
wgLoop.Add(1) @@ -511,15 +518,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -548,9 +548,15 @@ func TestImportManager_ImportJob(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } - + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // nil request - mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), nil, colID, 0) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -579,7 +585,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, task count equal to file count // since the importServiceFunc returns an error, tasks will be kept in pending list rowReq.Files = []string{"f1.json"} - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -592,7 +598,7 @@ func TestImportManager_ImportJob(t *testing.T) { // column-based case, one request one task // since the importServiceFunc returns an error, tasks will be kept in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, 1, len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -606,13 +612,13 @@ func TestImportManager_ImportJob(t *testing.T) { } // row-based case, since the importServiceFunc returns success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks)) // column-based case, since the importServiceFunc returns success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(),
mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -636,7 +642,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, since the importServiceFunc returns success for 1 task // the first task is sent to working list, and 1 task left in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -710,9 +716,16 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // each data node owns one task - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) for i := 0; i < len(dnList); i++ { resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -721,7 +734,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { } // all data nodes are busy, new task waiting in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) @@ -729,7 +742,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { // now all data nodes are free again, new task is executed instantly count = 0 - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -784,9 +797,16 @@ func TestImportManager_TaskState(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // add 3 tasks, their IDs are 10000, 10001, 10002, make sure updateTaskInfo() works correctly - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates,
nil, nil) mgr.importJob(context.TODO(), rowReq, colID, 0) rowReq.Files = []string{"f2.json"} mgr.importJob(context.TODO(), rowReq, colID, 0) @@ -816,51 +836,6 @@ func TestImportManager_TaskState(t *testing.T) { }, } - // callDescribeIndex method is nil - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } - - // describe index failed, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } - // index doesn't exist, but callUnsetIsImportingState is nil, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, nil - } - // index doesn't exist, but failed to unset importing state, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, errors.New("error to unset importing state") - } - // index doesn't exist, but failed to unset importing state, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -874,7 +849,7 @@ func TestImportManager_TaskState(t *testing.T) { assert.Equal(t, int64(100), ti.GetCollectionId()) assert.Equal(t, int64(0), ti.GetPartitionId()) assert.Equal(t, []string{"f2.json"}, ti.GetFiles()) - assert.Equal(t, commonpb.ImportState_ImportCompleted, ti.GetState().GetStateCode()) + assert.Equal(t, commonpb.ImportState_ImportPersisted, ti.GetState().GetStateCode()) assert.Equal(t, int64(1000), ti.GetState().GetRowCount()) resp := mgr.getTaskState(10000) @@ -882,7 +857,7 @@ func TestImportManager_TaskState(t *testing.T) { resp = mgr.getTaskState(2) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.Equal(t, commonpb.ImportState_ImportCompleted, resp.State) + assert.Equal(t, commonpb.ImportState_ImportPersisted, resp.State) resp = mgr.getTaskState(1) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -938,7 +913,14 @@ func TestImportManager_AllocFail(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.NotEqual(t, 
commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -971,6 +953,13 @@ func TestImportManager_ListAllTasks(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } colID1 := int64(100) colID2 := int64(101) @@ -999,7 +988,7 @@ func TestImportManager_ListAllTasks(t *testing.T) { } mockKv := memkv.NewMemoryKV() - mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, getCollectionName, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, callGetSegmentStates, getCollectionName, nil) // add 10 tasks for collection1, id from 1 to 10 file1 := "f1.json" @@ -1184,100 +1173,3 @@ func TestImportManager_isRowbased(t *testing.T) { assert.Nil(t, err) assert.False(t, rb) } - -func TestImportManager_checkIndexingDone(t *testing.T) { - ctx := context.Background() - - mgr := &importManager{ - callDescribeIndex: func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return nil, errors.New("error") - }, - callGetSegmentIndexState: func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { - return nil, errors.New("error") - }, - } - - segmentsID := []typeutil.UniqueID{1, 2, 3} - - // check index of 3 segments - // callDescribeIndex() failed - done, err := mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } - - // callDescribeIndex() unexpected error - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } - - // callDescribeIndex() index not exist - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.True(t, done) - assert.Nil(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexInfos: []*indexpb.IndexInfo{ - { - State: commonpb.IndexState_Finished, - }, - }, - }, nil - } - - // callGetSegmentIndexState() failed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { - return []*indexpb.SegmentIndexState{ - { - State: commonpb.IndexState_Finished, - }, - }, nil - } - - // only 1 segment indexed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Nil(t, err) - - mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { - return 
[]*indexpb.SegmentIndexState{ - { - State: commonpb.IndexState_Finished, - }, - { - State: commonpb.IndexState_Finished, - }, - { - State: commonpb.IndexState_Finished, - }, - }, nil - } - - // all segments indexed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.True(t, done) - assert.Nil(t, err) -} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6e1de9133d..bd111f41af 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -415,9 +415,8 @@ func (c *Core) initImportManager() error { f.NewIDAllocator(), f.NewImportFunc(), f.NewMarkSegmentsDroppedFunc(), + f.NewGetSegmentStatesFunc(), f.NewGetCollectionNameFunc(), - f.NewDescribeIndexFunc(), - f.NewGetSegmentIndexStateFunc(), f.NewUnsetIsImportingStateFunc(), ) c.importManager.init(c.ctx) @@ -1701,28 +1700,6 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( if code, ok := c.checkHealthy(); !ok { return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil } - // If setting ImportState_ImportCompleted, simply update the state and return directly. - if ir.GetState() == commonpb.ImportState_ImportCompleted { - if err := c.importManager.setImportTaskState(ir.GetTaskId(), commonpb.ImportState_ImportCompleted); err != nil { - errMsg := "failed to set import task as ImportState_ImportCompleted" - log.Error(errMsg, zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("%s %s", errMsg, err.Error()), - }, nil - } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil - } - // Upon receiving ReportImport request, update the related task's state in task store. - ti, err := c.importManager.updateTaskInfo(ir) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, - Reason: err.Error(), - }, nil - } // This method updates a busy node to an idle node, and sends an import task to the idle node resendTaskFunc := func() { @@ -1741,6 +1718,19 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( } } + // ReportImport should no longer be called with ImportState_ImportCompleted; that state is now set by the flip task state loop. + if ir.GetState() == commonpb.ImportState_ImportCompleted { + log.Warn("this should not be called!") + } + // Upon receiving ReportImport request, update the related task's state in task store. + ti, err := c.importManager.updateTaskInfo(ir) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, + Reason: err.Error(), + }, nil + } + // If the task failed, send the task to an idle DataNode if ir.GetState() == commonpb.ImportState_ImportFailed { // When a DataNode failed importing, remove this DataNode from the busy node list and send out import tasks again. @@ -1754,9 +1744,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( resendTaskFunc() } else { // Here ir.GetState() == commonpb.ImportState_ImportPersisted - // When a DataNode finishes importing, remove this DataNode from the busy node list and send out import tasks again. - resendTaskFunc() - // Flush all import data segments. + // Seal these import segments, so they can be auto-flushed later.
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 6c86fe5d23..3c414852f9 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -18,7 +18,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" @@ -903,7 +902,7 @@ func TestCore_GetImportState(t *testing.T) { t.Run("normal case", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil) resp, err := c.GetImportState(ctx, &milvuspb.GetImportStateRequest{ Task: 100, }) @@ -987,7 +986,7 @@ func TestCore_ListImportTasks(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode(), withMeta(meta)) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil) // list all tasks resp, err := c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{}) @@ -1127,10 +1126,10 @@ func TestCore_ReportImport(t *testing.T) { }, nil } - callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, + ErrorCode: commonpb.ErrorCode_Success, }, }, nil } @@ -1149,25 +1148,10 @@ func TestCore_ReportImport(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - t.Run("report complete import", func(t *testing.T) { - ctx := context.Background() - c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) - resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ - TaskId: 100, - State: commonpb.ImportState_ImportCompleted, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - // Change the state back. 
- err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) - assert.NoError(t, err) - }) - t.Run("report complete import with task not found", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 101, State: commonpb.ImportState_ImportCompleted, @@ -1179,7 +1163,7 @@ func TestCore_ReportImport(t *testing.T) { t.Run("report import started state", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ @@ -1202,8 +1186,7 @@ func TestCore_ReportImport(t *testing.T) { withTtSynchronizer(ticker), withDataCoord(dc)) c.broker = newServerBroker(c) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, - callDescribeIndex, nil, callUnsetIsImportingState) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, callUnsetIsImportingState) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx)