diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c8020619df..f5d68a8266 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -354,6 +354,7 @@ queryNode: taskQueueExpire: 60 # 1 min by default, expire time of inner user task queue since queue is empty. enableCrossUserGrouping: false # false by default Enable Cross user grouping when using user-task-polling policy. (close it if task of any user can not merge others). maxPendingTaskPerUser: 1024 # 50 by default, max pending task in scheduler per user. + mmapEnabled: false # enable mmap global, if set true, will use mmap to load segment data # can specify ip for example # ip: 127.0.0.1 diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 383df36d3e..d485a65b90 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -343,7 +343,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, } vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema) fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { - return t.FieldID, getIndexType(t.IndexParams) + return t.FieldID, GetIndexType(t.IndexParams) }) vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool { if indexType, ok := fieldIndexTypes[field.FieldID]; ok { diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index eb1478fb87..7eb84c28ff 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -255,7 +255,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { return true } indexParams := ib.meta.indexMeta.GetIndexParams(meta.CollectionID, meta.IndexID) - indexType := getIndexType(indexParams) + indexType := GetIndexType(indexParams) if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() { log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID), zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows)) @@ -333,7 +333,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { fieldID := ib.meta.indexMeta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) binlogIDs := getBinLogIds(segment, fieldID) - if isDiskANNIndex(getIndexType(indexParams)) { + if isDiskANNIndex(GetIndexType(indexParams)) { var err error indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams) if err != nil { diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 68dcc0fdc9..a15447de79 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -225,7 +225,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil } - if getIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() { + if GetIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() { errMsg := "all IndexNodes do not support disk indexes, please verify" log.Warn(errMsg) err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN) @@ -270,7 +270,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques func ValidateIndexParams(index *model.Index, key, value string) error { switch key { case common.MmapEnabledKey: - indexType := getIndexType(index.IndexParams) + indexType := GetIndexType(index.IndexParams) if !indexparamcheck.IsMmapSupported(indexType) { return merr.WrapErrParameterInvalidMsg("index type %s does not support mmap", indexType) } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index e3add34636..1880a79a8e 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -182,7 +182,7 @@ func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, err return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil } -func getIndexType(indexParams []*commonpb.KeyValuePair) string { +func GetIndexType(indexParams []*commonpb.KeyValuePair) string { for _, param := range indexParams { if param.Key == common.IndexTypeKey { return param.Value diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 1e58d36b00..51721c8f9e 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -32,7 +32,9 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" @@ -73,7 +75,14 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *que indexPaths := indexInfo.IndexFilePaths indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) + enableMmap := common.IsMmapEnabled(indexInfo.IndexParams...) + if !enableMmap { + _, ok := indexParams[common.MmapEnabledKey] + indexType := datacoord.GetIndexType(indexInfo.IndexParams) + indexSupportMmap := indexparamcheck.IsMmapSupported(indexType) + enableMmap = !ok && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool() && indexSupportMmap + } // as Knowhere reports error if encounter a unknown param, we need to delete it delete(indexParams, common.MmapEnabledKey) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 5dc3244728..e6bf7d31b8 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus-storage/go/storage/options" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/storage" typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil" @@ -1074,7 +1075,8 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen opts := opts fieldBinLog := field fieldID := field.FieldID - mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) + mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) || + (!common.FieldHasMmapKey(collection.Schema(), fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool()) if mmapEnabled && options.LoadStatus == LoadStatusInMemory { opts = append(opts, WithLoadStatus(LoadStatusMapped)) } diff --git a/pkg/common/common.go b/pkg/common/common.go index fe0d663120..24e26ff774 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -159,6 +159,20 @@ func IsFieldMmapEnabled(schema *schemapb.CollectionSchema, fieldID int64) bool { return false } +func FieldHasMmapKey(schema *schemapb.CollectionSchema, fieldID int64) bool { + for _, field := range schema.GetFields() { + if field.GetFieldID() == fieldID { + for _, kv := range field.GetTypeParams() { + if kv.Key == MmapEnabledKey { + return true + } + } + return false + } + } + return false +} + func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool { for _, kv := range kvs { if kv.Key == LazyLoadEnableKey && strings.ToLower(kv.Value) == "true" { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index bed201b1af..a19ecf7fd8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1979,6 +1979,7 @@ type queryNodeConfig struct { CacheEnabled ParamItem `refreshable:"false"` CacheMemoryLimit ParamItem `refreshable:"false"` MmapDirPath ParamItem `refreshable:"false"` + MmapEnabled ParamItem `refreshable:"false"` // chunk cache ReadAheadPolicy ParamItem `refreshable:"false"` @@ -2200,6 +2201,14 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.MmapDirPath.Init(base.mgr) + p.MmapEnabled = ParamItem{ + Key: "queryNode.mmapEnabled", + Version: "2.4.0", + DefaultValue: "false", + Doc: "Enable mmap for loading data", + } + p.MmapEnabled.Init(base.mgr) + p.ReadAheadPolicy = ParamItem{ Key: "queryNode.cache.readAheadPolicy", Version: "2.3.2",