mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Forbid gc index meta when creating index (#21061)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com> Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
abc2adb1f8
commit
0faf24cc54
@ -278,6 +278,9 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
|
func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
|
||||||
|
// Make sure index is not being written.
|
||||||
|
fsw.ic.indexGCLock.Lock()
|
||||||
|
defer fsw.ic.indexGCLock.Unlock()
|
||||||
fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "")
|
fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "")
|
||||||
if len(fieldIndexes) == 0 {
|
if len(fieldIndexes) == 0 {
|
||||||
log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
|
log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
|
||||||
|
|||||||
@ -125,7 +125,10 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gc *garbageCollector) recycleSegIndexesMeta() {
|
func (gc *garbageCollector) recycleSegIndexesMeta() {
|
||||||
|
gc.indexCoordClient.indexGCLock.Lock()
|
||||||
segIndexes := gc.metaTable.GetAllSegIndexes()
|
segIndexes := gc.metaTable.GetAllSegIndexes()
|
||||||
|
gc.indexCoordClient.indexGCLock.Unlock()
|
||||||
|
|
||||||
collID2segID := make(map[int64]map[int64]struct{})
|
collID2segID := make(map[int64]map[int64]struct{})
|
||||||
for segID, segIdx := range segIndexes {
|
for segID, segIdx := range segIndexes {
|
||||||
if _, ok := collID2segID[segIdx.CollectionID]; !ok {
|
if _, ok := collID2segID[segIdx.CollectionID]; !ok {
|
||||||
|
|||||||
@ -99,7 +99,7 @@ type IndexCoord struct {
|
|||||||
|
|
||||||
metricsCacheManager *metricsinfo.MetricsCacheManager
|
metricsCacheManager *metricsinfo.MetricsCacheManager
|
||||||
|
|
||||||
nodeLock sync.RWMutex
|
indexGCLock sync.RWMutex
|
||||||
|
|
||||||
initOnce sync.Once
|
initOnce sync.Once
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
@ -130,6 +130,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord
|
|||||||
reqTimeoutInterval: time.Second * 10,
|
reqTimeoutInterval: time.Second * 10,
|
||||||
factory: factory,
|
factory: factory,
|
||||||
enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby.GetAsBool(),
|
enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby.GetAsBool(),
|
||||||
|
indexGCLock: sync.RWMutex{},
|
||||||
}
|
}
|
||||||
i.UpdateStateCode(commonpb.StateCode_Abnormal)
|
i.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||||
return i, nil
|
return i, nil
|
||||||
|
|||||||
@ -135,6 +135,41 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cit *CreateIndexTask) createIndexAtomic(index *model.Index, segmentsInfo []*datapb.SegmentInfo) ([]UniqueID, []*datapb.SegmentInfo, error) {
|
||||||
|
buildIDs := make([]UniqueID, 0)
|
||||||
|
segments := make([]*datapb.SegmentInfo, 0)
|
||||||
|
for _, segmentInfo := range segmentsInfo {
|
||||||
|
segIdx := &model.SegmentIndex{
|
||||||
|
SegmentID: segmentInfo.ID,
|
||||||
|
CollectionID: segmentInfo.CollectionID,
|
||||||
|
PartitionID: segmentInfo.PartitionID,
|
||||||
|
NumRows: segmentInfo.NumOfRows,
|
||||||
|
IndexID: cit.indexID,
|
||||||
|
CreateTime: cit.req.GetTimestamp(),
|
||||||
|
}
|
||||||
|
have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID),
|
||||||
|
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName),
|
||||||
|
zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if have || buildID == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
segments = append(segments, segmentInfo)
|
||||||
|
buildIDs = append(buildIDs, buildID)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := cit.table.CreateIndex(index)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
|
||||||
|
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return buildIDs, segments, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Execute adds the index task to meta table.
|
// Execute adds the index task to meta table.
|
||||||
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
||||||
log.Info("IndexCoord CreateIndexTask Execute", zap.Int64("collectionID", cit.req.CollectionID),
|
log.Info("IndexCoord CreateIndexTask Execute", zap.Int64("collectionID", cit.req.CollectionID),
|
||||||
@ -155,6 +190,11 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
|||||||
UserIndexParams: cit.req.GetUserIndexParams(),
|
UserIndexParams: cit.req.GetUserIndexParams(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lock before GetFlushedSegments,
|
||||||
|
// prevent the flush watcher watches the new flushed segment just after getting the flushed segments, and it locks firstly.
|
||||||
|
cit.indexCoordClient.indexGCLock.RLock()
|
||||||
|
defer cit.indexCoordClient.indexGCLock.RUnlock()
|
||||||
|
|
||||||
// Get flushed segments
|
// Get flushed segments
|
||||||
flushedSegments, err := cit.dataCoordClient.GetFlushedSegments(cit.ctx, &datapb.GetFlushedSegmentsRequest{
|
flushedSegments, err := cit.dataCoordClient.GetFlushedSegments(cit.ctx, &datapb.GetFlushedSegmentsRequest{
|
||||||
Base: commonpbutil.NewMsgBase(
|
Base: commonpbutil.NewMsgBase(
|
||||||
@ -185,36 +225,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
buildIDs := make([]UniqueID, 0)
|
buildIDs, segments, err := cit.createIndexAtomic(index, segmentsInfo.GetInfos())
|
||||||
segments := make([]*datapb.SegmentInfo, 0)
|
|
||||||
for _, segmentInfo := range segmentsInfo.Infos {
|
|
||||||
if segmentInfo.State != commonpb.SegmentState_Flushed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
segIdx := &model.SegmentIndex{
|
|
||||||
SegmentID: segmentInfo.ID,
|
|
||||||
CollectionID: segmentInfo.CollectionID,
|
|
||||||
PartitionID: segmentInfo.PartitionID,
|
|
||||||
NumRows: segmentInfo.NumOfRows,
|
|
||||||
IndexID: cit.indexID,
|
|
||||||
CreateTime: cit.req.GetTimestamp(),
|
|
||||||
}
|
|
||||||
have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("IndexCoord create index on segment fail", zap.Int64("collectionID", cit.req.CollectionID),
|
|
||||||
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName),
|
|
||||||
zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if have || buildID == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
segments = append(segments, segmentInfo)
|
|
||||||
buildIDs = append(buildIDs, buildID)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = cit.table.CreateIndex(index)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
|
log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
|
||||||
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
|
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
|
||||||
|
|||||||
144
internal/indexcoord/task_test.go
Normal file
144
internal/indexcoord/task_test.go
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package indexcoord
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
|
||||||
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_createIndexAtomic(t *testing.T) {
|
||||||
|
meta := &metaTable{
|
||||||
|
catalog: &indexcoord.Catalog{Txn: &mockETCDKV{
|
||||||
|
save: func(s string, s2 string) error {
|
||||||
|
return errors.New("error")
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
indexLock: sync.RWMutex{},
|
||||||
|
segmentIndexLock: sync.RWMutex{},
|
||||||
|
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{},
|
||||||
|
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
|
||||||
|
segID: {
|
||||||
|
indexID: &model.SegmentIndex{
|
||||||
|
SegmentID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
NumRows: 1025,
|
||||||
|
IndexID: indexID,
|
||||||
|
BuildID: buildID,
|
||||||
|
NodeID: nodeID,
|
||||||
|
IndexVersion: 0,
|
||||||
|
IndexState: commonpb.IndexState_Unissued,
|
||||||
|
FailReason: "",
|
||||||
|
IsDeleted: false,
|
||||||
|
CreateTime: 0,
|
||||||
|
IndexFileKeys: nil,
|
||||||
|
IndexSize: 0,
|
||||||
|
WriteHandoff: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
|
||||||
|
buildID: {
|
||||||
|
SegmentID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
NumRows: 1025,
|
||||||
|
IndexID: indexID,
|
||||||
|
BuildID: buildID,
|
||||||
|
NodeID: nodeID,
|
||||||
|
IndexVersion: 0,
|
||||||
|
IndexState: commonpb.IndexState_Unissued,
|
||||||
|
FailReason: "",
|
||||||
|
IsDeleted: false,
|
||||||
|
CreateTime: 0,
|
||||||
|
IndexFileKeys: nil,
|
||||||
|
IndexSize: 0,
|
||||||
|
WriteHandoff: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ic := &IndexCoord{
|
||||||
|
metaTable: meta,
|
||||||
|
}
|
||||||
|
cit := &CreateIndexTask{
|
||||||
|
BaseTask: BaseTask{
|
||||||
|
table: meta,
|
||||||
|
},
|
||||||
|
indexCoordClient: ic,
|
||||||
|
indexID: indexID,
|
||||||
|
req: &indexpb.CreateIndexRequest{
|
||||||
|
CollectionID: collID,
|
||||||
|
FieldID: fieldID,
|
||||||
|
IndexName: indexName,
|
||||||
|
Timestamp: createTs,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
index := &model.Index{
|
||||||
|
TenantID: "",
|
||||||
|
CollectionID: collID,
|
||||||
|
FieldID: fieldID,
|
||||||
|
IndexID: indexID,
|
||||||
|
IndexName: indexName,
|
||||||
|
IsDeleted: false,
|
||||||
|
CreateTime: createTs,
|
||||||
|
TypeParams: nil,
|
||||||
|
IndexParams: nil,
|
||||||
|
IsAutoIndex: false,
|
||||||
|
UserIndexParams: nil,
|
||||||
|
}
|
||||||
|
segmentsInfo := []*datapb.SegmentInfo{
|
||||||
|
{
|
||||||
|
ID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
InsertChannel: "",
|
||||||
|
NumOfRows: 1025,
|
||||||
|
State: commonpb.SegmentState_Flushed,
|
||||||
|
MaxRowNum: 65535,
|
||||||
|
LastExpireTime: 0,
|
||||||
|
StartPosition: nil,
|
||||||
|
DmlPosition: nil,
|
||||||
|
Binlogs: nil,
|
||||||
|
Statslogs: nil,
|
||||||
|
Deltalogs: nil,
|
||||||
|
CreatedByCompaction: false,
|
||||||
|
CompactionFrom: nil,
|
||||||
|
DroppedAt: 0,
|
||||||
|
IsImporting: false,
|
||||||
|
IsFake: false,
|
||||||
|
XXX_NoUnkeyedLiteral: struct{}{},
|
||||||
|
XXX_unrecognized: nil,
|
||||||
|
XXX_sizecache: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
buildIDs, segs, err := cit.createIndexAtomic(index, segmentsInfo)
|
||||||
|
// index already exist
|
||||||
|
assert.Equal(t, 0, len(buildIDs))
|
||||||
|
assert.Equal(t, 0, len(segs))
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user