mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
enhance: make estimate json stats size more accurate (#45875)
#42533 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
dff62c5423
commit
3901f112ae
@ -592,26 +592,31 @@ func (st *statsTask) createJSONKeyStats(ctx context.Context,
|
|||||||
}
|
}
|
||||||
buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options)
|
buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, options)
|
||||||
|
|
||||||
uploaded, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams)
|
statsResult, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
memorySize := int64(0)
|
|
||||||
for _, file := range uploaded {
|
// calculate log size (disk size) from file sizes
|
||||||
memorySize += file
|
var logSize int64
|
||||||
|
for _, fileSize := range statsResult.Files {
|
||||||
|
logSize += fileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{
|
jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{
|
||||||
FieldID: field.GetFieldID(),
|
FieldID: field.GetFieldID(),
|
||||||
Version: version,
|
Version: version,
|
||||||
BuildID: taskID,
|
BuildID: taskID,
|
||||||
Files: lo.Keys(uploaded),
|
Files: lo.Keys(statsResult.Files),
|
||||||
JsonKeyStatsDataFormat: jsonKeyStatsDataFormat,
|
JsonKeyStatsDataFormat: jsonKeyStatsDataFormat,
|
||||||
MemorySize: memorySize,
|
MemorySize: statsResult.MemSize,
|
||||||
|
LogSize: logSize,
|
||||||
}
|
}
|
||||||
log.Info("field enable json key index, create json key index done",
|
log.Info("field enable json key index, create json key index done",
|
||||||
zap.Int64("field id", field.GetFieldID()),
|
zap.Int64("field id", field.GetFieldID()),
|
||||||
zap.Strings("files", lo.Keys(uploaded)),
|
zap.Strings("files", lo.Keys(statsResult.Files)),
|
||||||
|
zap.Int64("memorySize", statsResult.MemSize),
|
||||||
|
zap.Int64("logSize", logSize),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -155,6 +155,7 @@ type resourceEstimateFactor struct {
|
|||||||
EnableInterminSegmentIndex bool
|
EnableInterminSegmentIndex bool
|
||||||
tempSegmentIndexFactor float64
|
tempSegmentIndexFactor float64
|
||||||
deltaDataExpansionFactor float64
|
deltaDataExpansionFactor float64
|
||||||
|
jsonKeyStatsExpansionFactor float64
|
||||||
TieredEvictionEnabled bool
|
TieredEvictionEnabled bool
|
||||||
TieredEvictableMemoryCacheRatio float64
|
TieredEvictableMemoryCacheRatio float64
|
||||||
TieredEvictableDiskCacheRatio float64
|
TieredEvictableDiskCacheRatio float64
|
||||||
@ -1610,12 +1611,13 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
|||||||
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
|
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
|
||||||
|
|
||||||
maxFactor := resourceEstimateFactor{
|
maxFactor := resourceEstimateFactor{
|
||||||
memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(),
|
memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(),
|
||||||
memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(),
|
memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(),
|
||||||
EnableInterminSegmentIndex: paramtable.Get().QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool(),
|
EnableInterminSegmentIndex: paramtable.Get().QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool(),
|
||||||
tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(),
|
tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(),
|
||||||
deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(),
|
deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(),
|
||||||
TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(),
|
jsonKeyStatsExpansionFactor: paramtable.Get().QueryNodeCfg.JSONKeyStatsExpansionFactor.GetAsFloat(),
|
||||||
|
TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(),
|
||||||
}
|
}
|
||||||
maxSegmentSize := uint64(0)
|
maxSegmentSize := uint64(0)
|
||||||
predictMemUsage := memUsage
|
predictMemUsage := memUsage
|
||||||
@ -2094,6 +2096,21 @@ func estimateLoadingResourceUsageOfSegment(schema *schemapb.CollectionSchema, lo
|
|||||||
segMemoryLoadingSize += uint64(float64(memSize) * expansionFactor)
|
segMemoryLoadingSize += uint64(float64(memSize) * expansionFactor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PART 5: calculate size of json key stats data
|
||||||
|
jsonStatsMmapEnable := paramtable.Get().QueryNodeCfg.MmapJSONStats.GetAsBool()
|
||||||
|
needWarmup := paramtable.Get().QueryNodeCfg.TieredWarmupScalarIndex.GetValue() == "sync"
|
||||||
|
for _, jsonKeyStats := range loadInfo.GetJsonKeyStatsLogs() {
|
||||||
|
if jsonStatsMmapEnable {
|
||||||
|
if !multiplyFactor.TieredEvictionEnabled || needWarmup {
|
||||||
|
segDiskLoadingSize += uint64(float64(jsonKeyStats.GetMemorySize()) * multiplyFactor.jsonKeyStatsExpansionFactor)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !multiplyFactor.TieredEvictionEnabled || needWarmup {
|
||||||
|
segMemoryLoadingSize += uint64(float64(jsonKeyStats.GetMemorySize()) * multiplyFactor.jsonKeyStatsExpansionFactor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &ResourceUsage{
|
return &ResourceUsage{
|
||||||
MemorySize: segMemoryLoadingSize + indexMemorySize,
|
MemorySize: segMemoryLoadingSize + indexMemorySize,
|
||||||
DiskSize: segDiskLoadingSize,
|
DiskSize: segDiskLoadingSize,
|
||||||
|
|||||||
@ -156,7 +156,14 @@ func CreateTextIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexI
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (map[string]int64, error) {
|
type JSONKeyStatsResult struct {
|
||||||
|
// MemSize is the actual memory size when loaded
|
||||||
|
MemSize int64
|
||||||
|
// Files maps file name to file size on disk
|
||||||
|
Files map[string]int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (*JSONKeyStatsResult, error) {
|
||||||
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
|
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
|
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
|
||||||
@ -177,12 +184,17 @@ func CreateJSONKeyStats(ctx context.Context, buildIndexInfo *indexcgopb.BuildInd
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make(map[string]int64)
|
files := make(map[string]int64)
|
||||||
|
var logSize int64
|
||||||
for _, indexInfo := range indexStats.GetSerializedIndexInfos() {
|
for _, indexInfo := range indexStats.GetSerializedIndexInfos() {
|
||||||
res[indexInfo.FileName] = indexInfo.FileSize
|
files[indexInfo.FileName] = indexInfo.FileSize
|
||||||
|
logSize += indexInfo.FileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return &JSONKeyStatsResult{
|
||||||
|
MemSize: indexStats.GetMemSize(),
|
||||||
|
Files: files,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this seems to be used only for test. We should mark the method
|
// TODO: this seems to be used only for test. We should mark the method
|
||||||
|
|||||||
@ -3244,9 +3244,10 @@ type queryNodeConfig struct {
|
|||||||
ForwardBatchSize ParamItem `refreshable:"true"`
|
ForwardBatchSize ParamItem `refreshable:"true"`
|
||||||
|
|
||||||
// loader
|
// loader
|
||||||
IoPoolSize ParamItem `refreshable:"false"`
|
IoPoolSize ParamItem `refreshable:"false"`
|
||||||
DeltaDataExpansionRate ParamItem `refreshable:"true"`
|
DeltaDataExpansionRate ParamItem `refreshable:"true"`
|
||||||
DiskSizeFetchInterval ParamItem `refreshable:"false"`
|
JSONKeyStatsExpansionFactor ParamItem `refreshable:"true"`
|
||||||
|
DiskSizeFetchInterval ParamItem `refreshable:"false"`
|
||||||
|
|
||||||
// schedule task policy.
|
// schedule task policy.
|
||||||
SchedulePolicyName ParamItem `refreshable:"false"`
|
SchedulePolicyName ParamItem `refreshable:"false"`
|
||||||
@ -4245,6 +4246,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
|
|||||||
}
|
}
|
||||||
p.DeltaDataExpansionRate.Init(base.mgr)
|
p.DeltaDataExpansionRate.Init(base.mgr)
|
||||||
|
|
||||||
|
p.JSONKeyStatsExpansionFactor = ParamItem{
|
||||||
|
Key: "querynode.JSONKeyStatsExpansionFactor",
|
||||||
|
Version: "2.6.7",
|
||||||
|
DefaultValue: "1.0",
|
||||||
|
Doc: "the expansion factor for json key stats memory size estimation",
|
||||||
|
}
|
||||||
|
p.JSONKeyStatsExpansionFactor.Init(base.mgr)
|
||||||
|
|
||||||
p.DiskSizeFetchInterval = ParamItem{
|
p.DiskSizeFetchInterval = ParamItem{
|
||||||
Key: "querynode.diskSizeFetchInterval",
|
Key: "querynode.diskSizeFetchInterval",
|
||||||
Version: "2.5.0",
|
Version: "2.5.0",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user