From 9c55a7f4229aa7ae22aee7feee646bbf0aa345db Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 1 Aug 2023 21:47:06 +0800 Subject: [PATCH] Add worker num as one of load resource (#26045) Signed-off-by: yah01 --- .../querynodev2/segments/segment_loader.go | 198 +++++++----------- pkg/util/merr/errors.go | 1 + pkg/util/merr/errors_test.go | 1 + pkg/util/merr/utils.go | 8 + pkg/util/paramtable/param_item.go | 16 +- 5 files changed, 96 insertions(+), 128 deletions(-) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 58dd3855dc..6767cbe656 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -32,7 +32,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -41,8 +40,6 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/mq/msgstream" - "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" @@ -50,7 +47,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" - . "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -75,6 +71,24 @@ type Loader interface { LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error } +type LoadResource struct { + MemorySize uint64 + DiskSize uint64 + WorkNum int +} + +func (r *LoadResource) Add(resource LoadResource) { + r.MemorySize += resource.MemorySize + r.DiskSize += resource.DiskSize + r.WorkNum += resource.WorkNum +} + +func (r *LoadResource) Sub(resource LoadResource) { + r.MemorySize -= resource.MemorySize + r.DiskSize -= resource.DiskSize + r.WorkNum -= resource.WorkNum +} + func NewLoader( manager *Manager, cm storage.ChunkManager, @@ -94,14 +108,11 @@ func NewLoader( ioPoolSize = configPoolSize } - ioPool := conc.NewPool[*storage.Blob](ioPoolSize, conc.WithPreAlloc(true)) - log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize)) loader := &segmentLoader{ manager: manager, cm: cm, - ioPool: ioPool, loadingSegments: typeutil.NewConcurrentMap[int64, chan struct{}](), } @@ -112,13 +123,11 @@ func NewLoader( type segmentLoader struct { manager *Manager cm storage.ChunkManager - ioPool *conc.Pool[*storage.Blob] mut sync.Mutex // The channel will be closed as the segment loaded loadingSegments *typeutil.ConcurrentMap[int64, chan struct{}] - committedMemSize uint64 - committedDiskSize uint64 + committedResource LoadResource } var _ Loader = (*segmentLoader)(nil) @@ -146,11 +155,11 @@ func (loader *segmentLoader) Load(ctx context.Context, log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos))) // Check memory & storage limit - memUsage, diskUsage, concurrencyLevel, err := loader.requestResource(ctx, infos...) + resource, concurrencyLevel, err := loader.requestResource(ctx, infos...) 
if err != nil { return nil, err } - defer loader.freeRequest(memUsage, diskUsage) + defer loader.freeRequest(resource) newSegments := make(map[int64]*LocalSegment, len(infos)) clearAll := func() { @@ -287,53 +296,68 @@ func (loader *segmentLoader) notifyLoadFinish(segments ...*querypb.SegmentLoadIn // requestResource requests memory & storage to load segments, // returns the memory usage, disk usage and concurrency with the gained memory. -func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (uint64, uint64, int, error) { +func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) { + resource := LoadResource{} + + memoryUsage := hardware.GetUsedMemoryCount() + totalMemory := hardware.GetMemoryCount() + + diskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + return resource, 0, errors.Wrap(err, "get local used size failed") + } + diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64() + loader.mut.Lock() defer loader.mut.Unlock() + poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.ThreadCoreCoefficient.GetAsInt() + if loader.committedResource.WorkNum >= poolCap { + return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap)) + } else if loader.committedResource.MemorySize+memoryUsage >= totalMemory { + return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory)) + } else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap { + return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap)) + } + concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos)) - var memUsage, diskUsage uint64 for _, info := range infos { - logNum := 0 for _, field := range info.GetBinlogPaths() { - logNum += len(field.GetBinlogs()) + resource.WorkNum += len(field.GetBinlogs()) } - if logNum > 0 { - // IO pool will be run out even with the new smaller level - concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1)) + for _, index := range info.GetIndexInfos() { + resource.WorkNum += len(index.IndexFilePaths) } - - for ; concurrencyLevel > 1; concurrencyLevel /= 2 { - _, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel) - if err == nil { - break - } - } - - mu, du, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel) - if err != nil { - log.Warn("no sufficient resource to load segments", zap.Error(err)) - return 0, 0, 0, err - } - - memUsage += mu - diskUsage += du } - loader.committedMemSize += memUsage - loader.committedDiskSize += diskUsage + for ; concurrencyLevel > 1; concurrencyLevel /= 2 { + _, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel) + if err == nil { + break + } + } - return memUsage, diskUsage, concurrencyLevel, nil + mu, du, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel) + if err != nil { + log.Warn("no sufficient resource to load segments", zap.Error(err)) + return resource, 0, err + } + + resource.MemorySize += mu + resource.DiskSize += du + + loader.committedResource.Add(resource) + + return resource, concurrencyLevel, nil } // freeRequest returns request memory & storage usage request. 
-func (loader *segmentLoader) freeRequest(memUsage, diskUsage uint64) { +func (loader *segmentLoader) freeRequest(resource LoadResource) { loader.mut.Lock() defer loader.mut.Unlock() - loader.committedMemSize -= memUsage - loader.committedDiskSize -= diskUsage + loader.committedResource.Sub(resource) } func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error { @@ -383,7 +407,7 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI log.Info("start loading remote...", zap.Int("segmentNum", segmentNum)) - loadedBfs := NewConcurrentSet[*pkoracle.BloomFilterSet]() + loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]() // TODO check memory for bf size loadRemoteFunc := func(idx int) error { loadInfo := infos[idx] @@ -546,30 +570,6 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen return nil } -// Load binlogs concurrently into memory from KV storage asyncly -func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*conc.Future[*storage.Blob] { - futures := make([]*conc.Future[*storage.Blob], 0, len(field.Binlogs)) - for i := range field.Binlogs { - path := field.Binlogs[i].GetLogPath() - future := loader.ioPool.Submit(func() (*storage.Blob, error) { - binLog, err := loader.cm.Read(ctx, path) - if err != nil { - log.Warn("failed to load binlog", zap.String("filePath", path), zap.Error(err)) - return nil, err - } - blob := &storage.Blob{ - Key: path, - Value: binLog, - } - - return blob, nil - }) - - futures = append(futures, future) - } - return futures -} - func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, segment *LocalSegment, vecFieldInfos map[int64]*IndexedFieldInfo) error { for fieldID, fieldInfo := range vecFieldInfos { indexInfo := fieldInfo.IndexInfo @@ -610,58 +610,6 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS return segment.LoadIndex(indexInfo, fieldType) } -func (loader *segmentLoader) insertIntoSegment(segment *LocalSegment, - rowIDs []UniqueID, - timestamps []Timestamp, - insertData *storage.InsertData) error { - rowNum := len(rowIDs) - if rowNum != len(timestamps) || insertData == nil { - return errors.New(fmt.Sprintln("illegal insert data when load segment, collectionID = ", segment.collectionID)) - } - - log := log.With( - zap.Int64("collectionID", segment.Collection()), - zap.Int64("segmentID", segment.ID()), - ) - - log.Info("start load growing segments...", zap.Int("rowNum", len(rowIDs))) - - // 1. update bloom filter - insertRecord, err := storage.TransferInsertDataToInsertRecord(insertData) - if err != nil { - return err - } - insertMsg := &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionID: segment.collectionID, - Timestamps: timestamps, - RowIDs: rowIDs, - NumRows: uint64(rowNum), - FieldsData: insertRecord.FieldsData, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - } - collection := loader.manager.Collection.Get(segment.Collection()) - if collection == nil { - err := merr.WrapErrCollectionNotFound(segment.Collection()) - log.Warn("failed to get collection while inserting data into segment", zap.Error(err)) - return err - } - pks, err := GetPrimaryKeys(insertMsg, collection.Schema()) - if err != nil { - return err - } - segment.bloomFilterSet.UpdateBloomFilter(pks) - - // 2. 
do insert - err = segment.Insert(rowIDs, timestamps, insertRecord) - if err != nil { - return err - } - log.Info("Do insert done for growing segment", zap.Int("rowNum", rowNum)) - return nil -} - func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, binlogPaths []string, logType storage.StatsLogType) error { @@ -810,7 +758,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca } // JoinIDPath joins ids to path format. -func JoinIDPath(ids ...UniqueID) string { +func JoinIDPath(ids ...int64) string { idStr := make([]string, 0, len(ids)) for _, id := range ids { idStr = append(idStr, strconv.FormatInt(id, 10)) @@ -844,7 +792,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn zap.Int64("collectionID", segmentLoadInfos[0].GetCollectionID()), ) - memUsage := hardware.GetUsedMemoryCount() + loader.committedMemSize + memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize totalMem := hardware.GetMemoryCount() if memUsage == 0 || totalMem == 0 { return 0, 0, errors.New("get memory failed when checkSegmentSize") @@ -854,7 +802,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn if err != nil { return 0, 0, errors.Wrap(err, "get local used size failed") } - diskUsage := uint64(localDiskUsage) + loader.committedDiskSize + diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize maxSegmentSize := uint64(0) predictMemUsage := memUsage @@ -913,9 +861,9 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn log.Info("predict memory and disk usage while loading (in MiB)", zap.Uint64("maxSegmentSize", toMB(maxSegmentSize)), zap.Int("concurrency", concurrency), - zap.Uint64("committedMemSize", toMB(loader.committedMemSize)), + zap.Uint64("committedMemSize", toMB(loader.committedResource.MemorySize)), zap.Uint64("memUsage", toMB(memUsage)), - zap.Uint64("committedDiskSize", toMB(loader.committedDiskSize)), + zap.Uint64("committedDiskSize", toMB(loader.committedResource.DiskSize)), zap.Uint64("diskUsage", toMB(diskUsage)), zap.Uint64("predictMemUsage", toMB(predictMemUsage)), zap.Uint64("predictDiskUsage", toMB(predictDiskUsage)), @@ -985,11 +933,11 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen info.Statslogs = nil return info }) - memUsage, diskUsage, _, err := loader.requestResource(ctx, indexInfo...) + resource, _, err := loader.requestResource(ctx, indexInfo...) 
if err != nil { return err } - defer loader.freeRequest(memUsage, diskUsage) + defer loader.freeRequest(resource) log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos))) diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 9d601bd2fc..e18d15d231 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -54,6 +54,7 @@ var ( ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true) ErrServiceInternal = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus ErrCrossClusterRouting = newMilvusError("cross cluster routing", 6, false) + ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false) // Collection related ErrCollectionNotFound = newMilvusError("collection not found", 100, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index ce4f5bbcc4..e77433b7ae 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -66,6 +66,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded) s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal) s.ErrorIs(WrapErrCrossClusterRouting("ins-0", "ins-1"), ErrCrossClusterRouting) + s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded) // Collection related s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 093830dacc..3b8ac7b5e9 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -176,6 +176,14 @@ func WrapErrCrossClusterRouting(expectedCluster, actualCluster string, msg ...st return err } +func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) error { + err := errors.Wrapf(ErrServiceDiskLimitExceeded, "predict=%v, limit=%v", predict, limit) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func WrapErrDatabaseNotFound(database any, msg ...string) error { err := wrapWithField(ErrDatabaseNotfound, "database", database) if len(msg) > 0 { diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index e2ff38fc2d..b610116d7a 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -110,15 +110,19 @@ func (pi *ParamItem) GetAsInt32() int32 { } func (pi *ParamItem) GetAsUint() uint { - return uint(getAsInt64(pi.GetValue())) + return uint(getAsUint64(pi.GetValue())) } func (pi *ParamItem) GetAsUint32() uint32 { - return uint32(getAsInt64(pi.GetValue())) + return uint32(getAsUint64(pi.GetValue())) +} + +func (pi *ParamItem) GetAsUint64() uint64 { + return getAsUint64(pi.GetValue()) } func (pi *ParamItem) GetAsUint16() uint16 { - return uint16(getAsInt64(pi.GetValue())) + return uint16(getAsUint64(pi.GetValue())) } func (pi *ParamItem) GetAsInt64() int64 { @@ -193,6 +197,12 @@ func getAsInt64(v string) int64 { }, 0) } +func getAsUint64(v string) uint64 { + return getAndConvert(v, func(value string) (uint64, error) { + return strconv.ParseUint(value, 10, 64) + }, 0) +} + func getAsFloat(v string) float64 { return getAndConvert(v, func(value string) (float64, error) { return strconv.ParseFloat(value, 64)
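
A minimal, self-contained sketch of the admission control this patch introduces, for readers who want the flow outside the diff: requestResource now reserves a LoadResource (memory, disk, and one work item per binlog or index file) against the committed totals before any segment data is read, and freeRequest releases the reservation once loading finishes. Everything below other than the LoadResource methods is a simplified stand-in, not the actual segmentLoader wiring: the limits are fixed numbers instead of values derived from paramtable and hardware, and plain errors replace the merr wrappers.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// LoadResource mirrors the struct added by this patch: the memory, disk and
// worker-slot cost of a single load request.
type LoadResource struct {
	MemorySize uint64
	DiskSize   uint64
	WorkNum    int
}

func (r *LoadResource) Add(o LoadResource) {
	r.MemorySize += o.MemorySize
	r.DiskSize += o.DiskSize
	r.WorkNum += o.WorkNum
}

func (r *LoadResource) Sub(o LoadResource) {
	r.MemorySize -= o.MemorySize
	r.DiskSize -= o.DiskSize
	r.WorkNum -= o.WorkNum
}

// miniLoader is a simplified stand-in for segmentLoader: it only tracks the
// committed resource and the limits used for admission control.
type miniLoader struct {
	mut       sync.Mutex
	committed LoadResource

	poolCap  int    // in the patch: runtime.NumCPU() * ThreadCoreCoefficient
	totalMem uint64 // in the patch: hardware.GetMemoryCount()
	diskCap  uint64 // in the patch: QueryNodeCfg.DiskCapacityLimit
}

// request mirrors the three checks added to requestResource: worker slots,
// then memory, then disk. usedMem and usedDisk stand in for the node's
// measured usage (hardware.GetUsedMemoryCount / GetLocalUsedSize upstream).
func (l *miniLoader) request(r LoadResource, usedMem, usedDisk uint64) error {
	l.mut.Lock()
	defer l.mut.Unlock()

	switch {
	case l.committed.WorkNum >= l.poolCap:
		return errors.New("request limit exceeded") // merr.WrapErrServiceRequestLimitExceeded
	case l.committed.MemorySize+usedMem >= l.totalMem:
		return errors.New("memory limit exceeded") // merr.WrapErrServiceMemoryLimitExceeded
	case l.committed.DiskSize+usedDisk >= l.diskCap:
		return errors.New("disk limit exceeded") // merr.WrapErrServiceDiskLimitExceeded (new in this patch)
	}

	l.committed.Add(r)
	return nil
}

// free mirrors freeRequest: give the reservation back once loading is done.
func (l *miniLoader) free(r LoadResource) {
	l.mut.Lock()
	defer l.mut.Unlock()
	l.committed.Sub(r)
}

func main() {
	l := &miniLoader{poolCap: 8, totalMem: 8 << 30, diskCap: 64 << 30}

	// One work item per binlog / index file, plus the predicted memory and
	// disk footprint of the segments being loaded.
	r := LoadResource{MemorySize: 512 << 20, DiskSize: 1 << 30, WorkNum: 4}

	usedMem := uint64(2) << 30   // pretend the node already uses 2 GiB of memory
	usedDisk := uint64(10) << 30 // and 10 GiB of local disk

	if err := l.request(r, usedMem, usedDisk); err != nil {
		fmt.Println("load rejected:", err)
		return
	}
	defer l.free(r) // the patch does the same with `defer loader.freeRequest(resource)`

	fmt.Println("load admitted, committed workers:", l.committed.WorkNum)
}

The point carried over from the patch is that the reservation is taken under loader.mut before any I/O starts, so concurrent load requests cannot jointly oversubscribe the worker pool, memory, or disk.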
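
On the paramtable side, the unsigned getters now go through a new getAsUint64 built on strconv.ParseUint instead of reusing getAsInt64. A plausible motivation (an inference, not stated in the patch) is that values read with GetAsUint64, such as the disk capacity limit used above, should not silently fall back to the default when they exceed math.MaxInt64. A standalone illustration of the parsing difference, with an arbitrary value:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// math.MaxInt64 + 1: representable as uint64, but not as int64.
	v := "9223372036854775808"

	// Old path (getAsInt64): ParseInt fails, so the getter would return the default.
	if _, err := strconv.ParseInt(v, 10, 64); err != nil {
		fmt.Println("ParseInt:", err)
	}

	// New path (getAsUint64): ParseUint handles the full uint64 range.
	u, err := strconv.ParseUint(v, 10, 64)
	fmt.Println("ParseUint:", u, err)
}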