BulkInsert completes when segments are flushed, without checking indexes (#21604)

/kind improvement
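
With this change, RootCoord marks a bulk-insert task `ImportCompleted` as soon as DataCoord reports that none of the task's segments are still importing or sealed; it no longer waits for indexes to be built on those segments. Below is a minimal sketch of that completion criterion, reusing the `datapb`/`commonpb` types that appear in the diff (the package clause and the `commonpb` import path are assumptions for illustration, not part of this PR):

```go
package rootcoord

import (
	// Import paths assumed to match the surrounding files in this PR.
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// flushDone sketches the new completion check used by the import manager:
// a task is considered done once none of its segments are still Importing or
// Sealed. Flushed segments may already have been compacted away, so the check
// deliberately does not require every reported state to be Flushed.
func flushDone(states []*datapb.SegmentStateInfo) bool {
	for _, s := range states {
		if s.GetState() == commonpb.SegmentState_Importing ||
			s.GetState() == commonpb.SegmentState_Sealed {
			return false
		}
	}
	return true
}
```

Once this check passes, the import manager frees the DataNode, unsets the segments' `isImporting` flag, and flips the task state, which is why `callDescribeIndex` and `callGetSegmentIndexState` are removed from `importManager` below.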

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Ten Thousand Leaves 2023-01-11 18:57:43 +08:00 committed by GitHub
parent 941891a342
commit 04e595d61b
7 changed files with 223 additions and 381 deletions

View File

@ -434,6 +434,10 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
ret = append(ret, id)
continue
}
if info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
@ -524,7 +528,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
continue
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State == commonpb.SegmentState_Sealed {
if info.State == commonpb.SegmentState_Sealed ||
info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
// change shouldSeal to segment seal policy logic
@ -541,7 +547,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
for _, policy := range s.channelSealPolicies {
vs := policy(channel, segmentInfos, ts)
for _, info := range vs {
if info.State == commonpb.SegmentState_Sealed {
if info.State == commonpb.SegmentState_Sealed ||
info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {

View File

@ -44,6 +44,7 @@ type Broker interface {
Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
UnsetIsImportingState(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
MarkSegmentsDropped(context.Context, *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error)
GetSegmentStates(context.Context, *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
@ -203,6 +204,10 @@ func (b *ServerBroker) MarkSegmentsDropped(ctx context.Context, req *datapb.Mark
return b.s.dataCoord.MarkSegmentsDropped(ctx, req)
}
func (b *ServerBroker) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return b.s.dataCoord.GetSegmentStates(ctx, req)
}
func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{
CollectionID: collID,

View File

@ -15,6 +15,7 @@ type GetCollectionNameFunc func(collID, partitionID UniqueID) (string, string, e
type IDAllocator func(count uint32) (UniqueID, UniqueID, error)
type ImportFunc func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
type MarkSegmentsDroppedFunc func(ctx context.Context, segIDs []int64) (*commonpb.Status, error)
type GetSegmentStatesFunc func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
type DescribeIndexFunc func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error)
type GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
type UnsetIsImportingStateFunc func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
@ -24,6 +25,7 @@ type ImportFactory interface {
NewIDAllocator() IDAllocator
NewImportFunc() ImportFunc
NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc
NewGetSegmentStatesFunc() GetSegmentStatesFunc
NewDescribeIndexFunc() DescribeIndexFunc
NewGetSegmentIndexStateFunc() GetSegmentIndexStateFunc
NewUnsetIsImportingStateFunc() UnsetIsImportingStateFunc
@ -49,6 +51,10 @@ func (f ImportFactoryImpl) NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc
return MarkSegmentsDroppedWithCore(f.c)
}
func (f ImportFactoryImpl) NewGetSegmentStatesFunc() GetSegmentStatesFunc {
return GetSegmentStatesWithCore(f.c)
}
func (f ImportFactoryImpl) NewDescribeIndexFunc() DescribeIndexFunc {
return DescribeIndexWithCore(f.c)
}
@ -102,6 +108,12 @@ func MarkSegmentsDroppedWithCore(c *Core) MarkSegmentsDroppedFunc {
}
}
func GetSegmentStatesWithCore(c *Core) GetSegmentStatesFunc {
return func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return c.broker.GetSegmentStates(ctx, req)
}
}
func DescribeIndexWithCore(c *Core) DescribeIndexFunc {
return func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return c.broker.DescribeIndex(ctx, colID)

View File

@ -32,16 +32,15 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
"go.uber.org/zap"
)
const (
MaxPendingCount = 32
MaxPendingCount = 65536 // TODO: Make this configurable.
delimiter = "/"
)
@ -54,10 +53,11 @@ var checkPendingTasksInterval = 60 * 1000
// default 5*60*1000 milliseconds (5 minutes)
var cleanUpLoopInterval = 5 * 60 * 1000
// flipTaskStateInterval is the default interval to loop through tasks and check if their states needs to be
// flipped/updated, for example, from `ImportPersisted` to `ImportCompleted`.
// default 15 * 1000 milliseconds (15 seconds)
var flipTaskStateInterval = 15 * 1000
// flipPersistedTaskInterval is the default interval to loop through tasks and check if their states need to be
// flipped/updated from `ImportPersisted` to `ImportCompleted`.
// default 2 * 1000 milliseconds (2 seconds)
// TODO: Make this configurable.
var flipPersistedTaskInterval = 2 * 1000
// importManager manager for import tasks
type importManager struct {
@ -79,8 +79,7 @@ type importManager struct {
callImportService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error)
callMarkSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error)
callDescribeIndex func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error)
callGetSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
callGetSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
callUnsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
}
@ -89,9 +88,8 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
idAlloc func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error),
importService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error),
markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error),
getSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error),
getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error),
describeIndex func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error),
getSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error),
unsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)) *importManager {
mgr := &importManager{
ctx: ctx,
@ -106,9 +104,8 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
idAllocator: idAlloc,
callImportService: importService,
callMarkSegmentsDropped: markSegmentsDropped,
callGetSegmentStates: getSegmentStates,
getCollectionName: getCollectionName,
callDescribeIndex: describeIndex,
callGetSegmentIndexState: getSegmentIndexState,
callUnsetIsImportingState: unsetIsImportingState,
}
return mgr
@ -149,17 +146,17 @@ func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) {
// flipTaskStateLoop periodically calls `flipTaskState` to check if states of the tasks need to be updated.
func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(time.Duration(flipTaskStateInterval) * time.Millisecond)
defer ticker.Stop()
flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond)
defer flipPersistedTicker.Stop()
for {
select {
case <-m.ctx.Done():
log.Info("import manager context done, exit check flipTaskStateLoop")
return
case <-ticker.C:
log.Info("start trying to flip task state")
if err := m.flipTaskState(m.ctx); err != nil {
log.Error("failed to flip task state", zap.Error(err))
case <-flipPersistedTicker.C:
log.Debug("start trying to flip ImportPersisted task")
if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
log.Error("failed to flip ImportPersisted task", zap.Error(err))
}
}
}
@ -269,137 +266,105 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
return nil
}
// flipTaskState checks every import task and flips their import state if eligible.
func (m *importManager) flipTaskState(ctx context.Context) error {
// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to
// `ImportCompleted` if eligible.
func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
var importTasks []*datapb.ImportTaskInfo
var err error
if importTasks, err = m.loadFromTaskStore(false); err != nil {
log.Error("failed to load from task store", zap.Error(err))
return err
}
for _, task := range importTasks {
// Checking if ImportPersisted --> ImportCompleted ready.
if task.GetState().GetStateCode() == commonpb.ImportState_ImportPersisted {
log.Info("<ImportPersisted> task found, checking if it is eligible to become <ImportCompleted>",
zap.Int64("task ID", task.GetId()))
// TODO: if collection or partition has been dropped before the task complete,
// we need to set the task to failed, because the checkIndexingDone() cannot know
// whether the collection has been dropped.
importTask := m.getTaskState(task.GetId())
// if this method failed, skip this task, try again in next round
m.flipTaskIndexState(ctx, task.GetId())
if err = m.flipTaskFlushedState(ctx, importTask, task.GetDatanodeId()); err != nil {
log.Error("failed to flip task flushed state",
zap.Int64("task ID", task.GetId()),
zap.Error(err))
}
}
}
return nil
}
func (m *importManager) flipTaskIndexState(ctx context.Context, taskID int64) error {
resp := m.getTaskState(taskID)
ok, err := m.checkIndexingDone(ctx, resp.GetCollectionId(), resp.GetSegmentIds())
func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *milvuspb.GetImportStateResponse, dataNodeID int64) error {
ok, err := m.checkFlushDone(ctx, importTask.GetSegmentIds())
if err != nil {
log.Error("an error occurred while checking index state of segments",
zap.Int64("task ID", taskID),
log.Error("an error occurred while checking flush state of segments",
zap.Int64("task ID", importTask.GetId()),
zap.Error(err))
// Failed to check indexing state of segments
return err
}
if ok {
if err := m.setImportTaskState(resp.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
log.Error("failed to set import task state",
zap.Int64("task ID", resp.GetId()),
zap.Any("target state", commonpb.ImportState_ImportCompleted),
zap.Error(err))
// Failed to update task's state
return err
}
log.Info("indexes are successfully built and the import task has complete!",
zap.Int64("task ID", resp.GetId()))
log.Info("now start unsetting isImporting state of segments",
zap.Int64("task ID", resp.GetId()),
zap.Int64s("segment IDs", resp.GetSegmentIds()))
// Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state.
// All segments are flushed. DataNode becomes available.
func() {
m.busyNodesLock.Lock()
defer m.busyNodesLock.Unlock()
delete(m.busyNodes, dataNodeID)
log.Info("a DataNode is no longer busy after processing task",
zap.Int64("dataNode ID", dataNodeID),
zap.Int64("task ID", importTask.GetId()))
}()
// Unset isImporting flag.
if m.callUnsetIsImportingState == nil {
log.Error("callUnsetIsImportingState function of importManager is nil")
return fmt.Errorf("failed to describe index: segment state method of import manager is nil")
}
status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
SegmentIds: resp.GetSegmentIds(),
_, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
SegmentIds: importTask.GetSegmentIds(),
})
if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
log.Error("failed to set import task state",
zap.Int64("task ID", importTask.GetId()),
zap.Any("target state", commonpb.ImportState_ImportCompleted),
zap.Error(err))
return err
}
if err != nil {
log.Error("failed to unset importing state of all segments (could be partial failure)",
zap.Error(err))
return err
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("failed to unset importing state of all segments (could be partial failure)",
zap.Error(errors.New(status.GetReason())))
return errors.New(status.GetReason())
// Start working on new bulk insert tasks.
if err = m.sendOutTasks(m.ctx); err != nil {
log.Error("fail to send out import task to DataNodes",
zap.Int64("task ID", importTask.GetId()))
}
}
return nil
}
// checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`.
// It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise.
func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) {
if m.callDescribeIndex == nil {
log.Error("callDescribeIndex function of importManager is nil")
return false, fmt.Errorf("failed to describe index: describe index method of import manager is nil")
}
// Check if collection has indexed fields.
var descIdxResp *indexpb.DescribeIndexResponse
var err error
if descIdxResp, err = m.callDescribeIndex(ctx, collID); err != nil {
log.Error("failed to describe index",
zap.Int64("collection ID", collID),
zap.Error(err))
// checkFlushDone checks if flush is done on given segments.
func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (bool, error) {
resp, err := m.callGetSegmentStates(ctx, &datapb.GetSegmentStatesRequest{
SegmentIDs: segIDs,
})
if err != nil {
log.Error("failed to get import task segment states",
zap.Int64s("segment IDs", segIDs))
return false, err
}
if descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success &&
descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_IndexNotExist {
log.Error("failed to describe index",
zap.Int64("collection ID", collID),
zap.String("reason", descIdxResp.GetStatus().GetReason()))
return false, errors.New(descIdxResp.GetStatus().GetReason())
getSegmentStates := func(segment *datapb.SegmentStateInfo, _ int) string {
return segment.GetState().String()
}
log.Info("index info retrieved for collection",
zap.Int64("collection ID", collID),
zap.Any("index info", descIdxResp.GetIndexInfos()))
if descIdxResp.GetStatus().GetErrorCode() == commonpb.ErrorCode_IndexNotExist ||
len(descIdxResp.GetIndexInfos()) == 0 {
log.Info("index doesn't exist for collection",
zap.Int64("collection ID", collID))
return true, nil
}
indexedSegmentCount := len(allSegmentIDs)
for _, indexInfo := range descIdxResp.GetIndexInfos() {
states, err := m.callGetSegmentIndexState(ctx, collID, indexInfo.GetIndexName(), allSegmentIDs)
if err != nil {
log.Error("failed to get index state in checkIndexingDone", zap.Error(err))
return false, err
}
// Count the # of segments with finished index.
ct := 0
for _, s := range states {
if s.State == commonpb.IndexState_Finished {
ct++
}
}
if ct < indexedSegmentCount {
indexedSegmentCount = ct
log.Debug("checking import segment states",
zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates)))
for _, states := range resp.GetStates() {
// A flushed segment could get compacted, so only return false while segments are still importing or sealed.
if states.GetState() == commonpb.SegmentState_Importing ||
states.GetState() == commonpb.SegmentState_Sealed {
return false, nil
}
}
log.Info("segment indexing state checked",
zap.Int64s("segments checked", allSegmentIDs),
zap.Int("# of segments with complete index", indexedSegmentCount),
zap.Int64("collection ID", collID),
)
return len(allSegmentIDs) == indexedSegmentCount, nil
return true, nil
}
func (m *importManager) isRowbased(files []string) (bool, error) {
@ -635,17 +600,6 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
if err != nil {
return nil, err
}
// if is ImportState_ImportPersisted, and index is FLAT, set the task to be complated immediately
// this method is called from importWrapper.reportPersisted() to rootCoord.ReportImport(),
// if flipTaskIndexState failed, the outer caller(importWrapper) will retry 3 times
if ir.GetState() == commonpb.ImportState_ImportPersisted {
err = m.flipTaskIndexState(m.ctx, updatedInfo.GetId())
if err != nil {
return nil, err
}
}
return updatedInfo, nil
}

View File

@ -30,7 +30,6 @@ import (
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -68,13 +67,23 @@ func TestImportManager_NewImportManager(t *testing.T) {
},
CreateTs: time.Now().Unix() - 100,
}
ti3 := &datapb.ImportTaskInfo{
Id: 300,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportCompleted,
},
CreateTs: time.Now().Unix() - 100,
}
taskInfo1, err := proto.Marshal(ti1)
assert.NoError(t, err)
taskInfo2, err := proto.Marshal(ti2)
assert.NoError(t, err)
taskInfo3, err := proto.Marshal(ti3)
assert.NoError(t, err)
mockKv.Save(BuildImportTaskKey(1), "value")
mockKv.Save(BuildImportTaskKey(100), string(taskInfo1))
mockKv.Save(BuildImportTaskKey(200), string(taskInfo2))
mockKv.Save(BuildImportTaskKey(300), string(taskInfo3))
mockCallImportServiceErr := false
callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
@ -96,13 +105,20 @@ func TestImportManager_NewImportManager(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
var wg sync.WaitGroup
wg.Add(1)
t.Run("working task expired", func(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
// there are 2 tasks read from store, one is pending, the other is persisted.
@ -134,7 +150,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
mgr.init(context.TODO())
var wgLoop sync.WaitGroup
@ -153,7 +169,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
assert.Panics(t, func() {
mgr.init(context.TODO())
@ -170,7 +186,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
mgr.init(context.TODO())
})
@ -184,7 +200,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
mgr.init(context.TODO())
func() {
@ -204,7 +220,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
mgr.init(ctx)
var wgLoop sync.WaitGroup
@ -262,7 +278,7 @@ func TestImportManager_TestSetImportTaskState(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil)
assert.NotNil(t, mgr)
_, err := mgr.loadFromTaskStore(true)
assert.NoError(t, err)
@ -355,9 +371,16 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
assert.NotNil(t, mgr)
_, err = mgr.loadFromTaskStore(true)
assert.NoError(t, err)
@ -406,13 +429,23 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
},
CreateTs: time.Now().Unix() - 100,
}
ti3 := &datapb.ImportTaskInfo{
Id: 300,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportCompleted,
Segments: []int64{204, 205, 206},
},
CreateTs: time.Now().Unix() - 100,
}
taskInfo1, err := proto.Marshal(ti1)
assert.NoError(t, err)
taskInfo2, err := proto.Marshal(ti2)
assert.NoError(t, err)
mockKv.Save(BuildImportTaskKey(1), "value")
taskInfo3, err := proto.Marshal(ti3)
assert.NoError(t, err)
mockKv.Save(BuildImportTaskKey(100), string(taskInfo1))
mockKv.Save(BuildImportTaskKey(200), string(taskInfo2))
mockKv.Save(BuildImportTaskKey(300), string(taskInfo3))
mockCallImportServiceErr := false
callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
@ -434,40 +467,21 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexInfos: []*indexpb.IndexInfo{
{},
},
}, nil
}
callGetSegmentIndexState := func(ctx context.Context, collID UniqueID, indexName string,
segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) {
return []*indexpb.SegmentIndexState{
{
SegmentID: 200,
State: commonpb.IndexState_Finished,
},
{
SegmentID: 201,
State: commonpb.IndexState_Finished,
},
{
SegmentID: 202,
State: commonpb.IndexState_Finished,
},
}, nil
}
callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
flipTaskStateInterval = 50
flipPersistedTaskInterval = 20
var wg sync.WaitGroup
wg.Add(1)
t.Run("normal case", func(t *testing.T) {
@ -475,13 +489,13 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped,
nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState)
callGetSegmentStates, nil, callUnsetIsImportingState)
assert.NotNil(t, mgr)
var wgLoop sync.WaitGroup
wgLoop.Add(1)
mgr.flipTaskStateLoop(&wgLoop)
wgLoop.Wait()
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
})
wg.Add(1)
@ -489,15 +503,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped,
nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState)
callGetSegmentStates, nil, callUnsetIsImportingState)
assert.NotNil(t, mgr)
var wgLoop sync.WaitGroup
wgLoop.Add(1)
@ -511,15 +518,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
},
}, nil
}
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped,
nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState)
callGetSegmentStates, nil, callUnsetIsImportingState)
assert.NotNil(t, mgr)
var wgLoop sync.WaitGroup
wgLoop.Add(1)
@ -548,9 +548,15 @@ func TestImportManager_ImportJob(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
// nil request
mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp := mgr.importJob(context.TODO(), nil, colID, 0)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@ -579,7 +585,7 @@ func TestImportManager_ImportJob(t *testing.T) {
// row-based case, task count equal to file count
// since the importServiceFunc return error, tasks will be kept in pending list
rowReq.Files = []string{"f1.json"}
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks))
assert.Equal(t, 0, len(mgr.workingTasks))
@ -592,7 +598,7 @@ func TestImportManager_ImportJob(t *testing.T) {
// column-based case, one quest one task
// since the importServiceFunc return error, tasks will be kept in pending list
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), colReq, colID, 0)
assert.Equal(t, 1, len(mgr.pendingTasks))
assert.Equal(t, 0, len(mgr.workingTasks))
@ -606,13 +612,13 @@ func TestImportManager_ImportJob(t *testing.T) {
}
// row-based case, since the importServiceFunc return success, tasks will be sent to working list
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.Equal(t, 0, len(mgr.pendingTasks))
assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks))
// column-based case, since the importServiceFunc return success, tasks will be sent to working list
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), colReq, colID, 0)
assert.Equal(t, 0, len(mgr.pendingTasks))
assert.Equal(t, 1, len(mgr.workingTasks))
@ -636,7 +642,7 @@ func TestImportManager_ImportJob(t *testing.T) {
// row-based case, since the importServiceFunc return success for 1 task
// the first task is sent to working list, and 1 task left in pending list
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.Equal(t, 0, len(mgr.pendingTasks))
assert.Equal(t, 1, len(mgr.workingTasks))
@ -710,9 +716,16 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
// each data node owns one task
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
for i := 0; i < len(dnList); i++ {
resp := mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@ -721,7 +734,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) {
}
// all data nodes are busy, new task waiting in pending list
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp := mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks))
@ -729,7 +742,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) {
// now all data nodes are free again, new task is executed instantly
count = 0
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp = mgr.importJob(context.TODO(), colReq, colID, 0)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 0, len(mgr.pendingTasks))
@ -784,9 +797,16 @@ func TestImportManager_TaskState(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
// add 3 tasks, their ID is 10000, 10001, 10002, make sure updateTaskInfo() works correctly
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
mgr.importJob(context.TODO(), rowReq, colID, 0)
rowReq.Files = []string{"f2.json"}
mgr.importJob(context.TODO(), rowReq, colID, 0)
@ -816,51 +836,6 @@ func TestImportManager_TaskState(t *testing.T) {
},
}
// callDescribeIndex method is nil
_, err = mgr.updateTaskInfo(info)
assert.Error(t, err)
mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
// describe index failed, return error
_, err = mgr.updateTaskInfo(info)
assert.Error(t, err)
mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
},
}, nil
}
// index doesn't exist, but callUnsetIsImportingState is nil, return error
_, err = mgr.updateTaskInfo(info)
assert.Error(t, err)
mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, nil
}
// index doesn't exist, but failed to unset importing state, return error
_, err = mgr.updateTaskInfo(info)
assert.Error(t, err)
mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, errors.New("error to unset importing state")
}
// index doesn't exist, but failed to unset importing state, return error
_, err = mgr.updateTaskInfo(info)
assert.Error(t, err)
mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -874,7 +849,7 @@ func TestImportManager_TaskState(t *testing.T) {
assert.Equal(t, int64(100), ti.GetCollectionId())
assert.Equal(t, int64(0), ti.GetPartitionId())
assert.Equal(t, []string{"f2.json"}, ti.GetFiles())
assert.Equal(t, commonpb.ImportState_ImportCompleted, ti.GetState().GetStateCode())
assert.Equal(t, commonpb.ImportState_ImportPersisted, ti.GetState().GetStateCode())
assert.Equal(t, int64(1000), ti.GetState().GetRowCount())
resp := mgr.getTaskState(10000)
@ -882,7 +857,7 @@ func TestImportManager_TaskState(t *testing.T) {
resp = mgr.getTaskState(2)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.ImportState_ImportCompleted, resp.State)
assert.Equal(t, commonpb.ImportState_ImportPersisted, resp.State)
resp = mgr.getTaskState(1)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@ -938,7 +913,14 @@ func TestImportManager_AllocFail(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil)
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp := mgr.importJob(context.TODO(), rowReq, colID, 0)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 0, len(mgr.pendingTasks))
@ -971,6 +953,13 @@ func TestImportManager_ListAllTasks(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
colID1 := int64(100)
colID2 := int64(101)
@ -999,7 +988,7 @@ func TestImportManager_ListAllTasks(t *testing.T) {
}
mockKv := memkv.NewMemoryKV()
mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, getCollectionName, nil, nil, nil)
mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, callGetSegmentStates, getCollectionName, nil)
// add 10 tasks for collection1, id from 1 to 10
file1 := "f1.json"
@ -1184,100 +1173,3 @@ func TestImportManager_isRowbased(t *testing.T) {
assert.Nil(t, err)
assert.False(t, rb)
}
func TestImportManager_checkIndexingDone(t *testing.T) {
ctx := context.Background()
mgr := &importManager{
callDescribeIndex: func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return nil, errors.New("error")
},
callGetSegmentIndexState: func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) {
return nil, errors.New("error")
},
}
segmentsID := []typeutil.UniqueID{1, 2, 3}
// check index of 3 segments
// callDescribeIndex() failed
done, err := mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.False(t, done)
assert.Error(t, err)
mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
// callDescribeIndex() unexpected error
done, err = mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.False(t, done)
assert.Error(t, err)
mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
},
}, nil
}
// callDescribeIndex() index not exist
done, err = mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.True(t, done)
assert.Nil(t, err)
mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexInfos: []*indexpb.IndexInfo{
{
State: commonpb.IndexState_Finished,
},
},
}, nil
}
// callGetSegmentIndexState() failed
done, err = mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.False(t, done)
assert.Error(t, err)
mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) {
return []*indexpb.SegmentIndexState{
{
State: commonpb.IndexState_Finished,
},
}, nil
}
// only 1 segment indexed
done, err = mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.False(t, done)
assert.Nil(t, err)
mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) {
return []*indexpb.SegmentIndexState{
{
State: commonpb.IndexState_Finished,
},
{
State: commonpb.IndexState_Finished,
},
{
State: commonpb.IndexState_Finished,
},
}, nil
}
// all segments indexed
done, err = mgr.checkIndexingDone(ctx, 1, segmentsID)
assert.True(t, done)
assert.Nil(t, err)
}

View File

@ -415,9 +415,8 @@ func (c *Core) initImportManager() error {
f.NewIDAllocator(),
f.NewImportFunc(),
f.NewMarkSegmentsDroppedFunc(),
f.NewGetSegmentStatesFunc(),
f.NewGetCollectionNameFunc(),
f.NewDescribeIndexFunc(),
f.NewGetSegmentIndexStateFunc(),
f.NewUnsetIsImportingStateFunc(),
)
c.importManager.init(c.ctx)
@ -1701,28 +1700,6 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
}
// If setting ImportState_ImportCompleted, simply update the state and return directly.
if ir.GetState() == commonpb.ImportState_ImportCompleted {
if err := c.importManager.setImportTaskState(ir.GetTaskId(), commonpb.ImportState_ImportCompleted); err != nil {
errMsg := "failed to set import task as ImportState_ImportCompleted"
log.Error(errMsg, zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("%s %s", errMsg, err.Error()),
}, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// Upon receiving ReportImport request, update the related task's state in task store.
ti, err := c.importManager.updateTaskInfo(ir)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
Reason: err.Error(),
}, nil
}
// This method update a busy node to idle node, and send import task to idle node
resendTaskFunc := func() {
@ -1741,6 +1718,19 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
}
}
// DataNode is no longer expected to report ImportState_ImportCompleted; RootCoord flips the state itself once segments are flushed.
if ir.GetState() == commonpb.ImportState_ImportCompleted {
log.Warn("this should not be called!")
}
// Upon receiving ReportImport request, update the related task's state in task store.
ti, err := c.importManager.updateTaskInfo(ir)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
Reason: err.Error(),
}, nil
}
// If task failed, send task to idle datanode
if ir.GetState() == commonpb.ImportState_ImportFailed {
// When a DataNode failed importing, remove this DataNode from the busy node list and send out import tasks again.
@ -1754,9 +1744,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
resendTaskFunc()
} else {
// Here ir.GetState() == commonpb.ImportState_ImportPersisted
// When a DataNode finishes importing, remove this DataNode from the busy node list and send out import tasks again.
resendTaskFunc()
// Flush all import data segments.
// Seal these import segments, so they can be auto-flushed later.
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil {
log.Error("failed to call Flush on bulk insert segments",
zap.Int64("task ID", ir.GetTaskId()))

View File

@ -18,7 +18,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
@ -903,7 +902,7 @@ func TestCore_GetImportState(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode())
c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil)
c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil)
resp, err := c.GetImportState(ctx, &milvuspb.GetImportStateRequest{
Task: 100,
})
@ -987,7 +986,7 @@ func TestCore_ListImportTasks(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode(), withMeta(meta))
c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil)
c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil)
// list all tasks
resp, err := c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{})
@ -1127,10 +1126,10 @@ func TestCore_ReportImport(t *testing.T) {
}, nil
}
callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
@ -1149,25 +1148,10 @@ func TestCore_ReportImport(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("report complete import", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode())
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
TaskId: 100,
State: commonpb.ImportState_ImportCompleted,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
// Change the state back.
err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending)
assert.NoError(t, err)
})
t.Run("report complete import with task not found", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode())
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
TaskId: 101,
State: commonpb.ImportState_ImportCompleted,
@ -1179,7 +1163,7 @@ func TestCore_ReportImport(t *testing.T) {
t.Run("report import started state", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode())
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
c.importManager.loadFromTaskStore(true)
c.importManager.sendOutTasks(ctx)
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
@ -1202,8 +1186,7 @@ func TestCore_ReportImport(t *testing.T) {
withTtSynchronizer(ticker),
withDataCoord(dc))
c.broker = newServerBroker(c)
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil,
callDescribeIndex, nil, callUnsetIsImportingState)
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, callUnsetIsImportingState)
c.importManager.loadFromTaskStore(true)
c.importManager.sendOutTasks(ctx)