From 409e4dfe71e7605b8002ba0950d602699ca3bffb Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 27 Mar 2023 23:00:02 +0800 Subject: [PATCH] Use singleflight to limit readWithCache concurrent operation (#23037) Signed-off-by: Congqi Xia --- internal/storage/vector_chunk_manager.go | 81 ++++++++++++++---------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index d111210543..d8d5e24956 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "golang.org/x/exp/mmap" + "golang.org/x/sync/singleflight" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" @@ -49,6 +50,8 @@ type VectorChunkManager struct { cacheSize int64 cacheSizeMutex sync.Mutex fixSize bool // Prevent cache capactiy from changing too frequently + + singleflight singleflight.Group } var _ ChunkManager = (*VectorChunkManager)(nil) @@ -147,43 +150,51 @@ func (vcm *VectorChunkManager) Exist(ctx context.Context, filePath string) (bool } func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath string) ([]byte, error) { - contents, err := vcm.vectorStorage.Read(ctx, filePath) - if err != nil { - return nil, err - } - results, err := vcm.deserializeVectorFile(filePath, contents) - if err != nil { - return nil, err - } - err = vcm.cacheStorage.Write(ctx, filePath, results) - if err != nil { - return nil, err - } - r, err := vcm.cacheStorage.Mmap(ctx, filePath) - if err != nil { - return nil, err - } - size, err := vcm.cacheStorage.Size(ctx, filePath) - if err != nil { - return nil, err - } - vcm.cacheSizeMutex.Lock() - vcm.cacheSize += size - vcm.cacheSizeMutex.Unlock() - if !vcm.fixSize { - if vcm.cacheSize < vcm.cacheLimit { - if vcm.cache.Len() == vcm.cache.Capacity() { - newSize := float32(vcm.cache.Capacity()) * 1.25 - vcm.cache.Resize(int(newSize)) - } - } else { - // +1 is for add current value - vcm.cache.Resize(vcm.cache.Len() + 1) - vcm.fixSize = true + val, err, _ := vcm.singleflight.Do(filePath, func() (any, error) { + contents, err := vcm.vectorStorage.Read(ctx, filePath) + if err != nil { + return nil, err } + results, err := vcm.deserializeVectorFile(filePath, contents) + if err != nil { + return nil, err + } + err = vcm.cacheStorage.Write(ctx, filePath, results) + if err != nil { + return nil, err + } + r, err := vcm.cacheStorage.Mmap(ctx, filePath) + if err != nil { + return nil, err + } + size, err := vcm.cacheStorage.Size(ctx, filePath) + if err != nil { + return nil, err + } + vcm.cacheSizeMutex.Lock() + vcm.cacheSize += size + vcm.cacheSizeMutex.Unlock() + if !vcm.fixSize { + if vcm.cacheSize < vcm.cacheLimit { + if vcm.cache.Len() == vcm.cache.Capacity() { + newSize := float32(vcm.cache.Capacity()) * 1.25 + vcm.cache.Resize(int(newSize)) + } + } else { + // +1 is for add current value + vcm.cache.Resize(vcm.cache.Len() + 1) + vcm.fixSize = true + } + } + vcm.cache.Add(filePath, r) + return results, nil + }) + if err != nil { + return nil, err } - vcm.cache.Add(filePath, r) - return results, nil + + bs := val.([]byte) + return bs, nil } // Read reads the pure vector data. If cached, it reads from local.