From bfc9e80e142b6ebeb679ff3e419d8d3b963b5e00 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 12 Sep 2025 17:51:58 +0800 Subject: [PATCH] enhance: Add param item forcing all indices ready for segment (#44313) Related to #44312 --------- Signed-off-by: Congqi Xia --- internal/datacoord/index_meta.go | 19 ++++++++++++++++--- internal/datacoord/util.go | 15 ++++++++++++--- pkg/util/paramtable/component_param.go | 10 ++++++++++ 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 9c78f361e4..7beeb97a4e 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -579,20 +579,33 @@ func (m *indexMeta) GetIndexedSegments(collectionID int64, segmentIDs, fieldIDs m.fieldIndexLock.RUnlock() fieldIDSet := typeutil.NewUniqueSet(fieldIDs...) + matchedFields := typeutil.NewUniqueSet() + + targetIndices := lo.Filter(lo.Values(fieldIndexes), func(index *model.Index, _ int) bool { + return fieldIDSet.Contain(index.FieldID) + }) + for _, index := range targetIndices { + matchedFields.Insert(index.FieldID) + } + + // some field has no index on it + if len(matchedFields) != len(fieldIDSet) { + return nil + } checkSegmentState := func(indexes *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]) bool { indexedFields := 0 - for indexID, index := range fieldIndexes { + for _, index := range targetIndices { if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted { continue } - if segIdx, ok := indexes.Get(indexID); ok && segIdx.IndexState == commonpb.IndexState_Finished { + if segIdx, ok := indexes.Get(index.IndexID); ok && segIdx.IndexState == commonpb.IndexState_Finished { indexedFields += 1 } } - return indexedFields == fieldIDSet.Len() + return indexedFields == len(targetIndices) } ret := make([]int64, 0) diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 018abbaf0e..e773ea2aea 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -103,10 +103,19 @@ func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, ski } // get vector field id - vecFieldIDs := make([]int64, 0) + var targetFieldIds []int64 + // wait all vector datatype fields only for _, field := range coll.Schema.GetFields() { if typeutil.IsVectorType(field.GetDataType()) { - vecFieldIDs = append(vecFieldIDs, field.GetFieldID()) + targetFieldIds = append(targetFieldIds, field.GetFieldID()) + } + } + + // include all scalar fields with index + if paramtable.Get().DataCoordCfg.DVForceAllIndexReady.GetAsBool() { + indices := mt.indexMeta.GetIndexesForCollection(collection, "") + for _, index := range indices { + targetFieldIds = append(targetFieldIds, index.FieldID) } } segmentIDs := lo.Map(segmentList, func(seg *SegmentInfo, _ int) UniqueID { @@ -114,7 +123,7 @@ func FilterInIndexedSegments(ctx context.Context, handler Handler, mt *meta, ski }) // get indexed segments which finish build index on all vector field - indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, vecFieldIDs) + indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, targetFieldIds) if len(indexed) > 0 { indexedSet := typeutil.NewUniqueSet(indexed...) for _, segment := range segmentList { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index b67cfedb7d..c579f52c8b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4270,6 +4270,7 @@ type dataCoordConfig struct { SegmentFlushInterval ParamItem `refreshable:"true"` BlockingL0EntryNum ParamItem `refreshable:"true"` BlockingL0SizeInMB ParamItem `refreshable:"true"` + DVForceAllIndexReady ParamItem `refreshable:"true"` // compaction EnableCompaction ParamItem `refreshable:"false"` @@ -4567,6 +4568,15 @@ exceeds this threshold, the earliest growing segments will be sealed.`, } p.BlockingL0SizeInMB.Init(base.mgr) + p.DVForceAllIndexReady = ParamItem{ + Key: "dataCoord.dataview.forceAllIndexReady", + Version: "2.6.2", + DefaultValue: "false", + Doc: `If set to true, Milvus will wait all indices ready before the segment appears in indexed dataview.`, + Export: false, + } + p.DVForceAllIndexReady.Init(base.mgr) + p.EnableCompaction = ParamItem{ Key: "dataCoord.enableCompaction", Version: "2.0.0",