Support check index state in root coord, for bulk load feature. (#16198)

issue: #15604

/kind feature

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Authored by Ten Thousand Leaves on 2022-03-28 16:41:28 +08:00; committed by GitHub.
parent 974371c06c
commit 40c703dacd
3 changed files with 250 additions and 73 deletions

View File

@@ -3269,7 +3269,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
gibpt.IndexName = Params.CommonCfg.DefaultIndexName
}
describeIndexReq := milvuspb.DescribeIndexRequest{
describeIndexReq := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
MsgID: gibpt.Base.MsgID,
@@ -3281,7 +3281,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
// IndexName: gibpt.IndexName,
}
indexDescriptionResp, err2 := gibpt.rootCoord.DescribeIndex(ctx, &describeIndexReq)
indexDescriptionResp, err2 := gibpt.rootCoord.DescribeIndex(ctx, describeIndexReq)
if err2 != nil {
return err2
}
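The hunk above switches describeIndexReq from a struct value to a pointer, so the call site passes describeIndexReq directly and matches DescribeIndex's *milvuspb.DescribeIndexRequest parameter instead of taking the address of a local copy. A minimal standalone sketch of the same pattern; the type and function here are illustrative stand-ins, not the milvus protos:

package main

import "fmt"

// DescribeIndexRequest is a stand-in for milvuspb.DescribeIndexRequest;
// generated gRPC request types are conventionally handled as pointers.
type DescribeIndexRequest struct {
	CollectionName string
	IndexName      string
}

// describeIndex stands in for a client method that takes its request by
// pointer, matching the shape of rootCoord.DescribeIndex.
func describeIndex(req *DescribeIndexRequest) string {
	return fmt.Sprintf("describe %s/%s", req.CollectionName, req.IndexName)
}

func main() {
	// Building the request as &T{...} up front avoids copying the struct
	// and taking its address separately at the call site.
	req := &DescribeIndexRequest{CollectionName: "coll", IndexName: "_default_idx"}
	fmt.Println(describeIndex(req))
}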
@@ -3473,6 +3473,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
}
gist.collectionID = collectionID
// Get partition result for the given collection.
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
@@ -3493,6 +3494,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
gist.IndexName = Params.CommonCfg.DefaultIndexName
}
// Retrieve index status and detailed index information.
describeIndexReq := milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
@@ -3510,6 +3512,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
return err2
}
// Check if the target index name exists.
matchIndexID := int64(-1)
foundIndexID := false
for _, desc := range indexDescriptionResp.IndexDescriptions {
@@ -3523,6 +3526,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
return fmt.Errorf("no index is created")
}
// Fetch segments for partitions.
var allSegmentIDs []UniqueID
for _, partitionID := range partitions.PartitionIDs {
showSegmentsRequest := &milvuspb.ShowSegmentsRequest{
@@ -3549,6 +3553,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
IndexBuildIDs: make([]UniqueID, 0),
}
// Fetch index build IDs from segments.
for _, segmentID := range allSegmentIDs {
describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{

View File

@@ -29,8 +29,6 @@ import (
"syscall"
"time"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/common"
@@ -55,6 +53,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@@ -62,6 +61,9 @@ import (
"go.uber.org/zap"
)
// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID
// ------------------ struct -----------------------
// DdOperation used to save ddMsg into etcd
@@ -132,9 +134,10 @@ type Core struct {
CallGetNumRowsService func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
CallGetFlushedSegmentsService func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error)
//call index builder's client to build index, return build id
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
//call index builder's client to build index, return build id or get index state.
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
CallGetIndexStatesService func(ctx context.Context, IndexBuildIDs []int64) ([]*indexpb.IndexInfo, error)
NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)
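The new CallGetIndexStatesService joins the other Call*Service fields on Core: cross-service calls are stored as function-typed fields, so tests can install fakes without a live IndexCoord. A self-contained sketch of that dependency-injection style, with all names as illustrative stand-ins:

package main

import (
	"context"
	"fmt"
)

// IndexInfo is a stand-in for indexpb.IndexInfo.
type IndexInfo struct{ State string }

// Core mirrors rootcoord's style of keeping cross-service calls as
// function-typed fields rather than concrete clients.
type Core struct {
	CallGetIndexStatesService func(ctx context.Context, indexBuildIDs []int64) ([]*IndexInfo, error)
}

func main() {
	c := &Core{}
	// SetIndexCoord (or a test) installs the implementation later.
	c.CallGetIndexStatesService = func(ctx context.Context, indexBuildIDs []int64) ([]*IndexInfo, error) {
		states := make([]*IndexInfo, 0, len(indexBuildIDs))
		for range indexBuildIDs {
			states = append(states, &IndexInfo{State: "Finished"})
		}
		return states, nil
	}
	states, err := c.CallGetIndexStatesService(context.Background(), []int64{100, 101})
	fmt.Println(len(states), err)
}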
@@ -367,63 +370,64 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
zap.Int64("collection id", collMeta.ID),
zap.Int64("partition id", partID),
zap.Error(err))
	cancel2()
	continue
}
for _, segID := range segIDs {
	indexInfos := []*etcdpb.FieldIndexInfo{}
	indexMeta, ok := segID2IndexMeta[segID]
	if !ok {
		indexInfos = append(indexInfos, collMeta.FieldIndexes...)
	} else {
		for _, idx := range collMeta.FieldIndexes {
			if _, ok := indexMeta[idx.IndexID]; !ok {
				indexInfos = append(indexInfos, idx)
			}
		}
	}
	for _, idxInfo := range indexInfos {
		/* #nosec G601 */
		field, err := GetFieldSchemaByID(&collMeta, idxInfo.FiledID)
		if err != nil {
			log.Debug("GetFieldSchemaByID",
				zap.Any("collection_meta", collMeta),
				zap.Int64("field id", idxInfo.FiledID))
			continue
		}
		indexMeta, ok := indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Debug("index meta does not exist", zap.Int64("index_id", idxInfo.IndexID))
			continue
		}
		info := etcdpb.SegmentIndexInfo{
			CollectionID: collMeta.ID,
			PartitionID:  partID,
			SegmentID:    segID,
			FieldID:      idxInfo.FiledID,
			IndexID:      idxInfo.IndexID,
			EnableIndex:  false,
		}
		log.Debug("building index by background checker",
			zap.Int64("segment_id", segID),
			zap.Int64("index_id", indexMeta.IndexID),
			zap.Int64("collection_id", collMeta.ID))
		info.BuildID, err = c.BuildIndex(ctx2, segID, field, &indexMeta, false)
		if err != nil {
			log.Debug("build index failed",
				zap.Int64("segment_id", segID),
				zap.Int64("field_id", field.FieldID),
				zap.Int64("index_id", indexMeta.IndexID))
			continue
		}
		if info.BuildID != 0 {
			info.EnableIndex = true
		}
		if err := c.MetaTable.AddIndex(&info); err != nil {
			log.Debug("Add index into meta table failed",
				zap.Int64("collection_id", collMeta.ID),
				zap.Int64("index_id", info.IndexID),
				zap.Int64("build_id", info.BuildID),
				zap.Error(err))
		}
	}
}
@@ -815,6 +819,29 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
return nil
}
c.CallGetIndexStatesService = func(ctx context.Context, IndexBuildIDs []int64) (idxInfo []*indexpb.IndexInfo, retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("get index state from index service panic, msg = %v", err)
}
}()
<-initCh
res, err := s.GetIndexStates(ctx, &indexpb.GetIndexStatesRequest{
IndexBuildIDs: IndexBuildIDs,
})
if err != nil {
log.Error("RootCoord failed to get index states from IndexCoord.", zap.Error(err))
return nil, err
}
if res.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("Get index states failed.",
zap.String("error_code", res.GetStatus().GetErrorCode().String()),
zap.String("reason", res.GetStatus().GetReason()))
return nil, fmt.Errorf(res.GetStatus().GetErrorCode().String())
}
log.Debug("Successfully got index states.")
return res.GetStates(), nil
}
return nil
}
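The new closure blocks on initCh until the IndexCoord client is ready, then guards the cross-service call with defer/recover on a named return, converting a client panic into an ordinary error. A minimal runnable sketch of that guard, assuming only that the wrapped call may panic (names illustrative):

package main

import "fmt"

// getIndexStates converts a panic from the wrapped call into a normal
// error via the named return, mirroring the defer/recover guard in
// CallGetIndexStatesService.
func getIndexStates(indexBuildIDs []int64) (states []string, retErr error) {
	defer func() {
		if err := recover(); err != nil {
			retErr = fmt.Errorf("get index state from index service panic, msg = %v", err)
		}
	}()
	if indexBuildIDs == nil {
		panic("index coord client not ready") // stands in for a crashing client
	}
	for range indexBuildIDs {
		states = append(states, "Finished")
	}
	return states, nil
}

func main() {
	if _, err := getIndexStates(nil); err != nil {
		fmt.Println("recovered:", err)
	}
}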
@@ -2196,3 +2223,89 @@ func (c *Core) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult)
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// CountCompleteIndex checks the indexing status of the given segments and returns the number of segments that have a complete index.
func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, collectionID UniqueID,
allSegmentIDs []UniqueID) (int, error) {
// Note: The index name is always Params.CommonCfg.DefaultIndexName in the current Milvus design.
indexName := Params.CommonCfg.DefaultIndexName
// Retrieve index status and detailed index information.
describeIndexReq := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
},
CollectionName: collectionName,
IndexName: indexName,
}
indexDescriptionResp, err := c.DescribeIndex(ctx, describeIndexReq)
if err != nil {
return 0, err
}
log.Debug("Got index description", zap.String("index_description", indexDescriptionResp.String()))
// Check if the target index name exists.
matchIndexID := int64(-1)
foundIndexID := false
for _, desc := range indexDescriptionResp.IndexDescriptions {
if desc.IndexName == indexName {
matchIndexID = desc.IndexID
foundIndexID = true
break
}
}
if !foundIndexID {
return 0, fmt.Errorf("no index is created")
}
getIndexStatesRequest := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: make([]UniqueID, 0),
}
// Fetch index build IDs from segments.
for _, segmentID := range allSegmentIDs {
describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
},
CollectionID: collectionID,
SegmentID: segmentID,
}
segmentDesc, err := c.DescribeSegment(ctx, describeSegmentRequest)
if err != nil {
return 0, err
}
if segmentDesc.IndexID == matchIndexID {
if segmentDesc.EnableIndex {
getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID)
}
}
}
log.Debug("Proxy GetIndexState", zap.Int("IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)), zap.Error(err))
// Return early on empty results.
if len(getIndexStatesRequest.IndexBuildIDs) == 0 {
log.Info("Empty index build IDs returned.", zap.String("collection name", collectionName), zap.Int64("collection ID", collectionID))
return 0, nil
}
states, err := c.CallGetIndexStatesService(ctx, getIndexStatesRequest.IndexBuildIDs)
if err != nil {
log.Error("Failed to get index state in checkSegmentIndexStates.", zap.Error(err))
return 0, err
}
// Count the # of segments with finished index.
ct := 0
for _, s := range states {
if s.State == commonpb.IndexState_Finished {
ct++
}
}
log.Info("Segment indexing state checked.",
zap.Int("# of checked segment", len(states)),
zap.Int("# of segments with complete index", ct),
zap.String("collection name", collectionName),
zap.Int64("collection ID", collectionID),
)
return ct, nil
}
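CountCompleteIndex boils down to four steps: resolve the index ID for the default index name, gather build IDs from segments that carry that index, ask IndexCoord for the build states, and count the ones that reached IndexState_Finished. A compact standalone sketch of the counting step; the types are stand-ins for the commonpb/indexpb messages:

package main

import "fmt"

// IndexState and IndexInfo are stand-ins for the proto-generated types.
type IndexState int

const (
	IndexStateInProgress IndexState = iota
	IndexStateFinished
)

type IndexInfo struct{ State IndexState }

// countFinished mirrors the final loop of CountCompleteIndex: each build
// state that reached Finished corresponds to one fully indexed segment.
func countFinished(states []*IndexInfo) int {
	ct := 0
	for _, s := range states {
		if s.State == IndexStateFinished {
			ct++
		}
	}
	return ct
}

func main() {
	states := []*IndexInfo{
		{State: IndexStateFinished},
		{State: IndexStateInProgress},
		{State: IndexStateFinished},
	}
	fmt.Println(countFinished(states)) // prints 2
}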

View File

@@ -35,29 +35,33 @@ import (
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
const TestDMLChannelNum = 32
const (
TestDMLChannelNum = 32
returnError = "ReturnError"
returnUnsuccessfulStatus = "ReturnUnsuccessfulStatus"
)
type ctxKey struct{}
type proxyMock struct {
types.Proxy
@@ -257,6 +261,32 @@ func (idx *indexMock) getFileArray() []string {
return ret
}
func (idx *indexMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
v := ctx.Value(ctxKey{}).(string)
if v == returnError {
return nil, fmt.Errorf("injected error")
} else if v == returnUnsuccessfulStatus {
return &indexpb.GetIndexStatesResponse{
Status: &commonpb.Status{
ErrorCode: 100,
Reason: "not so good",
},
}, nil
}
resp := &indexpb.GetIndexStatesResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "all good",
},
}
for range req.IndexBuildIDs {
resp.States = append(resp.States, &indexpb.IndexInfo{
State: commonpb.IndexState_Finished,
})
}
return resp, nil
}
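The mock picks its behavior from a marker value planted in the request context, which lets a single GetIndexStates stub cover the success, injected-error, and bad-status paths without extra mock state. A self-contained sketch of that test technique (names illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
)

type ctxKey struct{}

const returnError = "ReturnError"

// fakeGetIndexStates branches on a string smuggled in via the context,
// the same trick indexMock.GetIndexStates uses above.
func fakeGetIndexStates(ctx context.Context) (string, error) {
	if v, _ := ctx.Value(ctxKey{}).(string); v == returnError {
		return "", errors.New("injected error")
	}
	return "all good", nil
}

func main() {
	okCtx := context.WithValue(context.Background(), ctxKey{}, "")
	errCtx := context.WithValue(context.Background(), ctxKey{}, returnError)

	msg, err := fakeGetIndexStates(okCtx)
	fmt.Println(msg, err) // all good <nil>

	_, err = fakeGetIndexStates(errCtx)
	fmt.Println(err) // injected error
}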
func clearMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack) {
ch := time.After(timeout)
for {
@@ -562,7 +592,7 @@ func TestRootCoordInit(t *testing.T) {
}
func TestRootCoord(t *testing.T) {
func TestRootCoord_Base(t *testing.T) {
const (
dbName = "testDb"
collName = "testColl"
@@ -1033,7 +1063,7 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
dm.mu.Lock()
dm.segs = []typeutil.UniqueID{1000}
dm.segs = []typeutil.UniqueID{1000, 1001, 1002}
dm.mu.Unlock()
req := &milvuspb.ShowSegmentsRequest{
@@ -1050,7 +1080,9 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, int64(1000), rsp.SegmentIDs[0])
assert.Equal(t, 1, len(rsp.SegmentIDs))
assert.Equal(t, int64(1001), rsp.SegmentIDs[1])
assert.Equal(t, int64(1002), rsp.SegmentIDs[2])
assert.Equal(t, 3, len(rsp.SegmentIDs))
})
wg.Add(1)
@@ -1082,8 +1114,11 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
time.Sleep(100 * time.Millisecond)
files := im.getFileArray()
assert.Equal(t, 3, len(files))
assert.ElementsMatch(t, files, []string{"file0-100", "file1-100", "file2-100"})
assert.Equal(t, 3*3, len(files))
assert.ElementsMatch(t, files,
[]string{"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100"})
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(collMeta.FieldIndexes))
@@ -1163,6 +1198,30 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, 0, len(rsp.IndexDescriptions))
})
wg.Add(1)
t.Run("count complete index", func(t *testing.T) {
defer wg.Done()
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
// Normal case.
count, err := core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, ""),
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.NoError(t, err)
assert.Equal(t, 3, count)
// Case with an empty result.
count, err = core.CountCompleteIndex(ctx, collName, coll.ID, []UniqueID{})
assert.NoError(t, err)
assert.Equal(t, 0, count)
// Case where GetIndexStates failed with error.
_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnError),
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.Error(t, err)
// Case where GetIndexStates failed with bad status.
_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnUnsuccessfulStatus),
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.Error(t, err)
})
wg.Add(1)
t.Run("flush segment", func(t *testing.T) {
defer wg.Done()