CreateIndex should wait all segments which has data insert before CreateIndex finish (#22561)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2023-03-20 10:21:56 +08:00 committed by GitHub
parent 005d178a0e
commit b8b7d1f47e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 10 deletions

View File

@ -329,15 +329,19 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
for _, seg := range segments { for _, seg := range segments {
totalRows += seg.NumOfRows totalRows += seg.NumOfRows
segIdx, ok := seg.segmentIndexes[index.IndexID] segIdx, ok := seg.segmentIndexes[index.IndexID]
if !ok { if !ok {
if seg.LastExpireTime <= index.CreateTime { if seg.GetStartPosition().GetTimestamp() <= index.CreateTime {
cntUnissued++ cntUnissued++
} }
continue continue
} }
if segIdx.CreateTime > index.CreateTime {
//data before index create time should create complete
if seg.GetStartPosition().GetTimestamp() > index.CreateTime {
continue continue
} }
switch segIdx.IndexState { switch segIdx.IndexState {
case commonpb.IndexState_IndexStateNone: case commonpb.IndexState_IndexStateNone:
// can't to here // can't to here

View File

@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks" catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
@ -222,6 +223,9 @@ func TestServer_GetIndexState(t *testing.T) {
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536, MaxRowNum: 65536,
LastExpireTime: createTS - 1, LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
}, },
segmentIndexes: nil, segmentIndexes: nil,
currRows: 0, currRows: 0,
@ -271,6 +275,9 @@ func TestServer_GetIndexState(t *testing.T) {
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536, MaxRowNum: 65536,
LastExpireTime: createTS - 1, LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
}, },
segmentIndexes: map[UniqueID]*model.SegmentIndex{ segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: { indexID: {
@ -544,6 +551,9 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536, MaxRowNum: 65536,
LastExpireTime: createTS, LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
}, },
segmentIndexes: nil, segmentIndexes: nil,
currRows: 10250, currRows: 10250,
@ -576,6 +586,9 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536, MaxRowNum: 65536,
LastExpireTime: createTS, LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
}, },
segmentIndexes: map[UniqueID]*model.SegmentIndex{ segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: { indexID: {
@ -655,6 +668,7 @@ func TestServer_DescribeIndex(t *testing.T) {
fieldID = UniqueID(10) fieldID = UniqueID(10)
indexID = UniqueID(100) indexID = UniqueID(100)
segID = UniqueID(1000) segID = UniqueID(1000)
invalidSegID = UniqueID(1001)
buildID = UniqueID(10000) buildID = UniqueID(10000)
indexName = "default_idx" indexName = "default_idx"
typeParams = []*commonpb.KeyValuePair{ typeParams = []*commonpb.KeyValuePair{
@ -775,6 +789,21 @@ func TestServer_DescribeIndex(t *testing.T) {
}, },
}, },
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timesamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: { segID: {
SegmentInfo: &datapb.SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{
ID: segID, ID: segID,
@ -784,6 +813,9 @@ func TestServer_DescribeIndex(t *testing.T) {
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536, MaxRowNum: 65536,
LastExpireTime: createTS, LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
}, },
segmentIndexes: map[UniqueID]*model.SegmentIndex{ segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: { indexID: {