From b8b7d1f47e61dce7c38bb15255be4eed8e5f40cc Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Mon, 20 Mar 2023 10:21:56 +0800 Subject: [PATCH] CreateIndex should wait all segments which has data insert before CreateIndex finish (#22561) Signed-off-by: aoiasd --- internal/datacoord/index_service.go | 8 +++- internal/datacoord/index_service_test.go | 48 ++++++++++++++++++++---- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 8e22349d0c..921258d976 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -329,15 +329,19 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In for _, seg := range segments { totalRows += seg.NumOfRows segIdx, ok := seg.segmentIndexes[index.IndexID] + if !ok { - if seg.LastExpireTime <= index.CreateTime { + if seg.GetStartPosition().GetTimestamp() <= index.CreateTime { cntUnissued++ } continue } - if segIdx.CreateTime > index.CreateTime { + + //data before index create time should create complete + if seg.GetStartPosition().GetTimestamp() > index.CreateTime { continue } + switch segIdx.IndexState { case commonpb.IndexState_IndexStateNone: // can't to here diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 9c9eacfdb9..11f7e9414b 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks" @@ -222,6 +223,9 @@ func TestServer_GetIndexState(t *testing.T) { State: commonpb.SegmentState_Flushed, MaxRowNum: 65536, LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, }, segmentIndexes: nil, currRows: 0, @@ -271,6 +275,9 @@ func TestServer_GetIndexState(t *testing.T) { State: commonpb.SegmentState_Flushed, MaxRowNum: 65536, LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, }, segmentIndexes: map[UniqueID]*model.SegmentIndex{ indexID: { @@ -544,6 +551,9 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { State: commonpb.SegmentState_Flushed, MaxRowNum: 65536, LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, segmentIndexes: nil, currRows: 10250, @@ -576,6 +586,9 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { State: commonpb.SegmentState_Flushed, MaxRowNum: 65536, LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, segmentIndexes: map[UniqueID]*model.SegmentIndex{ indexID: { @@ -650,14 +663,15 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { func TestServer_DescribeIndex(t *testing.T) { var ( - collID = UniqueID(1) - partID = UniqueID(2) - fieldID = UniqueID(10) - indexID = UniqueID(100) - segID = UniqueID(1000) - buildID = UniqueID(10000) - indexName = "default_idx" - typeParams = []*commonpb.KeyValuePair{ + collID = UniqueID(1) + partID = UniqueID(2) + fieldID = UniqueID(10) + indexID = UniqueID(100) + segID = UniqueID(1000) + invalidSegID = UniqueID(1001) + buildID = UniqueID(10000) + indexName = "default_idx" + typeParams = []*commonpb.KeyValuePair{ { Key: "dim", Value: "128", @@ -775,6 +789,21 @@ func TestServer_DescribeIndex(t *testing.T) { }, }, segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, segID: { SegmentInfo: &datapb.SegmentInfo{ ID: segID, @@ -784,6 +813,9 @@ func TestServer_DescribeIndex(t *testing.T) { State: commonpb.SegmentState_Flushed, MaxRowNum: 65536, LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, segmentIndexes: map[UniqueID]*model.SegmentIndex{ indexID: {