From dd957cf9e39bf6516b628ba70ea775a7ce9ff980 Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 1 Mar 2024 15:23:00 +0800 Subject: [PATCH] enhance: add configurable memory index load predict memory usage factor (#30561) related pr: https://github.com/milvus-io/milvus/pull/30475 Signed-off-by: chyezh --- internal/querynodev2/segments/index_attr_cache.go | 7 ++++--- .../querynodev2/segments/index_attr_cache_test.go | 9 +++++++-- pkg/util/paramtable/component_param.go | 11 ++++++++++- pkg/util/paramtable/component_param_test.go | 4 ++++ tests/integration/minicluster_v2.go | 2 +- tests/integration/querynode/querynode_test.go | 2 ++ 6 files changed, 28 insertions(+), 7 deletions(-) diff --git a/internal/querynodev2/segments/index_attr_cache.go b/internal/querynodev2/segments/index_attr_cache.go index 1beb575dc3..df0a1667ce 100644 --- a/internal/querynodev2/segments/index_attr_cache.go +++ b/internal/querynodev2/segments/index_attr_cache.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -75,13 +76,13 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo }) } - factor := uint64(1) + factor := float64(1) diskUsage := uint64(0) if !isLoadWithDisk { - factor = 2 + factor = paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat() } else { diskUsage = uint64(indexInfo.IndexSize) } - return uint64(indexInfo.IndexSize) * factor, diskUsage, nil + return uint64(float64(indexInfo.IndexSize) * factor), diskUsage, nil } diff --git a/internal/querynodev2/segments/index_attr_cache_test.go b/internal/querynodev2/segments/index_attr_cache_test.go index d5fba19d9c..9ded2e0377 100644 --- a/internal/querynodev2/segments/index_attr_cache_test.go +++ b/internal/querynodev2/segments/index_attr_cache_test.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -34,6 +35,10 @@ type IndexAttrCacheSuite struct { c *IndexAttrCache } +func (s *IndexAttrCacheSuite) SetupSuite() { + paramtable.Init() +} + func (s *IndexAttrCacheSuite) SetupTest() { s.c = NewIndexAttrCache() } @@ -95,8 +100,8 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { memory, disk, err := s.c.GetIndexResourceUsage(info) s.Require().NoError(err) - s.EqualValues(200, memory) - s.EqualValues(0, disk) + s.Equal(uint64(250), memory) + s.Equal(uint64(0), disk) }) s.Run("corrupted_index_info", func() { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 96168e6089..850feef4d3 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1981,6 +1981,8 @@ type queryNodeConfig struct { CleanExcludeSegInterval ParamItem `refreshable:"false"` FlowGraphMaxQueueLength ParamItem `refreshable:"false"` FlowGraphMaxParallelism ParamItem `refreshable:"false"` + + MemoryIndexLoadPredictMemoryUsageFactor ParamItem `refreshable:"true"` } func (p *queryNodeConfig) init(base *BaseTable) { @@ -2426,7 +2428,6 @@ Max read concurrency must greater than or equal to 1, and less than or equal to DefaultValue: "8192", Doc: "expr eval batch size for getnext interface", } - p.ExprEvalBatchSize.Init(base.mgr) p.CleanExcludeSegInterval = ParamItem{ @@ -2437,6 +2438,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to Export: true, } p.CleanExcludeSegInterval.Init(base.mgr) + + p.MemoryIndexLoadPredictMemoryUsageFactor = ParamItem{ + Key: "queryNode.memoryIndexLoadPredictMemoryUsageFactor", + Version: "2.3.8", + DefaultValue: "2.5", // HNSW index needs more memory to load. + Doc: "memory usage prediction factor for memory index loaded", + } + p.MemoryIndexLoadPredictMemoryUsageFactor.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index fd5dfd0172..e3b8e38818 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -361,6 +361,10 @@ func TestComponentParam(t *testing.T) { params.Save("querynode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + + assert.Equal(t, 2.5, Params.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + params.Save("queryNode.memoryIndexLoadPredictMemoryUsageFactor", "2.0") + assert.Equal(t, 2.0, Params.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) }) t.Run("test dataCoordConfig", func(t *testing.T) { diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go index 3b3b74a471..818686dd83 100644 --- a/tests/integration/minicluster_v2.go +++ b/tests/integration/minicluster_v2.go @@ -76,7 +76,7 @@ func DefaultParams() map[string]string { params.LocalStorageCfg.Path.Key: path.Join("/tmp", testPath), params.CommonCfg.StorageType.Key: "local", params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs - params.CommonCfg.GracefulStopTimeout.Key: "10", + params.CommonCfg.GracefulStopTimeout.Key: "30", } } diff --git a/tests/integration/querynode/querynode_test.go b/tests/integration/querynode/querynode_test.go index 558b632182..420076d9bf 100644 --- a/tests/integration/querynode/querynode_test.go +++ b/tests/integration/querynode/querynode_test.go @@ -27,6 +27,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" @@ -182,6 +183,7 @@ func (s *QueryNodeSuite) search(collectionName string, dim int) { } queryResult, err := c.Proxy.Query(context.TODO(), queryReq) s.NoError(err) + s.Equal(queryResult.Status.ErrorCode, commonpb.ErrorCode_Success) s.Equal(len(queryResult.FieldsData), 1) numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0] s.Equal(numEntities, int64(s.rowsPerCollection))