diff --git a/configs/milvus.yaml b/configs/milvus.yaml index fa43ae0963..4e2be888f8 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -735,6 +735,7 @@ dataCoord: mixCompactionUsage: 4 # slot usage of mix compaction task. l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task. indexTaskSlotUsage: 64 # slot usage of index task per 512mb + scalarIndexTaskSlotUsage: 16 # slot usage of scalar index task per 512mb statsTaskSlotUsage: 8 # slot usage of stats task per 512mb analyzeTaskSlotUsage: 65535 # slot usage of analyze task jsonShreddingTriggerCount: 10 # jsonkey stats task count per trigger diff --git a/internal/datacoord/index_inspector.go b/internal/datacoord/index_inspector.go index 889b0adae1..a619489d20 100644 --- a/internal/datacoord/index_inspector.go +++ b/internal/datacoord/index_inspector.go @@ -175,13 +175,14 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg if err != nil { return err } - taskSlot := calculateIndexTaskSlot(segment.getSegmentSize()) indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID) indexType := GetIndexType(indexParams) + isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) + taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex) // rewrite the index type if needed, and this final index type will be persisted in the meta - if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() { + if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() { var err error indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams) if err != nil { @@ -227,9 +228,14 @@ func (i *indexInspector) reloadFromMeta() { continue } + indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID) + indexType := GetIndexType(indexParams) + isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) + taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex) + i.scheduler.Enqueue(newIndexBuildTask( model.CloneSegmentIndex(segIndex), - calculateIndexTaskSlot(segment.getSegmentSize()), + taskSlot, i.meta, i.handler, i.storageCli, diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index be0024142f..f9f359c6b9 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -369,8 +369,11 @@ func getSortStatus(sorted bool) string { return "unsorted" } -func calculateIndexTaskSlot(segmentSize int64) int64 { +func calculateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 { defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64() + if !isVectorIndex { + defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64() + } if segmentSize > 512*1024*1024 { taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots return max(taskSlot, 1) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 073f9e6533..f95c83803d 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4617,6 +4617,7 @@ type dataCoordConfig struct { MixCompactionSlotUsage ParamItem `refreshable:"true"` L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` IndexTaskSlotUsage ParamItem `refreshable:"true"` + ScalarIndexTaskSlotUsage ParamItem `refreshable:"true"` StatsTaskSlotUsage ParamItem `refreshable:"true"` AnalyzeTaskSlotUsage ParamItem `refreshable:"true"` @@ -5634,6 +5635,16 @@ if param targetVecIndexVersion is not set, the default value is -1, which means } p.IndexTaskSlotUsage.Init(base.mgr) + p.ScalarIndexTaskSlotUsage = ParamItem{ + Key: "dataCoord.slot.scalarIndexTaskSlotUsage", + Version: "2.6.8", + Doc: "slot usage of scalar index task per 512mb", + DefaultValue: "16", + PanicIfEmpty: false, + Export: true, + } + p.ScalarIndexTaskSlotUsage.Init(base.mgr) + p.StatsTaskSlotUsage = ParamItem{ Key: "dataCoord.slot.statsTaskSlotUsage", Version: "2.5.8",