diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go index fc28d2915a..2f56290dd0 100644 --- a/internal/datanode/index/task_stats.go +++ b/internal/datanode/index/task_stats.go @@ -592,26 +592,31 @@ func (st *statsTask) createJSONKeyStats(ctx context.Context, } buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options) - uploaded, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams) + statsResult, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams) if err != nil { return err } - memorySize := int64(0) - for _, file := range uploaded { - memorySize += file + + // calculate log size (disk size) from file sizes + var logSize int64 + for _, fileSize := range statsResult.Files { + logSize += fileSize } jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{ FieldID: field.GetFieldID(), Version: version, BuildID: taskID, - Files: lo.Keys(uploaded), + Files: lo.Keys(statsResult.Files), JsonKeyStatsDataFormat: jsonKeyStatsDataFormat, - MemorySize: memorySize, + MemorySize: statsResult.MemSize, + LogSize: logSize, } log.Info("field enable json key index, create json key index done", zap.Int64("field id", field.GetFieldID()), - zap.Strings("files", lo.Keys(uploaded)), + zap.Strings("files", lo.Keys(statsResult.Files)), + zap.Int64("memorySize", statsResult.MemSize), + zap.Int64("logSize", logSize), ) } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 675075f583..0c3b64d25f 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -155,6 +155,7 @@ type resourceEstimateFactor struct { EnableInterminSegmentIndex bool tempSegmentIndexFactor float64 deltaDataExpansionFactor float64 + jsonKeyStatsExpansionFactor float64 TieredEvictionEnabled bool TieredEvictableMemoryCacheRatio float64 TieredEvictableDiskCacheRatio float64 @@ -1610,12 +1611,13 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize maxFactor := resourceEstimateFactor{ - memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(), - memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), - EnableInterminSegmentIndex: paramtable.Get().QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool(), - tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(), - deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), - TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(), + memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(), + memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), + EnableInterminSegmentIndex: paramtable.Get().QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool(), + tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(), + deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + jsonKeyStatsExpansionFactor: paramtable.Get().QueryNodeCfg.JSONKeyStatsExpansionFactor.GetAsFloat(), + TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(), } maxSegmentSize := uint64(0) predictMemUsage := memUsage @@ -2094,6 +2096,21 @@ func estimateLoadingResourceUsageOfSegment(schema *schemapb.CollectionSchema, lo segMemoryLoadingSize += uint64(float64(memSize) * expansionFactor) } + // PART 5: calculate size of json key stats data + jsonStatsMmapEnable := paramtable.Get().QueryNodeCfg.MmapJSONStats.GetAsBool() + needWarmup := paramtable.Get().QueryNodeCfg.TieredWarmupScalarIndex.GetValue() == "sync" + for _, jsonKeyStats := range loadInfo.GetJsonKeyStatsLogs() { + if jsonStatsMmapEnable { + if !multiplyFactor.TieredEvictionEnabled || needWarmup { + segDiskLoadingSize += uint64(float64(jsonKeyStats.GetMemorySize()) * multiplyFactor.jsonKeyStatsExpansionFactor) + } + } else { + if !multiplyFactor.TieredEvictionEnabled || needWarmup { + segMemoryLoadingSize += uint64(float64(jsonKeyStats.GetMemorySize()) * multiplyFactor.jsonKeyStatsExpansionFactor) + } + } + } + return &ResourceUsage{ MemorySize: segMemoryLoadingSize + indexMemorySize, DiskSize: segDiskLoadingSize, diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index 304c0c5770..03da27b346 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -156,7 +156,14 @@ func CreateTextIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexI return res, nil } -func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (map[string]int64, error) { +type JSONKeyStatsResult struct { + // MemSize is the actual memory size when loaded + MemSize int64 + // Files maps file name to file size on disk + Files map[string]int64 +} + +func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (*JSONKeyStatsResult, error) { buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo) if err != nil { log.Ctx(ctx).Warn("marshal buildIndexInfo failed", @@ -177,12 +184,17 @@ func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildInd return nil, err } - res := make(map[string]int64) + files := make(map[string]int64) + var logSize int64 for _, indexInfo := range indexStats.GetSerializedIndexInfos() { - res[indexInfo.FileName] = indexInfo.FileSize + files[indexInfo.FileName] = indexInfo.FileSize + logSize += indexInfo.FileSize } - return res, nil + return &JSONKeyStatsResult{ + MemSize: indexStats.GetMemSize(), + Files: files, + }, nil } // TODO: this seems to be used only for test. We should mark the method diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index ec9cbbdade..29fe4aa88f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3244,9 +3244,10 @@ type queryNodeConfig struct { ForwardBatchSize ParamItem `refreshable:"true"` // loader - IoPoolSize ParamItem `refreshable:"false"` - DeltaDataExpansionRate ParamItem `refreshable:"true"` - DiskSizeFetchInterval ParamItem `refreshable:"false"` + IoPoolSize ParamItem `refreshable:"false"` + DeltaDataExpansionRate ParamItem `refreshable:"true"` + JSONKeyStatsExpansionFactor ParamItem `refreshable:"true"` + DiskSizeFetchInterval ParamItem `refreshable:"false"` // schedule task policy. SchedulePolicyName ParamItem `refreshable:"false"` @@ -4245,6 +4246,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to } p.DeltaDataExpansionRate.Init(base.mgr) + p.JSONKeyStatsExpansionFactor = ParamItem{ + Key: "querynode.JSONKeyStatsExpansionFactor", + Version: "2.6.7", + DefaultValue: "1.0", + Doc: "the expansion factor for json key stats memory size estimation", + } + p.JSONKeyStatsExpansionFactor.Init(base.mgr) + p.DiskSizeFetchInterval = ParamItem{ Key: "querynode.diskSizeFetchInterval", Version: "2.5.0",