diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index c48fa7629e..822cc719da 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -27,7 +27,8 @@ bool IsLoadWithDisk(const char* index_type, int index_engine_version) { - return knowhere::UseDiskLoad(index_type, index_engine_version); + return knowhere::UseDiskLoad(index_type, index_engine_version) || + strcmp(index_type, milvus::index::INVERTED_INDEX_TYPE) == 0; } CStatus diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 12faad22e2..bbb7be66de 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -42,6 +42,7 @@ #include "expr/ITypeExpr.h" #include "plan/PlanNode.h" #include "exec/expression/Expr.h" +#include "segcore/load_index_c.h" namespace chrono = std::chrono; @@ -5272,4 +5273,8 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) { DeleteSearchResult(search_result); DeleteCollection(c_collection); DeleteSegment(segment); -} \ No newline at end of file +} + +TEST(CApiTest, IsLoadWithDisk) { + ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0)); +} diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 9961a7d7fb..08ef4c0a0a 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/contextutil" "github.com/milvus-io/milvus/pkg/util/crypto" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -65,10 +66,10 @@ const ( defaultMaxSearchRequest = 1024 // DefaultArithmeticIndexType name of default index type for scalar field - DefaultArithmeticIndexType = "INVERTED" + DefaultArithmeticIndexType = indexparamcheck.IndexINVERTED // DefaultStringIndexType name of default index type for varChar/string field - DefaultStringIndexType = "INVERTED" + DefaultStringIndexType = indexparamcheck.IndexINVERTED defaultRRFParamsValue = 60 maxRRFParamsValue = 16384 diff --git a/internal/querynodev2/segments/index_attr_cache.go b/internal/querynodev2/segments/index_attr_cache.go index 2e5d95b380..279259f146 100644 --- a/internal/querynodev2/segments/index_attr_cache.go +++ b/internal/querynodev2/segments/index_attr_cache.go @@ -27,6 +27,7 @@ import ( "fmt" "unsafe" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/conc" @@ -54,7 +55,7 @@ func NewIndexAttrCache() *IndexAttrCache { } } -func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64) (memory uint64, disk uint64, err error) { +func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64, fieldBinlog *datapb.FieldBinlog) (memory uint64, disk uint64, err error) { indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams) if err != nil { return 0, 0, fmt.Errorf("index type not exist in index params") @@ -64,6 +65,12 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo neededDiskSize := indexInfo.IndexSize - neededMemSize return uint64(neededMemSize), uint64(neededDiskSize), nil } + if indexType == indexparamcheck.IndexINVERTED { + neededMemSize := 0 + // we will mmap the binlog if the index type is inverted index. + neededDiskSize := indexInfo.IndexSize + getBinlogDataSize(fieldBinlog) + return uint64(neededMemSize), uint64(neededDiskSize), nil + } engineVersion := indexInfo.GetCurrentIndexVersion() isLoadWithDisk, has := c.loadWithDisk.Get(typeutil.NewPair(indexType, engineVersion)) diff --git a/internal/querynodev2/segments/index_attr_cache_test.go b/internal/querynodev2/segments/index_attr_cache_test.go index c8705cfec9..55d3f705bf 100644 --- a/internal/querynodev2/segments/index_attr_cache_test.go +++ b/internal/querynodev2/segments/index_attr_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" @@ -51,7 +52,7 @@ func (s *IndexAttrCacheSuite) TestCacheMissing() { CurrentIndexVersion: 0, } - _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil) s.Require().NoError(err) _, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32]("test", 0)) @@ -67,7 +68,7 @@ func (s *IndexAttrCacheSuite) TestDiskANN() { IndexSize: 100, } - memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil) s.Require().NoError(err) _, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32](indexparamcheck.IndexDISKANN, 0)) @@ -77,6 +78,26 @@ func (s *IndexAttrCacheSuite) TestDiskANN() { s.EqualValues(75, disk) } +func (s *IndexAttrCacheSuite) TestInvertedIndex() { + info := &querypb.FieldIndexInfo{ + IndexParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: indexparamcheck.IndexINVERTED}, + }, + CurrentIndexVersion: 0, + IndexSize: 50, + } + binlog := &datapb.FieldBinlog{ + Binlogs: []*datapb.Binlog{ + {LogSize: 60}, + }, + } + + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), binlog) + s.Require().NoError(err) + s.EqualValues(uint64(0), memory) + s.EqualValues(uint64(110), disk) +} + func (s *IndexAttrCacheSuite) TestLoadWithDisk() { info := &querypb.FieldIndexInfo{ IndexParams: []*commonpb.KeyValuePair{ @@ -88,7 +109,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { s.Run("load_with_disk", func() { s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), true) - memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil) s.Require().NoError(err) s.EqualValues(100, memory) @@ -97,7 +118,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { s.Run("load_with_disk", func() { s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), false) - memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil) s.Require().NoError(err) s.Equal(uint64(250), memory) @@ -109,7 +130,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { IndexParams: []*commonpb.KeyValuePair{}, } - _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) + _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil) s.Error(err) }) } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 8e13bf1503..55c17425f3 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1487,7 +1487,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn var mmapEnabled bool if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok { mmapEnabled = isIndexMmapEnable(fieldIndexInfo) - neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor) + neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog) if err != nil { return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d", loadInfo.GetCollectionID(), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 93bc45f515..6ddd9d6bed 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2469,6 +2469,7 @@ Max read concurrency must greater than or equal to 1, and less than or equal to Doc: "memory usage prediction factor for memory index loaded", } p.MemoryIndexLoadPredictMemoryUsageFactor.Init(base.mgr) + p.EnableSegmentPrune = ParamItem{ Key: "queryNode.enableSegmentPrune", Version: "2.3.4",