From a60db370e64c0e42f495d19db9b8a49937abc527 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 25 Oct 2022 16:59:30 +0800 Subject: [PATCH] Compatibility with handling index params (#19997) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- cmd/tools/migration/meta/210_to_220.go | 48 +++++++--- internal/indexcoord/index_coord.go | 8 +- internal/indexcoord/index_coord_test.go | 122 ++++++++++++++++++++++++ internal/indexcoord/meta_table.go | 16 +++- 4 files changed, 172 insertions(+), 22 deletions(-) diff --git a/cmd/tools/migration/meta/210_to_220.go b/cmd/tools/migration/meta/210_to_220.go index f2724c5141..4fc2f3ced7 100644 --- a/cmd/tools/migration/meta/210_to_220.go +++ b/cmd/tools/migration/meta/210_to_220.go @@ -5,15 +5,16 @@ import ( "sort" "strings" + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/cmd/tools/migration/versions" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/querypb" - - "github.com/milvus-io/milvus/cmd/tools/migration/versions" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" ) func alias210ToAlias220(record *pb.CollectionInfo, ts Timestamp) *model.Alias { @@ -153,16 +154,35 @@ func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionI if err != nil { return nil, err } + newIndexParamsMap := make(map[string]string) + for _, kv := range indexInfo.IndexParams { + if kv.Key == common.IndexParamsKey { + params, err := funcutil.ParseIndexParamsMap(kv.Value) + if err != nil { + return nil, err + } + for k, v := range params { + newIndexParamsMap[k] = v + } + } else { + newIndexParamsMap[kv.Key] = kv.Value + } + } + newIndexParams := make([]*commonpb.KeyValuePair, 0) + for k, v := range newIndexParamsMap { + newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{Key: k, Value: v}) + } record := &model.Index{ - TenantID: "", // TODO: how to set this if we support mysql later? - CollectionID: collectionID, - FieldID: index.GetFiledID(), - IndexID: index.GetIndexID(), - IndexName: indexInfo.GetIndexName(), - IsDeleted: indexInfo.GetDeleted(), - CreateTime: indexInfo.GetCreateTime(), - TypeParams: field.GetTypeParams(), - IndexParams: indexInfo.GetIndexParams(), + TenantID: "", // TODO: how to set this if we support mysql later? + CollectionID: collectionID, + FieldID: index.GetFiledID(), + IndexID: index.GetIndexID(), + IndexName: indexInfo.GetIndexName(), + IsDeleted: indexInfo.GetDeleted(), + CreateTime: indexInfo.GetCreateTime(), + TypeParams: field.GetTypeParams(), + IndexParams: newIndexParams, + UserIndexParams: indexInfo.GetIndexParams(), } indexes.AddRecord(collectionID, index.GetIndexID(), record) } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c7dc089854..1c08c2a8e0 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -441,8 +441,10 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe ErrorCode: commonpb.ErrorCode_UnexpectedError, } - if !i.metaTable.CanCreateIndex(req) { - ret.Reason = "CreateIndex failed: index already exist, but parameters are inconsistent" + ok, err := i.metaTable.CanCreateIndex(req) + if !ok { + log.Error("CreateIndex failed", zap.Error(err)) + ret.Reason = err.Error() return ret, nil } @@ -458,7 +460,7 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe req: req, } - err := i.sched.IndexAddQueue.Enqueue(t) + err = i.sched.IndexAddQueue.Enqueue(t) if err != nil { ret.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Reason = err.Error() diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 24446c6150..41e6029ac8 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "path" "strconv" + "sync" "testing" "time" @@ -1075,3 +1076,124 @@ func TestIndexCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) } + +func TestIndexCoord_CreateIndex(t *testing.T) { + ic := &IndexCoord{ + metaTable: &metaTable{ + catalog: nil, + indexLock: sync.RWMutex{}, + segmentIndexLock: sync.RWMutex{}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: 10, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", + Value: "IVF_FLAT", + }, + { + Key: "metrics_type", + Value: "HAMMING", + }, + { + Key: "nlist", + Value: "128", + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + }, + } + ic.UpdateStateCode(commonpb.StateCode_Healthy) + + t.Run("index already exist, but params are inconsistent", func(t *testing.T) { + req := &indexpb.CreateIndexRequest{ + CollectionID: collID, + FieldID: fieldID, + IndexName: indexName, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", + Value: "IVF_FLAT", + }, + { + Key: "metrics_type", + Value: "HAMMING", + }, + { + Key: "params", + Value: "{nlist: 128}", + }, + }, + Timestamp: 0, + IsAutoIndex: false, + UserIndexParams: nil, + } + + resp, err := ic.CreateIndex(context.Background(), req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) + }) +} diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index e89264ad51..ae13ab6c5f 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -434,27 +434,33 @@ func (mt *metaTable) GetIndexesForCollection(collID UniqueID, indexName string) return indexInfos } -func (mt *metaTable) CanCreateIndex(req *indexpb.CreateIndexRequest) bool { +func (mt *metaTable) CanCreateIndex(req *indexpb.CreateIndexRequest) (bool, error) { mt.indexLock.RLock() defer mt.indexLock.RUnlock() indexes, ok := mt.collectionIndexes[req.CollectionID] if !ok { - return true + return true, nil } for _, index := range indexes { if index.IsDeleted { continue } if req.IndexName == index.IndexName { - return mt.checkParams(index, req) + if mt.checkParams(index, req) { + return true, nil + } + errMsg := fmt.Sprintf("index already exist, but parameters are inconsistent. source index: %v current index: %v", + fmt.Sprintf("{index_name: %s, field_id: %d, index_params: %v, type_params: %v}", index.IndexName, index.FieldID, index.IndexParams, index.TypeParams), + fmt.Sprintf("{index_name: %s, field_id: %d, index_params: %v, type_params: %v}", req.GetIndexName(), req.GetFieldID(), req.GetIndexParams(), req.GetTypeParams())) + return false, fmt.Errorf("CreateIndex failed: %s", errMsg) } if req.FieldID == index.FieldID { // creating multiple indexes on same field is not supported - return false + return false, fmt.Errorf("CreateIndex failed: creating multiple indexes on same field is not supported") } } - return true + return true, nil } func (mt *metaTable) checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {