diff --git a/internal/indexcoord/flush_segment_watcher.go b/internal/indexcoord/flush_segment_watcher.go
index ade5bcddcb..408a136f72 100644
--- a/internal/indexcoord/flush_segment_watcher.go
+++ b/internal/indexcoord/flush_segment_watcher.go
@@ -278,6 +278,9 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 }
 
 func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
+	// Make sure no index meta is being written concurrently.
+	fsw.ic.indexGCLock.Lock()
+	defer fsw.ic.indexGCLock.Unlock()
 	fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "")
 	if len(fieldIndexes) == 0 {
 		log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go
index 5beefd6f84..c827a89aee 100644
--- a/internal/indexcoord/garbage_collector.go
+++ b/internal/indexcoord/garbage_collector.go
@@ -125,7 +125,10 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
 }
 
 func (gc *garbageCollector) recycleSegIndexesMeta() {
+	gc.indexCoordClient.indexGCLock.Lock()
 	segIndexes := gc.metaTable.GetAllSegIndexes()
+	gc.indexCoordClient.indexGCLock.Unlock()
+
 	collID2segID := make(map[int64]map[int64]struct{})
 	for segID, segIdx := range segIndexes {
 		if _, ok := collID2segID[segIdx.CollectionID]; !ok {
diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go
index 9f53b6aa6f..3252c2eb50 100644
--- a/internal/indexcoord/index_coord.go
+++ b/internal/indexcoord/index_coord.go
@@ -99,7 +99,7 @@ type IndexCoord struct {
 
 	metricsCacheManager *metricsinfo.MetricsCacheManager
 
-	nodeLock    sync.RWMutex
+	indexGCLock sync.RWMutex
 
 	initOnce  sync.Once
 	startOnce sync.Once
@@ -130,6 +130,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord
 		reqTimeoutInterval:  time.Second * 10,
 		factory:             factory,
 		enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby.GetAsBool(),
+		indexGCLock:         sync.RWMutex{},
 	}
 	i.UpdateStateCode(commonpb.StateCode_Abnormal)
 	return i, nil
diff --git a/internal/indexcoord/task.go b/internal/indexcoord/task.go
index 8434843708..3679223e48 100644
--- a/internal/indexcoord/task.go
+++ b/internal/indexcoord/task.go
@@ -135,6 +135,41 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
 	return nil
 }
 
+func (cit *CreateIndexTask) createIndexAtomic(index *model.Index, segmentsInfo []*datapb.SegmentInfo) ([]UniqueID, []*datapb.SegmentInfo, error) {
+	buildIDs := make([]UniqueID, 0)
+	segments := make([]*datapb.SegmentInfo, 0)
+	for _, segmentInfo := range segmentsInfo {
+		segIdx := &model.SegmentIndex{
+			SegmentID:    segmentInfo.ID,
+			CollectionID: segmentInfo.CollectionID,
+			PartitionID:  segmentInfo.PartitionID,
+			NumRows:      segmentInfo.NumOfRows,
+			IndexID:      cit.indexID,
+			CreateTime:   cit.req.GetTimestamp(),
+		}
+		have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx)
+		if err != nil {
+			log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID),
+				zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName),
+				zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
+			return nil, nil, err
+		}
+		if have || buildID == 0 {
+			continue
+		}
+		segments = append(segments, segmentInfo)
+		buildIDs = append(buildIDs, buildID)
+	}
+
+	err := cit.table.CreateIndex(index)
+	if err != nil {
+		log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
+			zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
+		return nil, nil, err
+	}
+	return buildIDs, segments, nil
+}
+
 // Execute adds the index task to meta table.
 func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 	log.Info("IndexCoord CreateIndexTask Execute", zap.Int64("collectionID", cit.req.CollectionID),
@@ -155,6 +190,11 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 		UserIndexParams: cit.req.GetUserIndexParams(),
 	}
 
+	// Take the read lock before GetFlushedSegments, so the flush watcher cannot grab the write lock
+	// first and process a segment that flushes right after the flushed-segment list is fetched.
+	cit.indexCoordClient.indexGCLock.RLock()
+	defer cit.indexCoordClient.indexGCLock.RUnlock()
+
 	// Get flushed segments
 	flushedSegments, err := cit.dataCoordClient.GetFlushedSegments(cit.ctx, &datapb.GetFlushedSegmentsRequest{
 		Base: commonpbutil.NewMsgBase(
@@ -185,36 +225,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 		return err
 	}
 
-	buildIDs := make([]UniqueID, 0)
-	segments := make([]*datapb.SegmentInfo, 0)
-	for _, segmentInfo := range segmentsInfo.Infos {
-		if segmentInfo.State != commonpb.SegmentState_Flushed {
-			continue
-		}
-
-		segIdx := &model.SegmentIndex{
-			SegmentID:    segmentInfo.ID,
-			CollectionID: segmentInfo.CollectionID,
-			PartitionID:  segmentInfo.PartitionID,
-			NumRows:      segmentInfo.NumOfRows,
-			IndexID:      cit.indexID,
-			CreateTime:   cit.req.GetTimestamp(),
-		}
-		have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx)
-		if err != nil {
-			log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID),
-				zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName),
-				zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
-			return err
-		}
-		if have || buildID == 0 {
-			continue
-		}
-		segments = append(segments, segmentInfo)
-		buildIDs = append(buildIDs, buildID)
-	}
-
-	err = cit.table.CreateIndex(index)
+	buildIDs, segments, err := cit.createIndexAtomic(index, segmentsInfo.GetInfos())
 	if err != nil {
 		log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
 			zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
diff --git a/internal/indexcoord/task_test.go b/internal/indexcoord/task_test.go
new file mode 100644
index 0000000000..217c9d36a6
--- /dev/null
+++ b/internal/indexcoord/task_test.go
@@ -0,0 +1,144 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package indexcoord
+
+import (
+	"errors"
+	"sync"
+	"testing"
+
+	"github.com/milvus-io/milvus-proto/go-api/commonpb"
+	"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
+	"github.com/milvus-io/milvus/internal/metastore/model"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_createIndexAtomic(t *testing.T) {
+	meta := &metaTable{
+		catalog: &indexcoord.Catalog{Txn: &mockETCDKV{
+			save: func(s string, s2 string) error {
+				return errors.New("error")
+			},
+		}},
+		indexLock:         sync.RWMutex{},
+		segmentIndexLock:  sync.RWMutex{},
+		collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{},
+		segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
+			segID: {
+				indexID: &model.SegmentIndex{
+					SegmentID:     segID,
+					CollectionID:  collID,
+					PartitionID:   partID,
+					NumRows:       1025,
+					IndexID:       indexID,
+					BuildID:       buildID,
+					NodeID:        nodeID,
+					IndexVersion:  0,
+					IndexState:    commonpb.IndexState_Unissued,
+					FailReason:    "",
+					IsDeleted:     false,
+					CreateTime:    0,
+					IndexFileKeys: nil,
+					IndexSize:     0,
+					WriteHandoff:  false,
+				},
+			},
+		},
+		buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
+			buildID: {
+				SegmentID:     segID,
+				CollectionID:  collID,
+				PartitionID:   partID,
+				NumRows:       1025,
+				IndexID:       indexID,
+				BuildID:       buildID,
+				NodeID:        nodeID,
+				IndexVersion:  0,
+				IndexState:    commonpb.IndexState_Unissued,
+				FailReason:    "",
+				IsDeleted:     false,
+				CreateTime:    0,
+				IndexFileKeys: nil,
+				IndexSize:     0,
+				WriteHandoff:  false,
+			},
+		},
+	}
+	ic := &IndexCoord{
+		metaTable: meta,
+	}
+	cit := &CreateIndexTask{
+		BaseTask: BaseTask{
+			table: meta,
+		},
+		indexCoordClient: ic,
+		indexID:          indexID,
+		req: &indexpb.CreateIndexRequest{
+			CollectionID: collID,
+			FieldID:      fieldID,
+			IndexName:    indexName,
+			Timestamp:    createTs,
+		},
+	}
+
+	index := &model.Index{
+		TenantID:        "",
+		CollectionID:    collID,
+		FieldID:         fieldID,
+		IndexID:         indexID,
+		IndexName:       indexName,
+		IsDeleted:       false,
+		CreateTime:      createTs,
+		TypeParams:      nil,
+		IndexParams:     nil,
+		IsAutoIndex:     false,
+		UserIndexParams: nil,
+	}
+	segmentsInfo := []*datapb.SegmentInfo{
+		{
+			ID:                   segID,
+			CollectionID:         collID,
+			PartitionID:          partID,
+			InsertChannel:        "",
+			NumOfRows:            1025,
+			State:                commonpb.SegmentState_Flushed,
+			MaxRowNum:            65535,
+			LastExpireTime:       0,
+			StartPosition:        nil,
+			DmlPosition:          nil,
+			Binlogs:              nil,
+			Statslogs:            nil,
+			Deltalogs:            nil,
+			CreatedByCompaction:  false,
+			CompactionFrom:       nil,
+			DroppedAt:            0,
+			IsImporting:          false,
+			IsFake:               false,
+			XXX_NoUnkeyedLiteral: struct{}{},
+			XXX_unrecognized:     nil,
+			XXX_sizecache:        0,
+		},
+	}
+
+	buildIDs, segs, err := cit.createIndexAtomic(index, segmentsInfo)
+	// The segment index already exists and the mocked etcd save fails, so expect no builds and an error.
+	assert.Equal(t, 0, len(buildIDs))
+	assert.Equal(t, 0, len(segs))
+	assert.Error(t, err)
+}
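
The patch serializes CreateIndexTask against the flushed-segment watcher and the segment-index garbage collector through IndexCoord.indexGCLock: Execute holds the read lock across GetFlushedSegments and the meta writes in createIndexAtomic, while constructTask and recycleSegIndexesMeta take the write lock. The standalone sketch below is not part of the patch; the coord type, onSegmentFlushed, hasIndexMeta, and flushedSegs are made-up stand-ins for IndexCoord, flushedSegmentWatcher.constructTask, and the meta-table calls. It only illustrates why that lock ordering closes the race: a segment that flushes concurrently is either included in the snapshot taken under the read lock, or its watcher task runs after the index meta already exists.

package main

import (
	"fmt"
	"sync"
)

// coord is a made-up, stripped-down stand-in for IndexCoord, keeping only the
// pieces needed to show how indexGCLock orders index creation against the
// flush watcher.
type coord struct {
	indexGCLock sync.RWMutex // the lock introduced by this patch

	mu           sync.Mutex
	hasIndexMeta bool    // stands in for metaTable.CreateIndex having succeeded
	flushedSegs  []int64 // stands in for the flushed segments known to DataCoord
}

// createIndex mirrors the shape of CreateIndexTask.Execute: the read lock is
// held across "list flushed segments" and "write the index meta", so the flush
// watcher cannot run in between and miss the index that is about to exist.
func (c *coord) createIndex() {
	c.indexGCLock.RLock()
	defer c.indexGCLock.RUnlock()

	c.mu.Lock()
	segs := append([]int64(nil), c.flushedSegs...) // "GetFlushedSegments"
	c.mu.Unlock()
	fmt.Printf("createIndex builds for segments flushed so far: %v\n", segs)

	c.mu.Lock()
	c.hasIndexMeta = true // "metaTable.CreateIndex(index)"
	c.mu.Unlock()
}

// onSegmentFlushed mirrors the shape of flushedSegmentWatcher.constructTask:
// taking the write lock forces it to run entirely before or entirely after
// createIndex, so a concurrently flushed segment is picked up by exactly one
// of the two paths instead of being skipped by both.
func (c *coord) onSegmentFlushed(segID int64) {
	c.indexGCLock.Lock()
	defer c.indexGCLock.Unlock()

	c.mu.Lock()
	c.flushedSegs = append(c.flushedSegs, segID)
	seen := c.hasIndexMeta // "GetIndexesForCollection"
	c.mu.Unlock()
	fmt.Printf("segment %d flushed, index meta visible to watcher: %v\n", segID, seen)
}

func main() {
	c := &coord{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.createIndex() }()
	go func() { defer wg.Done(); c.onSegmentFlushed(100) }()
	wg.Wait()
}

Using an RWMutex rather than a plain mutex keeps concurrent CreateIndex requests from blocking each other; only the watcher and garbage-collector paths need exclusive access.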