enhance: Add param item forcing all indices ready for segment (#44313)

Related to #44312

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-09-12 17:51:58 +08:00 committed by GitHub
parent 1d658b5e84
commit bfc9e80e14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 38 additions and 6 deletions

View File

@ -579,20 +579,33 @@ func (m *indexMeta) GetIndexedSegments(collectionID int64, segmentIDs, fieldIDs
m.fieldIndexLock.RUnlock() m.fieldIndexLock.RUnlock()
fieldIDSet := typeutil.NewUniqueSet(fieldIDs...) fieldIDSet := typeutil.NewUniqueSet(fieldIDs...)
matchedFields := typeutil.NewUniqueSet()
targetIndices := lo.Filter(lo.Values(fieldIndexes), func(index *model.Index, _ int) bool {
return fieldIDSet.Contain(index.FieldID)
})
for _, index := range targetIndices {
matchedFields.Insert(index.FieldID)
}
// some field has no index on it
if len(matchedFields) != len(fieldIDSet) {
return nil
}
checkSegmentState := func(indexes *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]) bool { checkSegmentState := func(indexes *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]) bool {
indexedFields := 0 indexedFields := 0
for indexID, index := range fieldIndexes { for _, index := range targetIndices {
if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted { if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted {
continue continue
} }
if segIdx, ok := indexes.Get(indexID); ok && segIdx.IndexState == commonpb.IndexState_Finished { if segIdx, ok := indexes.Get(index.IndexID); ok && segIdx.IndexState == commonpb.IndexState_Finished {
indexedFields += 1 indexedFields += 1
} }
} }
return indexedFields == fieldIDSet.Len() return indexedFields == len(targetIndices)
} }
ret := make([]int64, 0) ret := make([]int64, 0)

View File

@ -103,10 +103,19 @@ func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, ski
} }
// get vector field id // get vector field id
vecFieldIDs := make([]int64, 0) var targetFieldIds []int64
// wait all vector datatype fields only
for _, field := range coll.Schema.GetFields() { for _, field := range coll.Schema.GetFields() {
if typeutil.IsVectorType(field.GetDataType()) { if typeutil.IsVectorType(field.GetDataType()) {
vecFieldIDs = append(vecFieldIDs, field.GetFieldID()) targetFieldIds = append(targetFieldIds, field.GetFieldID())
}
}
// include all scalar fields with index
if paramtable.Get().DataCoordCfg.DVForceAllIndexReady.GetAsBool() {
indices := mt.indexMeta.GetIndexesForCollection(collection, "")
for _, index := range indices {
targetFieldIds = append(targetFieldIds, index.FieldID)
} }
} }
segmentIDs := lo.Map(segmentList, func(seg *SegmentInfo, _ int) UniqueID { segmentIDs := lo.Map(segmentList, func(seg *SegmentInfo, _ int) UniqueID {
@ -114,7 +123,7 @@ func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, ski
}) })
// get indexed segments which finish build index on all vector field // get indexed segments which finish build index on all vector field
indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, vecFieldIDs) indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, targetFieldIds)
if len(indexed) > 0 { if len(indexed) > 0 {
indexedSet := typeutil.NewUniqueSet(indexed...) indexedSet := typeutil.NewUniqueSet(indexed...)
for _, segment := range segmentList { for _, segment := range segmentList {

View File

@ -4270,6 +4270,7 @@ type dataCoordConfig struct {
SegmentFlushInterval ParamItem `refreshable:"true"` SegmentFlushInterval ParamItem `refreshable:"true"`
BlockingL0EntryNum ParamItem `refreshable:"true"` BlockingL0EntryNum ParamItem `refreshable:"true"`
BlockingL0SizeInMB ParamItem `refreshable:"true"` BlockingL0SizeInMB ParamItem `refreshable:"true"`
DVForceAllIndexReady ParamItem `refreshable:"true"`
// compaction // compaction
EnableCompaction ParamItem `refreshable:"false"` EnableCompaction ParamItem `refreshable:"false"`
@ -4567,6 +4568,15 @@ exceeds this threshold, the earliest growing segments will be sealed.`,
} }
p.BlockingL0SizeInMB.Init(base.mgr) p.BlockingL0SizeInMB.Init(base.mgr)
p.DVForceAllIndexReady = ParamItem{
Key: "dataCoord.dataview.forceAllIndexReady",
Version: "2.6.2",
DefaultValue: "false",
Doc: `If set to true, Milvus will wait all indices ready before the segment appears in indexed dataview.`,
Export: false,
}
p.DVForceAllIndexReady.Init(base.mgr)
p.EnableCompaction = ParamItem{ p.EnableCompaction = ParamItem{
Key: "dataCoord.enableCompaction", Key: "dataCoord.enableCompaction",
Version: "2.0.0", Version: "2.0.0",