feat: Support multiple vector indexes in a collection (#27700)

issue: #25639 

/kind improvement
Signed-off-by: xige-16 <xi.ge@zilliz.com>

---------

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2023-12-29 11:44:45 +08:00 committed by GitHub
parent 55af8f611f
commit 02673914a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 144 additions and 88 deletions

View File

@ -2220,7 +2220,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
defer s.SetupTest() defer s.SetupTest()
tr := s.tr tr := s.tr
s.compactionHandler.EXPECT().isFull().Return(false) s.compactionHandler.EXPECT().isFull().Return(false)
// s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked"))
tr.handleSignal(&compactionSignal{ tr.handleSignal(&compactionSignal{
segmentID: 1, segmentID: 1,
@ -2237,7 +2237,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
defer s.SetupTest() defer s.SetupTest()
tr := s.tr tr := s.tr
s.compactionHandler.EXPECT().isFull().Return(false) s.compactionHandler.EXPECT().isFull().Return(false)
// s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
Properties: map[string]string{ Properties: map[string]string{
common.CollectionAutoCompactionKey: "bad_value", common.CollectionAutoCompactionKey: "bad_value",

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -271,77 +272,73 @@ func (m *meta) GetIndexIDByName(collID int64, indexName string) map[int64]uint64
return indexID2CreateTs return indexID2CreateTs
} }
type IndexState struct { func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID) *indexpb.SegmentIndexState {
state commonpb.IndexState
failReason string
}
func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID) IndexState {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
state := IndexState{ state := &indexpb.SegmentIndexState{
state: commonpb.IndexState_IndexStateNone, SegmentID: segmentID,
failReason: "", State: commonpb.IndexState_IndexStateNone,
FailReason: "",
} }
fieldIndexes, ok := m.indexes[collID] fieldIndexes, ok := m.indexes[collID]
if !ok { if !ok {
state.failReason = fmt.Sprintf("collection not exist with ID: %d", collID) state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state return state
} }
segment := m.segments.GetSegment(segmentID) segment := m.segments.GetSegment(segmentID)
if segment != nil { if segment == nil {
for indexID, index := range fieldIndexes { state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID)
if !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
if segIdx.IndexState != commonpb.IndexState_Finished {
state.state = segIdx.IndexState
state.failReason = segIdx.FailReason
break
}
state.state = commonpb.IndexState_Finished
continue
}
state.state = commonpb.IndexState_Unissued
break
}
}
return state return state
} }
state.failReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID)
if index, ok := fieldIndexes[indexID]; ok && !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
return state
}
state.State = commonpb.IndexState_Unissued
return state
}
state.FailReason = fmt.Sprintf("there is no index on indexID: %d", indexID)
return state return state
} }
func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) IndexState { func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
state := IndexState{ state := &indexpb.SegmentIndexState{
state: commonpb.IndexState_IndexStateNone, SegmentID: segmentID,
failReason: "", State: commonpb.IndexState_IndexStateNone,
FailReason: "",
} }
fieldIndexes, ok := m.indexes[collID] fieldIndexes, ok := m.indexes[collID]
if !ok { if !ok {
state.failReason = fmt.Sprintf("collection not exist with ID: %d", collID) state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state return state
} }
segment := m.segments.GetSegment(segmentID) segment := m.segments.GetSegment(segmentID)
if segment != nil { if segment == nil {
for indexID, index := range fieldIndexes { state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID)
if index.FieldID == fieldID && !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
state.state = segIdx.IndexState
state.failReason = segIdx.FailReason
return state
}
state.state = commonpb.IndexState_Unissued
return state
}
}
state.failReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID)
return state return state
} }
state.failReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID) for indexID, index := range fieldIndexes {
if index.FieldID == fieldID && !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
return state
}
state.State = commonpb.IndexState_Unissued
return state
}
}
state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID)
return state return state
} }
@ -716,7 +713,7 @@ func (m *meta) GetHasUnindexTaskSegments() []*SegmentInfo {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
segments := m.segments.GetSegments() segments := m.segments.GetSegments()
var ret []*SegmentInfo unindexedSegments := make(map[int64]*SegmentInfo)
for _, segment := range segments { for _, segment := range segments {
if !isFlush(segment) { if !isFlush(segment) {
continue continue
@ -724,12 +721,13 @@ func (m *meta) GetHasUnindexTaskSegments() []*SegmentInfo {
if fieldIndexes, ok := m.indexes[segment.CollectionID]; ok { if fieldIndexes, ok := m.indexes[segment.CollectionID]; ok {
for _, index := range fieldIndexes { for _, index := range fieldIndexes {
if _, ok := segment.segmentIndexes[index.IndexID]; !index.IsDeleted && !ok { if _, ok := segment.segmentIndexes[index.IndexID]; !index.IsDeleted && !ok {
ret = append(ret, segment) unindexedSegments[segment.GetID()] = segment
} }
} }
} }
} }
return ret
return lo.MapToSlice(unindexedSegments, func(_ int64, segment *SegmentInfo) *SegmentInfo { return segment })
} }
func (m *meta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex { func (m *meta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {

View File

@ -475,9 +475,10 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
}, },
} }
t.Run("segment has no index", func(t *testing.T) { t.Run("collection has no index", func(t *testing.T) {
state := m.GetSegmentIndexState(collID, segID) state := m.GetSegmentIndexState(collID, segID, indexID)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
assert.Contains(t, state.GetFailReason(), "collection not exist with ID")
}) })
t.Run("meta not saved yet", func(t *testing.T) { t.Run("meta not saved yet", func(t *testing.T) {
@ -496,13 +497,14 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
UserIndexParams: indexParams, UserIndexParams: indexParams,
}, },
} }
state := m.GetSegmentIndexState(collID, segID) state := m.GetSegmentIndexState(collID, segID, indexID)
assert.Equal(t, commonpb.IndexState_Unissued, state.state) assert.Equal(t, commonpb.IndexState_Unissued, state.GetState())
}) })
t.Run("segment not exist", func(t *testing.T) { t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexState(collID, segID+1) state := m.GetSegmentIndexState(collID, segID+1, indexID)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
assert.Contains(t, state.FailReason, "segment is not exist with ID")
}) })
t.Run("unissued", func(t *testing.T) { t.Run("unissued", func(t *testing.T) {
@ -523,8 +525,8 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
IndexSize: 0, IndexSize: 0,
}) })
state := m.GetSegmentIndexState(collID, segID) state := m.GetSegmentIndexState(collID, segID, indexID)
assert.Equal(t, commonpb.IndexState_Unissued, state.state) assert.Equal(t, commonpb.IndexState_Unissued, state.GetState())
}) })
t.Run("finish", func(t *testing.T) { t.Run("finish", func(t *testing.T) {
@ -545,8 +547,8 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
IndexSize: 0, IndexSize: 0,
}) })
state := m.GetSegmentIndexState(collID, segID) state := m.GetSegmentIndexState(collID, segID, indexID)
assert.Equal(t, commonpb.IndexState_Finished, state.state) assert.Equal(t, commonpb.IndexState_Finished, state.GetState())
}) })
} }
@ -643,22 +645,22 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
t.Run("success", func(t *testing.T) { t.Run("success", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID) state := m.GetSegmentIndexStateOnField(collID, segID, fieldID)
assert.Equal(t, commonpb.IndexState_Finished, state.state) assert.Equal(t, commonpb.IndexState_Finished, state.GetState())
}) })
t.Run("no index on field", func(t *testing.T) { t.Run("no index on field", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1) state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
}) })
t.Run("no index", func(t *testing.T) { t.Run("no index", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1) state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
}) })
t.Run("segment not exist", func(t *testing.T) { t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID) state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
}) })
} }
@ -1230,6 +1232,19 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
IsAutoIndex: false, IsAutoIndex: false,
UserIndexParams: nil, UserIndexParams: nil,
}, },
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: indexName + "_1",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
}, },
}, },
} }
@ -1239,6 +1254,33 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
assert.Equal(t, 1, len(segments)) assert.Equal(t, 1, len(segments))
assert.Equal(t, segID, segments[0].ID) assert.Equal(t, segID, segments[0].ID)
}) })
t.Run("segment partial field with index", func(t *testing.T) {
m.segments.segments[segID].segmentIndexes = map[UniqueID]*model.SegmentIndex{
indexID: {
CollectionID: collID,
SegmentID: segID,
IndexID: indexID,
IndexState: commonpb.IndexState_Finished,
},
}
segments := m.GetHasUnindexTaskSegments()
assert.Equal(t, 1, len(segments))
assert.Equal(t, segID, segments[0].ID)
})
t.Run("segment all vector field with index", func(t *testing.T) {
m.segments.segments[segID].segmentIndexes[indexID+1] = &model.SegmentIndex{
CollectionID: collID,
SegmentID: segID,
IndexID: indexID + 1,
IndexState: commonpb.IndexState_Finished,
}
segments := m.GetHasUnindexTaskSegments()
assert.Equal(t, 0, len(segments))
})
} }
// see also: https://github.com/milvus-io/milvus/issues/21660 // see also: https://github.com/milvus-io/milvus/issues/21660

