From 5f77dbd00a2c1a45e4ef3da1ab646f39e22be209 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 7 Dec 2022 19:15:20 +0800 Subject: [PATCH] Forbid gc index meta when creating index (#21024) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- internal/indexcoord/garbage_collector.go | 3 + internal/indexcoord/index_coord.go | 3 +- internal/indexcoord/task.go | 69 ++++++----- internal/indexcoord/task_test.go | 144 +++++++++++++++++++++++ 4 files changed, 188 insertions(+), 31 deletions(-) create mode 100644 internal/indexcoord/task_test.go diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index 74bc276f2c..b595e4b9ef 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -125,7 +125,10 @@ func (gc *garbageCollector) recycleUnusedIndexes() { } func (gc *garbageCollector) recycleSegIndexesMeta() { + gc.indexCoordClient.indexGCLock.Lock() segIndexes := gc.metaTable.GetAllSegIndexes() + gc.indexCoordClient.indexGCLock.Unlock() + collID2segID := make(map[int64]map[int64]struct{}) for segID, segIdx := range segIndexes { if _, ok := collID2segID[segIdx.CollectionID]; !ok { diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index d4a7eb0b52..4148b73773 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -99,7 +99,7 @@ type IndexCoord struct { metricsCacheManager *metricsinfo.MetricsCacheManager - nodeLock sync.RWMutex + indexGCLock sync.RWMutex initOnce sync.Once startOnce sync.Once @@ -130,6 +130,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord reqTimeoutInterval: time.Second * 10, factory: factory, enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby, + indexGCLock: sync.RWMutex{}, } i.UpdateStateCode(commonpb.StateCode_Abnormal) return i, nil diff --git a/internal/indexcoord/task.go b/internal/indexcoord/task.go index feaf2f8683..bf561b4ac1 100644 --- a/internal/indexcoord/task.go +++ b/internal/indexcoord/task.go @@ -134,6 +134,44 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error { return nil } +func (cit *CreateIndexTask) createIndexAtomic(index *model.Index, segmentsInfo []*datapb.SegmentInfo) ([]UniqueID, []*datapb.SegmentInfo, error) { + cit.indexCoordClient.indexGCLock.RLock() + defer cit.indexCoordClient.indexGCLock.RUnlock() + + buildIDs := make([]UniqueID, 0) + segments := make([]*datapb.SegmentInfo, 0) + for _, segmentInfo := range segmentsInfo { + segIdx := &model.SegmentIndex{ + SegmentID: segmentInfo.ID, + CollectionID: segmentInfo.CollectionID, + PartitionID: segmentInfo.PartitionID, + NumRows: segmentInfo.NumOfRows, + IndexID: cit.indexID, + CreateTime: cit.req.GetTimestamp(), + } + have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx) + if err != nil { + log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID), + zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), + zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err)) + return nil, nil, err + } + if have || buildID == 0 { + continue + } + segments = append(segments, segmentInfo) + buildIDs = append(buildIDs, buildID) + } + + err := cit.table.CreateIndex(index) + if err != nil { + log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID), + zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err)) + return nil, nil, err + } + return buildIDs, segments, nil +} + // Execute adds the index task to meta table. func (cit *CreateIndexTask) Execute(ctx context.Context) error { log.Info("IndexCoord CreateIndexTask Execute", zap.Int64("collectionID", cit.req.CollectionID), @@ -184,36 +222,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error { return err } - buildIDs := make([]UniqueID, 0) - segments := make([]*datapb.SegmentInfo, 0) - for _, segmentInfo := range segmentsInfo.Infos { - if segmentInfo.State != commonpb.SegmentState_Flushed { - continue - } - - segIdx := &model.SegmentIndex{ - SegmentID: segmentInfo.ID, - CollectionID: segmentInfo.CollectionID, - PartitionID: segmentInfo.PartitionID, - NumRows: segmentInfo.NumOfRows, - IndexID: cit.indexID, - CreateTime: cit.req.GetTimestamp(), - } - have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx) - if err != nil { - log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID), - zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), - zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err)) - return err - } - if have || buildID == 0 { - continue - } - segments = append(segments, segmentInfo) - buildIDs = append(buildIDs, buildID) - } - - err = cit.table.CreateIndex(index) + buildIDs, segments, err := cit.createIndexAtomic(index, segmentsInfo.GetInfos()) if err != nil { log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID), zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err)) diff --git a/internal/indexcoord/task_test.go b/internal/indexcoord/task_test.go new file mode 100644 index 0000000000..217c9d36a6 --- /dev/null +++ b/internal/indexcoord/task_test.go @@ -0,0 +1,144 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import ( + "errors" + "sync" + "testing" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/stretchr/testify/assert" +) + +func Test_createIndexAtomic(t *testing.T) { + meta := &metaTable{ + catalog: &indexcoord.Catalog{Txn: &mockETCDKV{ + save: func(s string, s2 string) error { + return errors.New("error") + }, + }}, + indexLock: sync.RWMutex{}, + segmentIndexLock: sync.RWMutex{}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{}, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: &model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: nodeID, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: nodeID, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + } + ic := &IndexCoord{ + metaTable: meta, + } + cit := &CreateIndexTask{ + BaseTask: BaseTask{ + table: meta, + }, + indexCoordClient: ic, + indexID: indexID, + req: &indexpb.CreateIndexRequest{ + CollectionID: collID, + FieldID: fieldID, + IndexName: indexName, + Timestamp: createTs, + }, + } + + index := &model.Index{ + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: createTs, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + } + segmentsInfo := []*datapb.SegmentInfo{ + { + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1025, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65535, + LastExpireTime: 0, + StartPosition: nil, + DmlPosition: nil, + Binlogs: nil, + Statslogs: nil, + Deltalogs: nil, + CreatedByCompaction: false, + CompactionFrom: nil, + DroppedAt: 0, + IsImporting: false, + IsFake: false, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }, + } + + buildIDs, segs, err := cit.createIndexAtomic(index, segmentsInfo) + // index already exist + assert.Equal(t, 0, len(buildIDs)) + assert.Equal(t, 0, len(segs)) + assert.Error(t, err) +}