diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index 9a9b7b0030..e60003f7a3 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -33,6 +33,7 @@ import ( itypeutil "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -332,6 +333,14 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) binlogIDs := getBinLogIds(segment, fieldID) + if isDiskANNIndex(getIndexType(indexParams)) { + var err error + indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams) + if err != nil { + log.Ctx(ib.ctx).Warn("failed to append index build params", zap.Int64("buildID", buildID), + zap.Int64("nodeID", nodeID), zap.Error(err)) + } + } var req *indexpb.CreateJobRequest if Params.CommonCfg.EnableStorageV2.GetAsBool() { collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID()) diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index de2c612fd7..16f2fcd8a1 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -199,6 +199,10 @@ func isOptionalScalarFieldSupported(indexType string) bool { return indexType == indexparamcheck.IndexHNSW } +func isDiskANNIndex(indexType string) bool { + return indexType == indexparamcheck.IndexDISKANN +} + func parseBuildIDFromFilePath(key string) (UniqueID, error) { ss := strings.Split(key, "/") if strings.HasSuffix(key, "/") { diff --git a/pkg/util/indexparams/index_params.go b/pkg/util/indexparams/index_params.go index 978a8b525c..4e2281e0b5 100644 --- a/pkg/util/indexparams/index_params.go +++ b/pkg/util/indexparams/index_params.go @@ -22,6 +22,7 @@ import ( "strconv" "unsafe" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -164,11 +165,11 @@ func NewBigDataExtraParamsFromMap(value map[string]string) (*BigDataIndexExtraPa // FillDiskIndexParams fill ratio params to index param on proxy node // Which will be used to calculate build and load params func FillDiskIndexParams(params *paramtable.ComponentParam, indexParams map[string]string) error { - maxDegree := params.CommonCfg.MaxDegree.GetValue() - searchListSize := params.CommonCfg.SearchListSize.GetValue() - pqCodeBudgetGBRatio := params.CommonCfg.PQCodeBudgetGBRatio.GetValue() - buildNumThreadsRatio := params.CommonCfg.BuildNumThreadsRatio.GetValue() - searchCacheBudgetGBRatio := params.CommonCfg.SearchCacheBudgetGBRatio.GetValue() + var maxDegree string + var searchListSize string + var pqCodeBudgetGBRatio string + var buildNumThreadsRatio string + var searchCacheBudgetGBRatio string if params.AutoIndexConfig.Enable.GetAsBool() { indexParams := params.AutoIndexConfig.IndexParams.GetAsJSONMap() @@ -187,6 +188,13 @@ func FillDiskIndexParams(params *paramtable.ComponentParam, indexParams map[stri } pqCodeBudgetGBRatio = fmt.Sprintf("%f", extraParams.PQCodeBudgetGBRatio) buildNumThreadsRatio = fmt.Sprintf("%f", extraParams.BuildNumThreadsRatio) + searchCacheBudgetGBRatio = fmt.Sprintf("%f", extraParams.SearchCacheBudgetGBRatio) + } else { + maxDegree = params.CommonCfg.MaxDegree.GetValue() + searchListSize = params.CommonCfg.SearchListSize.GetValue() + pqCodeBudgetGBRatio = params.CommonCfg.PQCodeBudgetGBRatio.GetValue() + buildNumThreadsRatio = params.CommonCfg.BuildNumThreadsRatio.GetValue() + searchCacheBudgetGBRatio = params.CommonCfg.SearchCacheBudgetGBRatio.GetValue() } indexParams[MaxDegreeKey] = maxDegree @@ -198,6 +206,63 @@ func FillDiskIndexParams(params *paramtable.ComponentParam, indexParams map[stri return nil } +func GetIndexParams(indexParams []*commonpb.KeyValuePair, key string) string { + for _, param := range indexParams { + if param.Key == key { + return param.Value + } + } + return "" +} + +// UpdateDiskIndexBuildParams update index params for `buildIndex` (override search cache size in `CreateIndex`) +func UpdateDiskIndexBuildParams(params *paramtable.ComponentParam, indexParams []*commonpb.KeyValuePair) ([]*commonpb.KeyValuePair, error) { + existedVal := GetIndexParams(indexParams, SearchCacheBudgetRatioKey) + + var searchCacheBudgetGBRatio string + if params.AutoIndexConfig.Enable.GetAsBool() { + extraParams, err := NewBigDataExtraParamsFromJSON(params.AutoIndexConfig.ExtraParams.GetValue()) + if err != nil { + return indexParams, fmt.Errorf("index param search_cache_budget_gb_ratio not exist in AutoIndex Config") + } + searchCacheBudgetGBRatio = fmt.Sprintf("%f", extraParams.SearchCacheBudgetGBRatio) + } else { + paramVal, err := strconv.ParseFloat(params.CommonCfg.SearchCacheBudgetGBRatio.GetValue(), 64) + if err != nil { + return indexParams, fmt.Errorf("index param search_cache_budget_gb_ratio not exist in Config") + } + searchCacheBudgetGBRatio = fmt.Sprintf("%f", paramVal) + } + + // append when not exist + if len(existedVal) == 0 { + indexParams = append(indexParams, + &commonpb.KeyValuePair{ + Key: SearchCacheBudgetRatioKey, + Value: searchCacheBudgetGBRatio, + }) + return indexParams, nil + } + // override when exist + updatedParams := make([]*commonpb.KeyValuePair, 0, len(indexParams)) + for _, param := range indexParams { + if param.Key == SearchCacheBudgetRatioKey { + updatedParams = append(updatedParams, + &commonpb.KeyValuePair{ + Key: SearchCacheBudgetRatioKey, + Value: searchCacheBudgetGBRatio, + }) + } else { + updatedParams = append(updatedParams, + &commonpb.KeyValuePair{ + Key: param.Key, + Value: param.Value, + }) + } + } + return updatedParams, nil +} + // SetDiskIndexBuildParams set index build params with ratio params on indexNode // IndexNode cal build param with ratio params and cpu count, memory count... func SetDiskIndexBuildParams(indexParams map[string]string, fieldDataSize int64) error { diff --git a/pkg/util/indexparams/index_params_test.go b/pkg/util/indexparams/index_params_test.go index 1bff36d5c8..9b9030ff30 100644 --- a/pkg/util/indexparams/index_params_test.go +++ b/pkg/util/indexparams/index_params_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -124,6 +125,100 @@ func TestDiskIndexParams(t *testing.T) { assert.Error(t, err) }) + t.Run("patch index build params", func(t *testing.T) { + var params paramtable.ComponentParam + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) + + indexParams := make([]*commonpb.KeyValuePair, 0, 3) + + indexParams = append(indexParams, + &commonpb.KeyValuePair{ + Key: PQCodeBudgetRatioKey, + Value: "0.125", + }) + + indexParams = append(indexParams, + &commonpb.KeyValuePair{ + Key: NumBuildThreadRatioKey, + Value: "1.0", + }) + + indexParams = append(indexParams, + &commonpb.KeyValuePair{ + Key: BeamWidthRatioKey, + Value: "4.0", + }) + + indexParams, err := UpdateDiskIndexBuildParams(¶ms, indexParams) + assert.NoError(t, err) + assert.True(t, len(indexParams) == 4) + + val := GetIndexParams(indexParams, SearchCacheBudgetRatioKey) + cfgVal, cfgErr := strconv.ParseFloat(params.CommonCfg.SearchCacheBudgetGBRatio.GetValue(), 64) + assert.NoError(t, cfgErr) + iVal, iErr := strconv.ParseFloat(val, 64) + assert.NoError(t, iErr) + assert.Equal(t, cfgVal, iVal) + + params.Save(params.AutoIndexConfig.Enable.Key, "true") + + jsonStr := ` + { + "build_ratio": "{\"pq_code_budget_gb\": 0.125, \"num_threads\": 1}", + "prepare_ratio": "{\"search_cache_budget_gb\": 0.225, \"num_threads\": 8}", + "beamwidth_ratio": "8.0" + } + ` + params.Save(params.AutoIndexConfig.ExtraParams.Key, jsonStr) + + autoParams := make([]*commonpb.KeyValuePair, 0, 3) + + autoParams = append(autoParams, + &commonpb.KeyValuePair{ + Key: PQCodeBudgetRatioKey, + Value: "0.125", + }) + + autoParams = append(autoParams, + &commonpb.KeyValuePair{ + Key: NumBuildThreadRatioKey, + Value: "1.0", + }) + + autoParams = append(autoParams, + &commonpb.KeyValuePair{ + Key: BeamWidthRatioKey, + Value: "4.0", + }) + + autoParams, err = UpdateDiskIndexBuildParams(¶ms, autoParams) + assert.NoError(t, err) + assert.True(t, len(autoParams) == 4) + + val = GetIndexParams(autoParams, SearchCacheBudgetRatioKey) + iVal, iErr = strconv.ParseFloat(val, 64) + assert.NoError(t, iErr) + assert.Equal(t, 0.225, iVal) + + newJSONStr := ` + { + "build_ratio": "{\"pq_code_budget_gb\": 0.125, \"num_threads\": 1}", + "prepare_ratio": "{\"search_cache_budget_gb\": 0.325, \"num_threads\": 8}", + "beamwidth_ratio": "8.0" + } + ` + params.Save(params.AutoIndexConfig.ExtraParams.Key, newJSONStr) + autoParams, err = UpdateDiskIndexBuildParams(¶ms, autoParams) + + assert.NoError(t, err) + assert.True(t, len(autoParams) == 4) + + val = GetIndexParams(autoParams, SearchCacheBudgetRatioKey) + iVal, iErr = strconv.ParseFloat(val, 64) + assert.NoError(t, iErr) + assert.Equal(t, 0.325, iVal) + }) + t.Run("set disk index build params", func(t *testing.T) { indexParams := make(map[string]string) indexParams[PQCodeBudgetRatioKey] = "0.125"