diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index 0400e2bb0a..77b4e9a73f 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -3269,7 +3269,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
 		gibpt.IndexName = Params.CommonCfg.DefaultIndexName
 	}
 
-	describeIndexReq := milvuspb.DescribeIndexRequest{
+	describeIndexReq := &milvuspb.DescribeIndexRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_DescribeIndex,
 			MsgID:   gibpt.Base.MsgID,
@@ -3281,7 +3281,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
 		// IndexName: gibpt.IndexName,
 	}
 
-	indexDescriptionResp, err2 := gibpt.rootCoord.DescribeIndex(ctx, &describeIndexReq)
+	indexDescriptionResp, err2 := gibpt.rootCoord.DescribeIndex(ctx, describeIndexReq)
 	if err2 != nil {
 		return err2
 	}
@@ -3473,6 +3473,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
 	}
 	gist.collectionID = collectionID
 
+	// Get partitions of the given collection.
 	showPartitionRequest := &milvuspb.ShowPartitionsRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_ShowPartitions,
@@ -3493,6 +3494,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
 		gist.IndexName = Params.CommonCfg.DefaultIndexName
 	}
 
+	// Retrieve index status and detailed index information.
 	describeIndexReq := milvuspb.DescribeIndexRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_DescribeIndex,
@@ -3510,6 +3512,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
 		return err2
 	}
 
+	// Check if the target index name exists.
 	matchIndexID := int64(-1)
 	foundIndexID := false
 	for _, desc := range indexDescriptionResp.IndexDescriptions {
@@ -3523,6 +3526,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
 		return fmt.Errorf("no index is created")
 	}
 
+	// Fetch segments for partitions.
 	var allSegmentIDs []UniqueID
 	for _, partitionID := range partitions.PartitionIDs {
 		showSegmentsRequest := &milvuspb.ShowSegmentsRequest{
@@ -3549,6 +3553,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
 		IndexBuildIDs: make([]UniqueID, 0),
 	}
 
+	// Fetch index build IDs from segments.
 	for _, segmentID := range allSegmentIDs {
 		describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
 			Base: &commonpb.MsgBase{
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index 34454ebc49..3d42402b4e 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -29,8 +29,6 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/milvus-io/milvus/internal/util/timerecord"
-
 	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/common"
@@ -55,6 +53,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/paramtable"
 	"github.com/milvus-io/milvus/internal/util/retry"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
+	"github.com/milvus-io/milvus/internal/util/timerecord"
 	"github.com/milvus-io/milvus/internal/util/trace"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
@@ -62,6 +61,9 @@ import (
 	"go.uber.org/zap"
 )
 
+// UniqueID is an alias of typeutil.UniqueID.
+type UniqueID = typeutil.UniqueID
+
 // ------------------ struct -----------------------
 
 // DdOperation used to save ddMsg into etcd
@@ -132,9 +134,10 @@ type Core struct {
 	CallGetNumRowsService func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
 	CallGetFlushedSegmentsService func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error)
 
-	//call index builder's client to build index, return build id
-	CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
-	CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
+	//call index builder's client to build index (returns the build id), drop index, or get index states
+	CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
+	CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
+	CallGetIndexStatesService func(ctx context.Context, IndexBuildIDs []int64) ([]*indexpb.IndexInfo, error)
 
 	NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)
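The `Call...Service` fields above follow the coordinator's callback-injection pattern: dependencies on other components are stored as function fields and wired up in the `Set*Coord` methods, so unit tests can substitute a stub without standing up a full IndexCoord client. A minimal sketch of that pattern, using simplified stand-ins (`core`, `IndexInfo`, and `setIndexCoord` are illustrative types and names, not the real Milvus ones):

```go
package main

import (
	"context"
	"fmt"
)

// IndexInfo is a stand-in for indexpb.IndexInfo, reduced to the one field this sketch needs.
type IndexInfo struct {
	State string
}

// core mirrors the injection pattern: the dependency is a function field,
// so tests can swap in a stub without implementing a full client.
type core struct {
	CallGetIndexStatesService func(ctx context.Context, buildIDs []int64) ([]*IndexInfo, error)
}

// setIndexCoord-style wiring: wrap the remote call and recover from panics
// so a crashing client cannot take down the coordinator.
func (c *core) setIndexCoord(remote func(context.Context, []int64) ([]*IndexInfo, error)) {
	c.CallGetIndexStatesService = func(ctx context.Context, buildIDs []int64) (infos []*IndexInfo, retErr error) {
		defer func() {
			if r := recover(); r != nil {
				retErr = fmt.Errorf("get index states panic, msg = %v", r)
			}
		}()
		return remote(ctx, buildIDs)
	}
}

func main() {
	c := &core{}
	// A test would inject a stub like this instead of a real IndexCoord client.
	c.setIndexCoord(func(ctx context.Context, buildIDs []int64) ([]*IndexInfo, error) {
		out := make([]*IndexInfo, 0, len(buildIDs))
		for range buildIDs {
			out = append(out, &IndexInfo{State: "Finished"})
		}
		return out, nil
	})
	infos, err := c.CallGetIndexStatesService(context.Background(), []int64{1, 2, 3})
	fmt.Println(len(infos), err) // 3 <nil>
}
```

The deferred `recover` in the wrapper mirrors what `SetIndexCoord` does below, converting a panic in the remote client into an ordinary error return.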
@@ -367,63 +370,64 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
 				zap.Int64("collection id", collMeta.ID),
 				zap.Int64("partition id", partID),
 				zap.Error(err))
-		} else {
-			for _, segID := range segIDs {
-				indexInfos := []*etcdpb.FieldIndexInfo{}
-				indexMeta, ok := segID2IndexMeta[segID]
-				if !ok {
-					indexInfos = append(indexInfos, collMeta.FieldIndexes...)
-				} else {
-					for _, idx := range collMeta.FieldIndexes {
-						if _, ok := indexMeta[idx.IndexID]; !ok {
-							indexInfos = append(indexInfos, idx)
 						}
 					}
+			cancel2()
+			continue
+		}
+		for _, segID := range segIDs {
+			indexInfos := []*etcdpb.FieldIndexInfo{}
+			indexMeta, ok := segID2IndexMeta[segID]
+			if !ok {
+				indexInfos = append(indexInfos, collMeta.FieldIndexes...)
+			} else {
+				for _, idx := range collMeta.FieldIndexes {
+					if _, ok := indexMeta[idx.IndexID]; !ok {
+						indexInfos = append(indexInfos, idx)
 					}
 				}
-			for _, idxInfo := range indexInfos {
-				/* #nosec G601 */
-				field, err := GetFieldSchemaByID(&collMeta, idxInfo.FiledID)
-				if err != nil {
-					log.Debug("GetFieldSchemaByID",
-						zap.Any("collection_meta", collMeta),
-						zap.Int64("field id", idxInfo.FiledID))
-					continue
-				}
-				indexMeta, ok := indexID2Meta[idxInfo.IndexID]
-				if !ok {
-					log.Debug("index meta does not exist", zap.Int64("index_id", idxInfo.IndexID))
-					continue
-				}
-				info := etcdpb.SegmentIndexInfo{
-					CollectionID: collMeta.ID,
-					PartitionID:  partID,
-					SegmentID:    segID,
-					FieldID:      idxInfo.FiledID,
-					IndexID:      idxInfo.IndexID,
-					EnableIndex:  false,
-				}
-				log.Debug("building index by background checker",
+			}
+			for _, idxInfo := range indexInfos {
+				/* #nosec G601 */
+				field, err := GetFieldSchemaByID(&collMeta, idxInfo.FiledID)
+				if err != nil {
+					log.Debug("GetFieldSchemaByID",
+						zap.Any("collection_meta", collMeta),
+						zap.Int64("field id", idxInfo.FiledID))
+					continue
+				}
+				indexMeta, ok := indexID2Meta[idxInfo.IndexID]
+				if !ok {
+					log.Debug("index meta does not exist", zap.Int64("index_id", idxInfo.IndexID))
+					continue
+				}
+				info := etcdpb.SegmentIndexInfo{
+					CollectionID: collMeta.ID,
+					PartitionID:  partID,
+					SegmentID:    segID,
+					FieldID:      idxInfo.FiledID,
+					IndexID:      idxInfo.IndexID,
+					EnableIndex:  false,
+				}
+				log.Debug("building index by background checker",
+					zap.Int64("segment_id", segID),
+					zap.Int64("index_id", indexMeta.IndexID),
+					zap.Int64("collection_id", collMeta.ID))
+				info.BuildID, err = c.BuildIndex(ctx2, segID, field, &indexMeta, false)
+				if err != nil {
+					log.Debug("build index failed",
 					zap.Int64("segment_id", segID),
-					zap.Int64("index_id", indexMeta.IndexID),
-					zap.Int64("collection_id", collMeta.ID))
-				info.BuildID, err = c.BuildIndex(ctx2, segID, field, &indexMeta, false)
-				if err != nil {
-					log.Debug("build index failed",
-						zap.Int64("segment_id", segID),
-						zap.Int64("field_id", field.FieldID),
-						zap.Int64("index_id", indexMeta.IndexID))
-					continue
-				}
-				if info.BuildID != 0 {
-					info.EnableIndex = true
-				}
-				if err := c.MetaTable.AddIndex(&info); err != nil {
-					log.Debug("Add index into meta table failed",
-						zap.Int64("collection_id", collMeta.ID),
-						zap.Int64("index_id", info.IndexID),
-						zap.Int64("build_id", info.BuildID),
-						zap.Error(err))
-				}
+						zap.Int64("field_id", field.FieldID),
+						zap.Int64("index_id", indexMeta.IndexID))
+					continue
+				}
+				if info.BuildID != 0 {
+					info.EnableIndex = true
+				}
+				if err := c.MetaTable.AddIndex(&info); err != nil {
+					log.Debug("Add index into meta table failed",
+						zap.Int64("collection_id", collMeta.ID),
+						zap.Int64("index_id", info.IndexID),
+						zap.Int64("build_id", info.BuildID),
+						zap.Error(err))
 				}
 			}
 		}
@@ -815,6 +819,29 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
 		return nil
 	}
 
+	c.CallGetIndexStatesService = func(ctx context.Context, IndexBuildIDs []int64) (idxInfo []*indexpb.IndexInfo, retErr error) {
+		defer func() {
+			if err := recover(); err != nil {
+				retErr = fmt.Errorf("get index state from index service panic, msg = %v", err)
+			}
+		}()
+		<-initCh
+		res, err := s.GetIndexStates(ctx, &indexpb.GetIndexStatesRequest{
+			IndexBuildIDs: IndexBuildIDs,
+		})
+		if err != nil {
+			log.Error("RootCoord failed to get index states from IndexCoord.", zap.Error(err))
+			return nil, err
+		}
+		if res.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
+			log.Error("Get index states failed.",
+				zap.String("error_code", res.GetStatus().GetErrorCode().String()),
+				zap.String("reason", res.GetStatus().GetReason()))
+			return nil, fmt.Errorf(res.GetStatus().GetErrorCode().String())
+		}
+		log.Debug("Successfully got index states.")
+		return res.GetStates(), nil
+	}
 	return nil
 }
 
@@ -2196,3 +2223,89 @@ func (c *Core) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult)
 		ErrorCode: commonpb.ErrorCode_Success,
 	}, nil
 }
+
+// CountCompleteIndex checks the indexing status of the given segments and returns the number of segments that have a
+// complete index.
+func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, collectionID UniqueID,
+	allSegmentIDs []UniqueID) (int, error) {
+	// Note: the index name is always Params.CommonCfg.DefaultIndexName in the current Milvus design.
+	indexName := Params.CommonCfg.DefaultIndexName
+
+	// Retrieve index status and detailed index information.
+	describeIndexReq := &milvuspb.DescribeIndexRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_DescribeIndex,
+		},
+		CollectionName: collectionName,
+		IndexName:      indexName,
+	}
+	indexDescriptionResp, err := c.DescribeIndex(ctx, describeIndexReq)
+	if err != nil {
+		return 0, err
+	}
+	log.Debug("Got index description", zap.String("index_description", indexDescriptionResp.String()))
+
+	// Check if the target index name exists.
+	matchIndexID := int64(-1)
+	foundIndexID := false
+	for _, desc := range indexDescriptionResp.IndexDescriptions {
+		if desc.IndexName == indexName {
+			matchIndexID = desc.IndexID
+			foundIndexID = true
+			break
+		}
+	}
+	if !foundIndexID {
+		return 0, fmt.Errorf("no index is created")
+	}
+
+	getIndexStatesRequest := &indexpb.GetIndexStatesRequest{
+		IndexBuildIDs: make([]UniqueID, 0),
+	}
+
+	// Fetch index build IDs from segments.
+	for _, segmentID := range allSegmentIDs {
+		describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_DescribeSegment,
+			},
+			CollectionID: collectionID,
+			SegmentID:    segmentID,
+		}
+		segmentDesc, err := c.DescribeSegment(ctx, describeSegmentRequest)
+		if err != nil {
+			return 0, err
+		}
+		if segmentDesc.IndexID == matchIndexID {
+			if segmentDesc.EnableIndex {
+				getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID)
+			}
+		}
+	}
+	log.Debug("RootCoord CountCompleteIndex", zap.Int("# of index build IDs", len(getIndexStatesRequest.IndexBuildIDs)))
+
+	// Return early on empty results.
+	if len(getIndexStatesRequest.IndexBuildIDs) == 0 {
+		log.Info("Empty index build IDs returned.", zap.String("collection name", collectionName), zap.Int64("collection ID", collectionID))
+		return 0, nil
+	}
+	states, err := c.CallGetIndexStatesService(ctx, getIndexStatesRequest.IndexBuildIDs)
+	if err != nil {
+		log.Error("Failed to get index states in CountCompleteIndex.", zap.Error(err))
+		return 0, err
+	}
+
+	// Count the number of segments with a finished index.
+	ct := 0
+	for _, s := range states {
+		if s.State == commonpb.IndexState_Finished {
+			ct++
+		}
+	}
+	log.Info("Segment indexing state checked.",
+		zap.Int("# of checked segments", len(states)),
+		zap.Int("# of segments with complete index", ct),
+		zap.String("collection name", collectionName),
+		zap.Int64("collection ID", collectionID),
+	)
+	return ct, nil
+}
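A plausible way a caller would use the new `CountCompleteIndex` is to poll it until every segment of interest is indexed. The helper below is hypothetical (`waitForIndexedSegments` and the one-second poll interval are not part of this patch, and `context`/`time` imports are assumed); it only illustrates the function's contract: the return value equals the number of segments whose index build reached `IndexState_Finished`.

```go
// waitForIndexedSegments polls CountCompleteIndex until every segment in
// segIDs reports a finished index, the context is cancelled, or an error occurs.
// Hypothetical usage sketch, not part of this patch.
func waitForIndexedSegments(ctx context.Context, c *Core, collName string,
	collID UniqueID, segIDs []UniqueID) error {
	for {
		done, err := c.CountCompleteIndex(ctx, collName, collID, segIDs)
		if err != nil {
			return err
		}
		if done == len(segIDs) {
			return nil // every segment reports IndexState_Finished
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // retry after a short delay
		}
	}
}
```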
diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go
index b7f328029c..5f4b654f88 100644
--- a/internal/rootcoord/root_coord_test.go
+++ b/internal/rootcoord/root_coord_test.go
@@ -35,29 +35,33 @@ import (
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/util/funcutil"
-	"github.com/milvus-io/milvus/internal/util/metricsinfo"
-	"github.com/milvus-io/milvus/internal/util/retry"
-	"github.com/stretchr/testify/require"
-
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/proxypb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
-	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/funcutil"
+	"github.com/milvus-io/milvus/internal/util/metricsinfo"
+	"github.com/milvus-io/milvus/internal/util/retry"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
-const TestDMLChannelNum = 32
+const (
+	TestDMLChannelNum        = 32
+	returnError              = "ReturnError"
+	returnUnsuccessfulStatus = "ReturnUnsuccessfulStatus"
+)
+
+type ctxKey struct{}
 
 type proxyMock struct {
 	types.Proxy
@@ -257,6 +261,32 @@ func (idx *indexMock) getFileArray() []string {
 	return ret
 }
 
+func (idx *indexMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
+	v := ctx.Value(ctxKey{}).(string)
+	if v == returnError {
+		return nil, fmt.Errorf("injected error")
+	} else if v == returnUnsuccessfulStatus {
+		return &indexpb.GetIndexStatesResponse{
+			Status: &commonpb.Status{
+				ErrorCode: 100,
+				Reason:    "not so good",
+			},
+		}, nil
+	}
+	resp := &indexpb.GetIndexStatesResponse{
+		Status: &commonpb.Status{
+			ErrorCode: 0,
+			Reason:    "all good",
+		},
+	}
+	for range req.IndexBuildIDs {
+		resp.States = append(resp.States, &indexpb.IndexInfo{
+			State: commonpb.IndexState_Finished,
+		})
+	}
+	return resp, nil
+}
+
 func clearMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack) {
 	ch := time.After(timeout)
 	for {
@@ -562,7 +592,7 @@ func TestRootCoordInit(t *testing.T) {
 
 }
 
-func TestRootCoord(t *testing.T) {
+func TestRootCoord_Base(t *testing.T) {
 	const (
 		dbName   = "testDb"
 		collName = "testColl"
@@ -1033,7 +1063,7 @@ func TestRootCoord(t *testing.T) {
 		assert.Nil(t, err)
 		partID := coll.PartitionIDs[1]
 		dm.mu.Lock()
-		dm.segs = []typeutil.UniqueID{1000}
+		dm.segs = []typeutil.UniqueID{1000, 1001, 1002}
 		dm.mu.Unlock()
 
 		req := &milvuspb.ShowSegmentsRequest{
@@ -1050,7 +1080,9 @@
 		assert.Nil(t, err)
 		assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
 		assert.Equal(t, int64(1000), rsp.SegmentIDs[0])
-		assert.Equal(t, 1, len(rsp.SegmentIDs))
+		assert.Equal(t, int64(1001), rsp.SegmentIDs[1])
+		assert.Equal(t, int64(1002), rsp.SegmentIDs[2])
+		assert.Equal(t, 3, len(rsp.SegmentIDs))
 	})
 
 	wg.Add(1)
@@ -1082,8 +1114,11 @@ func TestRootCoord(t *testing.T) {
 		assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
 		time.Sleep(100 * time.Millisecond)
 		files := im.getFileArray()
-		assert.Equal(t, 3, len(files))
-		assert.ElementsMatch(t, files, []string{"file0-100", "file1-100", "file2-100"})
+		assert.Equal(t, 3*3, len(files))
+		assert.ElementsMatch(t, files,
+			[]string{"file0-100", "file1-100", "file2-100",
+				"file0-100", "file1-100", "file2-100",
+				"file0-100", "file1-100", "file2-100"})
 		collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
 		assert.Nil(t, err)
 		assert.Equal(t, 1, len(collMeta.FieldIndexes))
@@ -1163,6 +1198,30 @@ func TestRootCoord(t *testing.T) {
 		assert.Equal(t, 0, len(rsp.IndexDescriptions))
 	})
 
+	wg.Add(1)
+	t.Run("count complete index", func(t *testing.T) {
+		defer wg.Done()
+		coll, err := core.MetaTable.GetCollectionByName(collName, 0)
+		assert.NoError(t, err)
+		// Normal case.
+		count, err := core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, ""),
+			collName, coll.ID, []UniqueID{1000, 1001, 1002})
+		assert.NoError(t, err)
+		assert.Equal(t, 3, count)
+		// Case with an empty result.
+		count, err = core.CountCompleteIndex(ctx, collName, coll.ID, []UniqueID{})
+		assert.NoError(t, err)
+		assert.Equal(t, 0, count)
+		// Case where GetIndexStates failed with error.
+		_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnError),
+			collName, coll.ID, []UniqueID{1000, 1001, 1002})
+		assert.Error(t, err)
+		// Case where GetIndexStates failed with bad status.
+		_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnUnsuccessfulStatus),
+			collName, coll.ID, []UniqueID{1000, 1001, 1002})
+		assert.Error(t, err)
+	})
+
 	wg.Add(1)
 	t.Run("flush segment", func(t *testing.T) {
 		defer wg.Done()