diff --git a/go.mod b/go.mod index e63ecf67fd..3f3d696405 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/protobuf v1.5.4 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 931768ae59..4aeb002db4 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -661,13 +661,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() { s.Run("collection has index, segment is not indexed", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing)) - index := &model.Index{ - CollectionID: 1, - IndexID: 3, - } + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 1, + } task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11})) - err := s.meta.indexMeta.CreateIndex(context.TODO(), index) + _, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false) s.NoError(err) s.False(task.Process()) @@ -677,11 +676,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() { s.Run("collection has index, segment indexed", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing)) - index := &model.Index{ + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 1, - IndexID: 3, } - err := s.meta.indexMeta.CreateIndex(context.TODO(), index) + _, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false) s.NoError(err) s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index a08cb2ab06..cb86dd4569 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -16,6 +16,7 @@ import ( mocks2 "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" @@ -40,11 +41,11 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) { }, }, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 1, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) require.NoError(t, err) segArgs := []struct { @@ -152,12 +153,12 @@ func TestGetQueryVChanPositions(t *testing.T) { }, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) s1 := &datapb.SegmentInfo{ @@ -333,12 +334,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) c := &datapb.SegmentInfo{ ID: 1, @@ -403,12 +403,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) a := &datapb.SegmentInfo{ ID: 99, @@ -489,12 +488,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) c := &datapb.SegmentInfo{ ID: 1, @@ -599,12 +597,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, @@ -980,12 +977,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, @@ -1184,12 +1180,12 @@ func TestGetCurrentSegmentsView(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 456b309dff..7c670401fa 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -356,7 +356,10 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) { m.fieldIndexLock.RLock() defer m.fieldIndexLock.RUnlock() + return m.canCreateIndex(req, isJson) +} +func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) { indexes, ok := m.indexes[req.CollectionID] if !ok { return 0, nil @@ -425,23 +428,51 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID) return false, 0 } -func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error { - log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID), - zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) +func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID typeutil.UniqueID, isJson bool) (UniqueID, error) { m.fieldIndexLock.Lock() defer m.fieldIndexLock.Unlock() + indexID, err := m.canCreateIndex(req, isJson) + if err != nil { + return indexID, err + } + + if indexID == 0 { + indexID = allocatedIndexID + } else { + return indexID, nil + } + + // exclude the mmap.enable param, because it will be conflicted with the index's mmap.enable param + typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey}) + index := &model.Index{ + CollectionID: req.GetCollectionID(), + FieldID: req.GetFieldID(), + IndexID: indexID, + IndexName: req.GetIndexName(), + TypeParams: typeParams, + IndexParams: req.GetIndexParams(), + CreateTime: req.GetTimestamp(), + IsAutoIndex: req.GetIsAutoIndex(), + UserIndexParams: req.GetUserIndexParams(), + } + if err := ValidateIndexParams(index); err != nil { + return indexID, err + } + log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID), + zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) + if err := m.catalog.CreateIndex(ctx, index); err != nil { log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName), zap.Error(err)) - return err + return indexID, err } m.updateCollectionIndex(index) log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) - return nil + return indexID, nil } func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error { diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index aa78aaafee..b90ace1aae 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -270,21 +270,8 @@ func TestMeta_CanCreateIndex(t *testing.T) { tmpIndexID, err := m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, int64(0), tmpIndexID) - index := &model.Index{ - TenantID: "", - CollectionID: collID, - FieldID: fieldID, - IndexID: indexID, - IndexName: indexName, - IsDeleted: false, - CreateTime: 0, - TypeParams: typeParams, - IndexParams: indexParams, - IsAutoIndex: false, - UserIndexParams: userIndexParams, - } - err = m.CreateIndex(context.TODO(), index) + indexID, err = m.CreateIndex(context.TODO(), req, indexID, false) assert.NoError(t, err) tmpIndexID, err = m.CanCreateIndex(req, false) @@ -453,31 +440,34 @@ func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { } func TestMeta_CreateIndex(t *testing.T) { + indexParams := []*commonpb.KeyValuePair{ { Key: common.IndexTypeKey, Value: "FLAT", }, } - index := &model.Index{ - TenantID: "", - CollectionID: 1, - FieldID: 2, - IndexID: 3, - IndexName: "_default_idx", - IsDeleted: false, - CreateTime: 12, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, + + typeParams := []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", }, + } + + req := &indexpb.CreateIndexRequest{ + CollectionID: 1, + FieldID: 2, + IndexName: indexName, + TypeParams: typeParams, IndexParams: indexParams, + Timestamp: 12, IsAutoIndex: false, UserIndexParams: indexParams, } + allocatedID := UniqueID(3) + t.Run("success", func(t *testing.T) { sc := catalogmocks.NewDataCoordCatalog(t) sc.On("CreateIndex", @@ -486,7 +476,7 @@ func TestMeta_CreateIndex(t *testing.T) { ).Return(nil) m := newSegmentIndexMeta(sc) - err := m.CreateIndex(context.TODO(), index) + _, err := m.CreateIndex(context.TODO(), req, allocatedID, false) assert.NoError(t, err) }) @@ -498,7 +488,7 @@ func TestMeta_CreateIndex(t *testing.T) { ).Return(errors.New("fail")) m := newSegmentIndexMeta(ec) - err := m.CreateIndex(context.TODO(), index) + _, err := m.CreateIndex(context.TODO(), req, 4, false) assert.Error(t, err) }) } diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 6d07e249fc..1b001bbd1d 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -319,49 +319,15 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques } } - indexID, err := s.meta.indexMeta.CanCreateIndex(req, isJson) + allocateIndexID, err := s.allocator.AllocID(ctx) if err != nil { - metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - // merge with previous params because create index would not pass mmap params - indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) - if len(indexes) == 1 { - req.UserIndexParams = UpdateParams(indexes[0], indexes[0].UserIndexParams, req.GetUserIndexParams()) - req.IndexParams = UpdateParams(indexes[0], indexes[0].IndexParams, req.GetIndexParams()) - } - - if indexID == 0 { - indexID, err = s.allocator.AllocID(ctx) - if err != nil { - log.Warn("failed to alloc indexID", zap.Error(err)) - metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(err), nil - } - } - // exclude the mmap.enable param, because it will be conflict with the index's mmap.enable param - typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey}) - - index := &model.Index{ - CollectionID: req.GetCollectionID(), - FieldID: req.GetFieldID(), - IndexID: indexID, - IndexName: req.GetIndexName(), - TypeParams: typeParams, - IndexParams: req.GetIndexParams(), - CreateTime: req.GetTimestamp(), - IsAutoIndex: req.GetIsAutoIndex(), - UserIndexParams: req.GetUserIndexParams(), - } - - if err := ValidateIndexParams(index); err != nil { + log.Warn("failed to alloc indexID", zap.Error(err)) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil } // Get flushed segments and create index - err = s.meta.indexMeta.CreateIndex(ctx, index) + indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocateIndexID, isJson) if err != nil { log.Error("CreateIndex fail", zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err)) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9ef24f6622..5c578f32d3 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -55,6 +55,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/etcd" @@ -1230,13 +1231,12 @@ func TestGetRecoveryInfo(t *testing.T) { }) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) @@ -1453,13 +1453,12 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{ SegmentID: segment.ID, @@ -1618,19 +1617,12 @@ func TestGetRecoveryInfo(t *testing.T) { assert.NoError(t, err) err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 0, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: nil, - IsAutoIndex: false, - UserIndexParams: nil, - }) + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 0, + FieldID: 2, + IndexName: "_default_idx_2", + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ SegmentID: seg4.ID, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 808f588b12..5bd952e0e5 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -2,6 +2,7 @@ package datacoord import ( "context" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "testing" "time" @@ -867,13 +868,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { }) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) @@ -1092,13 +1091,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{ SegmentID: segment.ID, @@ -1258,19 +1256,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NoError(t, err) err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 0, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: nil, - IsAutoIndex: false, - UserIndexParams: nil, - }) + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 0, + FieldID: 2, + IndexName: "_default_idx_2", + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false) assert.NoError(t, err) svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ SegmentID: seg4.ID,