mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Use singleflight to limit readWithCache concurrent operation (#23037)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
af8e9558ba
commit
409e4dfe71
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/mmap"
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
@ -49,6 +50,8 @@ type VectorChunkManager struct {
|
||||
cacheSize int64
|
||||
cacheSizeMutex sync.Mutex
|
||||
fixSize bool // Prevent cache capactiy from changing too frequently
|
||||
|
||||
singleflight singleflight.Group
|
||||
}
|
||||
|
||||
var _ ChunkManager = (*VectorChunkManager)(nil)
|
||||
@ -147,43 +150,51 @@ func (vcm *VectorChunkManager) Exist(ctx context.Context, filePath string) (bool
|
||||
}
|
||||
|
||||
func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath string) ([]byte, error) {
|
||||
contents, err := vcm.vectorStorage.Read(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results, err := vcm.deserializeVectorFile(filePath, contents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = vcm.cacheStorage.Write(ctx, filePath, results)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := vcm.cacheStorage.Mmap(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size, err := vcm.cacheStorage.Size(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vcm.cacheSizeMutex.Lock()
|
||||
vcm.cacheSize += size
|
||||
vcm.cacheSizeMutex.Unlock()
|
||||
if !vcm.fixSize {
|
||||
if vcm.cacheSize < vcm.cacheLimit {
|
||||
if vcm.cache.Len() == vcm.cache.Capacity() {
|
||||
newSize := float32(vcm.cache.Capacity()) * 1.25
|
||||
vcm.cache.Resize(int(newSize))
|
||||
}
|
||||
} else {
|
||||
// +1 is for add current value
|
||||
vcm.cache.Resize(vcm.cache.Len() + 1)
|
||||
vcm.fixSize = true
|
||||
val, err, _ := vcm.singleflight.Do(filePath, func() (any, error) {
|
||||
contents, err := vcm.vectorStorage.Read(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results, err := vcm.deserializeVectorFile(filePath, contents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = vcm.cacheStorage.Write(ctx, filePath, results)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := vcm.cacheStorage.Mmap(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size, err := vcm.cacheStorage.Size(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vcm.cacheSizeMutex.Lock()
|
||||
vcm.cacheSize += size
|
||||
vcm.cacheSizeMutex.Unlock()
|
||||
if !vcm.fixSize {
|
||||
if vcm.cacheSize < vcm.cacheLimit {
|
||||
if vcm.cache.Len() == vcm.cache.Capacity() {
|
||||
newSize := float32(vcm.cache.Capacity()) * 1.25
|
||||
vcm.cache.Resize(int(newSize))
|
||||
}
|
||||
} else {
|
||||
// +1 is for add current value
|
||||
vcm.cache.Resize(vcm.cache.Len() + 1)
|
||||
vcm.fixSize = true
|
||||
}
|
||||
}
|
||||
vcm.cache.Add(filePath, r)
|
||||
return results, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vcm.cache.Add(filePath, r)
|
||||
return results, nil
|
||||
|
||||
bs := val.([]byte)
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
// Read reads the pure vector data. If cached, it reads from local.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user