feat: Add global mmap enable configuration (#31267)

https://github.com/milvus-io/milvus/issues/31279

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2024-03-18 15:17:10 +08:00 committed by GitHub
parent 243e311515
commit bdc70dfc6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 42 additions and 7 deletions

View File

@ -354,6 +354,7 @@ queryNode:
taskQueueExpire: 60 # 1 min by default, expire time of inner user task queue since queue is empty. taskQueueExpire: 60 # 1 min by default, expire time of inner user task queue since queue is empty.
enableCrossUserGrouping: false # false by default Enable Cross user grouping when using user-task-polling policy. (close it if task of any user can not merge others). enableCrossUserGrouping: false # false by default Enable Cross user grouping when using user-task-polling policy. (close it if task of any user can not merge others).
maxPendingTaskPerUser: 1024 # 50 by default, max pending task in scheduler per user. maxPendingTaskPerUser: 1024 # 50 by default, max pending task in scheduler per user.
mmapEnabled: false # enable mmap global, if set true, will use mmap to load segment data
# can specify ip for example # can specify ip for example
# ip: 127.0.0.1 # ip: 127.0.0.1

View File

@ -343,7 +343,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
} }
vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema) vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema)
fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
return t.FieldID, getIndexType(t.IndexParams) return t.FieldID, GetIndexType(t.IndexParams)
}) })
vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool { vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
if indexType, ok := fieldIndexTypes[field.FieldID]; ok { if indexType, ok := fieldIndexTypes[field.FieldID]; ok {

View File

@ -255,7 +255,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
return true return true
} }
indexParams := ib.meta.indexMeta.GetIndexParams(meta.CollectionID, meta.IndexID) indexParams := ib.meta.indexMeta.GetIndexParams(meta.CollectionID, meta.IndexID)
indexType := getIndexType(indexParams) indexType := GetIndexType(indexParams)
if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() { if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID), log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows)) zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
@ -333,7 +333,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
fieldID := ib.meta.indexMeta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) fieldID := ib.meta.indexMeta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
binlogIDs := getBinLogIds(segment, fieldID) binlogIDs := getBinLogIds(segment, fieldID)
if isDiskANNIndex(getIndexType(indexParams)) { if isDiskANNIndex(GetIndexType(indexParams)) {
var err error var err error
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams) indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
if err != nil { if err != nil {

View File

@ -225,7 +225,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil return merr.Status(err), nil
} }
if getIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() { if GetIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify" errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg) log.Warn(errMsg)
err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN) err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN)
@ -270,7 +270,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
func ValidateIndexParams(index *model.Index, key, value string) error { func ValidateIndexParams(index *model.Index, key, value string) error {
switch key { switch key {
case common.MmapEnabledKey: case common.MmapEnabledKey:
indexType := getIndexType(index.IndexParams) indexType := GetIndexType(index.IndexParams)
if !indexparamcheck.IsMmapSupported(indexType) { if !indexparamcheck.IsMmapSupported(indexType) {
return merr.WrapErrParameterInvalidMsg("index type %s does not support mmap", indexType) return merr.WrapErrParameterInvalidMsg("index type %s does not support mmap", indexType)
} }

View File

@ -182,7 +182,7 @@ func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, err
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
} }
func getIndexType(indexParams []*commonpb.KeyValuePair) string { func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
for _, param := range indexParams { for _, param := range indexParams {
if param.Key == common.IndexTypeKey { if param.Key == common.IndexTypeKey {
return param.Value return param.Value

View File

@ -32,7 +32,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
@ -73,7 +75,14 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *que
indexPaths := indexInfo.IndexFilePaths indexPaths := indexInfo.IndexFilePaths
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
enableMmap := common.IsMmapEnabled(indexInfo.IndexParams...) enableMmap := common.IsMmapEnabled(indexInfo.IndexParams...)
if !enableMmap {
_, ok := indexParams[common.MmapEnabledKey]
indexType := datacoord.GetIndexType(indexInfo.IndexParams)
indexSupportMmap := indexparamcheck.IsMmapSupported(indexType)
enableMmap = !ok && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool() && indexSupportMmap
}
// as Knowhere reports error if encounter a unknown param, we need to delete it // as Knowhere reports error if encounter a unknown param, we need to delete it
delete(indexParams, common.MmapEnabledKey) delete(indexParams, common.MmapEnabledKey)

View File

@ -46,6 +46,7 @@ import (
"github.com/milvus-io/milvus-storage/go/storage/options" "github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil" typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
@ -1074,7 +1075,8 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen
opts := opts opts := opts
fieldBinLog := field fieldBinLog := field
fieldID := field.FieldID fieldID := field.FieldID
mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) ||
(!common.FieldHasMmapKey(collection.Schema(), fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool())
if mmapEnabled && options.LoadStatus == LoadStatusInMemory { if mmapEnabled && options.LoadStatus == LoadStatusInMemory {
opts = append(opts, WithLoadStatus(LoadStatusMapped)) opts = append(opts, WithLoadStatus(LoadStatusMapped))
} }

View File

@ -159,6 +159,20 @@ func IsFieldMmapEnabled(schema *schemapb.CollectionSchema, fieldID int64) bool {
return false return false
} }
func FieldHasMmapKey(schema *schemapb.CollectionSchema, fieldID int64) bool {
for _, field := range schema.GetFields() {
if field.GetFieldID() == fieldID {
for _, kv := range field.GetTypeParams() {
if kv.Key == MmapEnabledKey {
return true
}
}
return false
}
}
return false
}
func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool { func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool {
for _, kv := range kvs { for _, kv := range kvs {
if kv.Key == LazyLoadEnableKey && strings.ToLower(kv.Value) == "true" { if kv.Key == LazyLoadEnableKey && strings.ToLower(kv.Value) == "true" {

View File

@ -1979,6 +1979,7 @@ type queryNodeConfig struct {
CacheEnabled ParamItem `refreshable:"false"` CacheEnabled ParamItem `refreshable:"false"`
CacheMemoryLimit ParamItem `refreshable:"false"` CacheMemoryLimit ParamItem `refreshable:"false"`
MmapDirPath ParamItem `refreshable:"false"` MmapDirPath ParamItem `refreshable:"false"`
MmapEnabled ParamItem `refreshable:"false"`
// chunk cache // chunk cache
ReadAheadPolicy ParamItem `refreshable:"false"` ReadAheadPolicy ParamItem `refreshable:"false"`
@ -2200,6 +2201,14 @@ func (p *queryNodeConfig) init(base *BaseTable) {
} }
p.MmapDirPath.Init(base.mgr) p.MmapDirPath.Init(base.mgr)
p.MmapEnabled = ParamItem{
Key: "queryNode.mmapEnabled",
Version: "2.4.0",
DefaultValue: "false",
Doc: "Enable mmap for loading data",
}
p.MmapEnabled.Init(base.mgr)
p.ReadAheadPolicy = ParamItem{ p.ReadAheadPolicy = ParamItem{
Key: "queryNode.cache.readAheadPolicy", Key: "queryNode.cache.readAheadPolicy",
Version: "2.3.2", Version: "2.3.2",