View File

@ -337,7 +337,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
) )
log.Info("receive GetSegmentIndexState", log.Info("receive GetSegmentIndexState",
zap.String("IndexName", req.GetIndexName()), zap.String("IndexName", req.GetIndexName()),
zap.Int64s("fieldID", req.GetSegmentIDs()), zap.Int64s("segmentIDs", req.GetSegmentIDs()),
) )
if err := merr.CheckHealthy(s.GetStateCode()); err != nil { if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
@ -360,12 +360,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
}, nil }, nil
} }
for _, segID := range req.GetSegmentIDs() { for _, segID := range req.GetSegmentIDs() {
state := s.meta.GetSegmentIndexState(req.GetCollectionID(), segID) for indexID := range indexID2CreateTs {
ret.States = append(ret.States, &indexpb.SegmentIndexState{ state := s.meta.GetSegmentIndexState(req.GetCollectionID(), segID, indexID)
SegmentID: segID, ret.States = append(ret.States, state)
State: state.state, }
FailReason: state.failReason,
})
} }
log.Info("GetSegmentIndexState successfully", zap.String("indexName", req.GetIndexName())) log.Info("GetSegmentIndexState successfully", zap.String("indexName", req.GetIndexName()))
return ret, nil return ret, nil

View File

@ -755,7 +755,7 @@ func TestServer_GetIndexState(t *testing.T) {
}}, }},
} }
t.Run("index state is node", func(t *testing.T) { t.Run("index state is none", func(t *testing.T) {
resp, err := s.GetIndexState(ctx, req) resp, err := s.GetIndexState(ctx, req)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
@ -766,7 +766,6 @@ func TestServer_GetIndexState(t *testing.T) {
s.meta.indexes[collID][indexID+1] = &model.Index{ s.meta.indexes[collID][indexID+1] = &model.Index{
TenantID: "", TenantID: "",
CollectionID: collID, CollectionID: collID,
FieldID: fieldID,
IndexID: indexID + 1, IndexID: indexID + 1,
IndexName: "default_idx_1", IndexName: "default_idx_1",
IsDeleted: false, IsDeleted: false,
@ -1833,7 +1832,7 @@ func TestServer_DropIndex(t *testing.T) {
indexID + 3: { indexID + 3: {
TenantID: "", TenantID: "",
CollectionID: collID, CollectionID: collID,
FieldID: fieldID + 3, FieldID: fieldID,
IndexID: indexID + 3, IndexID: indexID + 3,
IndexName: indexName + "_3", IndexName: indexName + "_3",
IsDeleted: false, IsDeleted: false,
@ -1847,7 +1846,7 @@ func TestServer_DropIndex(t *testing.T) {
indexID + 4: { indexID + 4: {
TenantID: "", TenantID: "",
CollectionID: collID, CollectionID: collID,
FieldID: fieldID + 4, FieldID: fieldID,
IndexID: indexID + 4, IndexID: indexID + 4,
IndexName: indexName + "_4", IndexName: indexName + "_4",
IsDeleted: false, IsDeleted: false,

View File

@ -25,12 +25,12 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// Response response interface for verification // Response response interface for verification
@ -71,9 +71,8 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
segmentMap := make(map[int64]*SegmentInfo) segmentMap := make(map[int64]*SegmentInfo)
collectionSegments := make(map[int64][]int64) collectionSegments := make(map[int64][]int64)
// TODO(yah01): This can't handle the case of multiple vector fields exist,
// modify it if we support multiple vector fields. vecFieldIDs := make(map[int64][]int64)
vecFieldID := make(map[int64]int64)
for _, segment := range segments { for _, segment := range segments {
collectionID := segment.GetCollectionID() collectionID := segment.GetCollectionID()
segmentMap[segment.GetID()] = segment segmentMap[segment.GetID()] = segment
@ -88,11 +87,8 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
continue continue
} }
for _, field := range coll.Schema.GetFields() { for _, field := range coll.Schema.GetFields() {
if field.GetDataType() == schemapb.DataType_BinaryVector || if typeutil.IsVectorType(field.GetDataType()) {
field.GetDataType() == schemapb.DataType_FloatVector || vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID())
field.GetDataType() == schemapb.DataType_Float16Vector {
vecFieldID[collection] = field.GetFieldID()
break
} }
} }
} }
@ -102,8 +98,15 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped { if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue continue
} }
segmentState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), vecFieldID[segment.GetCollectionID()])
if segmentState.state == commonpb.IndexState_Finished { hasUnindexedVecField := false
for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] {
segmentIndexState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
if segmentIndexState.State != commonpb.IndexState_Finished {
hasUnindexedVecField = true
}
}
if !hasUnindexedVecField {
indexedSegments = append(indexedSegments, segment) indexedSegments = append(indexedSegments, segment)
} }
} }

View File

@ -139,6 +139,7 @@ message SegmentIndexState {
int64 segmentID = 1; int64 segmentID = 1;
common.IndexState state = 2; common.IndexState state = 2;
string fail_reason = 3; string fail_reason = 3;
string index_name = 4;
} }
message GetSegmentIndexStateResponse { message GetSegmentIndexStateResponse {

View File

@ -1210,6 +1210,21 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
}) })
wg.Add(1)
t.Run("describe index with indexName", func(t *testing.T) {
defer wg.Done()
resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
indexName = resp.IndexDescriptions[0].IndexName
})
wg.Add(1) wg.Add(1)
t.Run("get index statistics", func(t *testing.T) { t.Run("get index statistics", func(t *testing.T) {
defer wg.Done() defer wg.Done()