enhance: [2.6] Estimate the taskSlot based on whether the index is a scalar or vector index (#45851)

issue: #45186 
master pr: #45850

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-12-04 15:21:11 +08:00 committed by GitHub
parent 397c3edd06
commit 161676aed8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 25 additions and 4 deletions

View File

@ -730,6 +730,7 @@ dataCoord:
mixCompactionUsage: 4 # slot usage of mix compaction task. mixCompactionUsage: 4 # slot usage of mix compaction task.
l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task. l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task.
indexTaskSlotUsage: 64 # slot usage of index task per 512mb indexTaskSlotUsage: 64 # slot usage of index task per 512mb
scalarIndexTaskSlotUsage: 16 # slot usage of scalar index task per 512mb
statsTaskSlotUsage: 8 # slot usage of stats task per 512mb statsTaskSlotUsage: 8 # slot usage of stats task per 512mb
analyzeTaskSlotUsage: 65535 # slot usage of analyze task analyzeTaskSlotUsage: 65535 # slot usage of analyze task
jsonShreddingTriggerCount: 10 # jsonkey stats task count per trigger jsonShreddingTriggerCount: 10 # jsonkey stats task count per trigger

View File

@ -175,13 +175,14 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
if err != nil { if err != nil {
return err return err
} }
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize())
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID) indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
indexType := GetIndexType(indexParams) indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
// rewrite the index type if needed, and this final index type will be persisted in the meta // rewrite the index type if needed, and this final index type will be persisted in the meta
if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() { if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
var err error var err error
indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams) indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams)
if err != nil { if err != nil {
@ -227,9 +228,14 @@ func (i *indexInspector) reloadFromMeta() {
continue continue
} }
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
i.scheduler.Enqueue(newIndexBuildTask( i.scheduler.Enqueue(newIndexBuildTask(
model.CloneSegmentIndex(segIndex), model.CloneSegmentIndex(segIndex),
calculateIndexTaskSlot(segment.getSegmentSize()), taskSlot,
i.meta, i.meta,
i.handler, i.handler,
i.storageCli, i.storageCli,

View File

@ -369,8 +369,11 @@ func getSortStatus(sorted bool) string {
return "unsorted" return "unsorted"
} }
func calculateIndexTaskSlot(segmentSize int64) int64 { func calculateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 {
defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64() defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()
if !isVectorIndex {
defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64()
}
if segmentSize > 512*1024*1024 { if segmentSize > 512*1024*1024 {
taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots
return max(taskSlot, 1) return max(taskSlot, 1)

View File

@ -4587,6 +4587,7 @@ type dataCoordConfig struct {
MixCompactionSlotUsage ParamItem `refreshable:"true"` MixCompactionSlotUsage ParamItem `refreshable:"true"`
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`
IndexTaskSlotUsage ParamItem `refreshable:"true"` IndexTaskSlotUsage ParamItem `refreshable:"true"`
ScalarIndexTaskSlotUsage ParamItem `refreshable:"true"`
StatsTaskSlotUsage ParamItem `refreshable:"true"` StatsTaskSlotUsage ParamItem `refreshable:"true"`
AnalyzeTaskSlotUsage ParamItem `refreshable:"true"` AnalyzeTaskSlotUsage ParamItem `refreshable:"true"`
@ -5604,6 +5605,16 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
} }
p.IndexTaskSlotUsage.Init(base.mgr) p.IndexTaskSlotUsage.Init(base.mgr)
p.ScalarIndexTaskSlotUsage = ParamItem{
Key: "dataCoord.slot.scalarIndexTaskSlotUsage",
Version: "2.6.8",
Doc: "slot usage of scalar index task per 512mb",
DefaultValue: "16",
PanicIfEmpty: false,
Export: true,
}
p.ScalarIndexTaskSlotUsage.Init(base.mgr)
p.StatsTaskSlotUsage = ParamItem{ p.StatsTaskSlotUsage = ParamItem{
Key: "dataCoord.slot.statsTaskSlotUsage", Key: "dataCoord.slot.statsTaskSlotUsage",
Version: "2.5.8", Version: "2.5.8",