Add cache for vector chunk (#15912)

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2022-03-26 22:05:26 +08:00 committed by GitHub
parent 25ac724d55
commit 478f6ca11e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 705 additions and 407 deletions

View File

@ -143,6 +143,9 @@ queryNode:
segcore: segcore:
chunkRows: 32768 # The number of vectors in a chunk. chunkRows: 32768 # The number of vectors in a chunk.
chunkManager:
localFileCacheLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
indexCoord: indexCoord:
address: localhost address: localhost

View File

@ -691,10 +691,13 @@ func genVectorChunkManager(ctx context.Context) (*storage.VectorChunkManager, er
} }
schema := genSimpleInsertDataSchema() schema := genSimpleInsertDataSchema()
vcm := storage.NewVectorChunkManager(lcm, rcm, &etcdpb.CollectionMeta{ vcm, err := storage.NewVectorChunkManager(lcm, rcm, &etcdpb.CollectionMeta{
ID: defaultCollectionID, ID: defaultCollectionID,
Schema: schema, Schema: schema,
}, false) }, Params.QueryNodeCfg.LocalFileCacheLimit, false)
if err != nil {
return nil, err
}
return vcm, nil return vcm, nil
} }

View File

@ -91,6 +91,7 @@ type queryCollection struct {
remoteChunkManager storage.ChunkManager remoteChunkManager storage.ChunkManager
vectorChunkManager *storage.VectorChunkManager vectorChunkManager *storage.VectorChunkManager
localCacheEnabled bool localCacheEnabled bool
localCacheSize int64
globalSegmentManager *globalSealedSegmentManager globalSegmentManager *globalSealedSegmentManager
} }
@ -142,6 +143,7 @@ func newQueryCollection(releaseCtx context.Context,
localChunkManager: localChunkManager, localChunkManager: localChunkManager,
remoteChunkManager: remoteChunkManager, remoteChunkManager: remoteChunkManager,
localCacheEnabled: localCacheEnabled, localCacheEnabled: localCacheEnabled,
localCacheSize: Params.QueryNodeCfg.LocalFileCacheLimit,
globalSegmentManager: newGlobalSealedSegmentManager(collectionID), globalSegmentManager: newGlobalSealedSegmentManager(collectionID),
} }
@ -172,10 +174,7 @@ func (q *queryCollection) close() {
// } // }
q.globalSegmentManager.close() q.globalSegmentManager.close()
if q.vectorChunkManager != nil { if q.vectorChunkManager != nil {
err := q.vectorChunkManager.Close() q.vectorChunkManager.Close()
if err != nil {
log.Warn("close vector chunk manager error occurs", zap.Error(err))
}
} }
} }
@ -1300,11 +1299,14 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
if q.remoteChunkManager == nil { if q.remoteChunkManager == nil {
return fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil, msgID = %d", retrieveMsg.ID()) return fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil, msgID = %d", retrieveMsg.ID())
} }
q.vectorChunkManager = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager, q.vectorChunkManager, err = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager,
&etcdpb.CollectionMeta{ &etcdpb.CollectionMeta{
ID: collection.id, ID: collection.id,
Schema: collection.schema, Schema: collection.schema,
}, q.localCacheEnabled) }, q.localCacheSize, q.localCacheEnabled)
if err != nil {
return err
}
} }
// historical retrieve // historical retrieve

View File

@ -132,7 +132,7 @@ func (lcm *LocalChunkManager) MultiRead(filePaths []string) ([][]byte, error) {
return results, el return results, el
} }
func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { func (lcm *LocalChunkManager) ListWithPrefix(prefix string) ([]string, error) {
var filePaths []string var filePaths []string
absPrefix := path.Join(lcm.localPath, prefix) absPrefix := path.Join(lcm.localPath, prefix)
dir := filepath.Dir(absPrefix) dir := filepath.Dir(absPrefix)
@ -142,6 +142,14 @@ func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte,
} }
return nil return nil
}) })
if err != nil {
return nil, err
}
return filePaths, nil
}
func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
filePaths, err := lcm.ListWithPrefix(prefix)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -167,7 +175,7 @@ func (lcm *LocalChunkManager) ReadAt(filePath string, off int64, length int64) (
return res, nil return res, nil
} }
func (lcm *LocalChunkManager) Mmap(filePath string) (io.ReaderAt, error) { func (lcm *LocalChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) {
absPath := path.Join(lcm.localPath, filePath) absPath := path.Join(lcm.localPath, filePath)
return mmap.Open(path.Clean(absPath)) return mmap.Open(path.Clean(absPath))
} }
@ -209,15 +217,7 @@ func (lcm *LocalChunkManager) MultiRemove(filePaths []string) error {
} }
func (lcm *LocalChunkManager) RemoveWithPrefix(prefix string) error { func (lcm *LocalChunkManager) RemoveWithPrefix(prefix string) error {
var filePaths []string filePaths, err := lcm.ListWithPrefix(prefix)
absPrefix := path.Join(lcm.localPath, prefix)
dir := filepath.Dir(absPrefix)
err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
if strings.HasPrefix(filePath, absPrefix) && !f.IsDir() {
filePaths = append(filePaths, strings.TrimPrefix(filePath, lcm.localPath))
}
return nil
})
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,153 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"container/list"
"errors"
)
type LRU struct {
evictList *list.List
items map[interface{}]*list.Element
size int
onEvicted func(k, v interface{})
}
type entry struct {
key interface{}
value interface{}
}
func NewLRU(size int, onEvicted func(k, v interface{})) (*LRU, error) {
if size <= 0 {
return nil, errors.New("cache size must be positive")
}
c := &LRU{
size: size,
evictList: list.New(),
items: make(map[interface{}]*list.Element),
onEvicted: onEvicted,
}
return c, nil
}
func (c *LRU) Add(key, value interface{}) {
if e, ok := c.items[key]; ok {
c.evictList.MoveToFront(e)
e.Value.(*entry).value = value
return
}
e := &entry{key: key, value: value}
listE := c.evictList.PushFront(e)
c.items[key] = listE
if c.evictList.Len() > c.size {
c.RemoveOldest()
}
}
func (c *LRU) Get(key interface{}) (value interface{}, ok bool) {
if e, ok := c.items[key]; ok {
c.evictList.MoveToFront(e)
kv := e.Value.(*entry)
return kv.value, true
}
return nil, false
}
func (c *LRU) Remove(key interface{}) {
if e, ok := c.items[key]; ok {
c.evictList.Remove(e)
kv := e.Value.(*entry)
delete(c.items, kv.key)
if c.onEvicted != nil {
c.onEvicted(kv.key, kv.value)
}
}
}
func (c *LRU) Contains(key interface{}) bool {
_, ok := c.items[key]
return ok
}
// Peek get value but not update the recently-used list.
func (c *LRU) Peek(key interface{}) (value interface{}, ok bool) {
if e, ok := c.items[key]; ok {
kv := e.Value.(*entry)
return kv.value, true
}
return nil, false
}
func (c *LRU) Keys() []interface{} {
keys := make([]interface{}, len(c.items))
i := 0
for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() {
keys[i] = ent.Value.(*entry).key
i++
}
return keys
}
func (c *LRU) Len() int {
return c.evictList.Len()
}
func (c *LRU) Purge() {
for k, v := range c.items {
if c.onEvicted != nil {
c.onEvicted(k, v.Value.(*entry).value)
}
delete(c.items, k)
}
c.evictList.Init()
}
func (c *LRU) Resize(size int) int {
c.size = size
if size >= c.evictList.Len() {
return 0
}
diff := c.evictList.Len() - c.size
for i := 0; i < diff; i++ {
c.RemoveOldest()
}
return diff
}
func (c *LRU) GetOldest() (interface{}, interface{}, bool) {
ent := c.evictList.Back()
if ent != nil {
kv := ent.Value.(*entry)
return kv.key, kv.value, true
}
return nil, nil, false
}
func (c *LRU) RemoveOldest() {
e := c.evictList.Back()
if e != nil {
c.evictList.Remove(e)
kv := e.Value.(*entry)
delete(c.items, kv.key)
if c.onEvicted != nil {
c.onEvicted(kv.key, kv.value)
}
}
}

View File

@ -27,6 +27,7 @@ import (
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/mmap"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/errorutil" "github.com/milvus-io/milvus/internal/util/errorutil"
@ -168,13 +169,9 @@ func (mcm *MinioChunkManager) MultiRead(keys []string) ([][]byte, error) {
} }
func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix}) objectsKeys, err := mcm.ListWithPrefix(prefix)
if err != nil {
var objectsKeys []string return nil, nil, err
var objectsValues [][]byte
for object := range objects {
objectsKeys = append(objectsKeys, object.Key)
} }
objectsValues, err := mcm.MultiRead(objectsKeys) objectsValues, err := mcm.MultiRead(objectsKeys)
if err != nil { if err != nil {
@ -185,8 +182,8 @@ func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte,
return objectsKeys, objectsValues, nil return objectsKeys, objectsValues, nil
} }
func (mcm *MinioChunkManager) Mmap(filePath string) (io.ReaderAt, error) { func (mcm *MinioChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) {
panic("this method has not been implemented") return nil, errors.New("this method has not been implemented")
} }
// ReadAt reads specific position data of minio storage if exists. // ReadAt reads specific position data of minio storage if exists.
@ -249,3 +246,14 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error {
} }
return nil return nil
} }
func (mcm *MinioChunkManager) ListWithPrefix(prefix string) ([]string, error) {
objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix})
var objectsKeys []string
for object := range objects {
objectsKeys = append(objectsKeys, object.Key)
}
return objectsKeys, nil
}

View File

@ -381,25 +381,48 @@ func TestMinIOCM(t *testing.T) {
}) })
t.Run("test GetPath", func(t *testing.T) { t.Run("test GetPath", func(t *testing.T) {
testGetSizeRoot := "get_path" testGetPathRoot := path.Join(testMinIOKVRoot, "get_path")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM := NewLocalChunkManager(RootPath(localPath)) testCM, err := newMinIOChunkManager(ctx, testBucket)
defer testCM.RemoveWithPrefix(testGetSizeRoot) require.NoError(t, err)
defer testCM.RemoveWithPrefix(testGetPathRoot)
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetPath_key") key := path.Join(testGetPathRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetPath_value") value := []byte("TestMinIOKV_GetSize_value")
err := testCM.Write(key, value) err = testCM.Write(key, value)
assert.NoError(t, err) assert.NoError(t, err)
p, err := testCM.GetPath(key) p, err := testCM.GetPath(key)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, p, path.Join(localPath, key)) assert.Equal(t, p, key)
key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2") key2 := path.Join(testGetPathRoot, "TestMemoryKV_GetSize_key2")
p, err = testCM.GetPath(key2) p, err = testCM.GetPath(key2)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, p, "") assert.Equal(t, p, "")
}) })
t.Run("test Mmap", func(t *testing.T) {
testMmapRoot := path.Join(testMinIOKVRoot, "mmap")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(testMmapRoot)
key := path.Join(testMmapRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetSize_value")
err = testCM.Write(key, value)
assert.NoError(t, err)
r, err := testCM.Mmap(key)
assert.Error(t, err)
assert.Nil(t, r)
})
} }

View File

@ -11,6 +11,10 @@
package storage package storage
import (
"golang.org/x/exp/mmap"
)
// ChunkManager is to manager chunks. // ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks. // Include Read, Write, Remove chunks.
type ChunkManager interface { type ChunkManager interface {
@ -28,8 +32,10 @@ type ChunkManager interface {
Read(filePath string) ([]byte, error) Read(filePath string) ([]byte, error)
// MultiRead reads @filePath and returns content. // MultiRead reads @filePath and returns content.
MultiRead(filePaths []string) ([][]byte, error) MultiRead(filePaths []string) ([][]byte, error)
ListWithPrefix(prefix string) ([]string, error)
// ReadWithPrefix reads files with same @prefix and returns contents. // ReadWithPrefix reads files with same @prefix and returns contents.
ReadWithPrefix(prefix string) ([]string, [][]byte, error) ReadWithPrefix(prefix string) ([]string, [][]byte, error)
Mmap(filePath string) (*mmap.ReaderAt, error)
// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read. // ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
// if all bytes are read, @err is io.EOF. // if all bytes are read, @err is io.EOF.
// return other error if read failed. // return other error if read failed.

View File

@ -19,52 +19,86 @@ package storage
import ( import (
"errors" "errors"
"io" "io"
"sync"
"github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/cache"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
)
var (
defaultLocalCacheSize = 64
) )
// VectorChunkManager is responsible for read and write vector data. // VectorChunkManager is responsible for read and write vector data.
type VectorChunkManager struct { type VectorChunkManager struct {
localChunkManager ChunkManager cacheStorage ChunkManager
remoteChunkManager ChunkManager vectorStorage ChunkManager
cache *cache.LRU
schema *etcdpb.CollectionMeta insertCodec *InsertCodec
localCacheEnable bool cacheEnable bool
cacheLimit int64
cacheSize int64
cacheSizeMutex sync.Mutex
fixSize bool // Prevent cache capactiy from changing too frequently
} }
var _ ChunkManager = (*VectorChunkManager)(nil) var _ ChunkManager = (*VectorChunkManager)(nil)
// NewVectorChunkManager create a new vector manager object. // NewVectorChunkManager create a new vector manager object.
func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta, localCacheEnable bool) *VectorChunkManager { func NewVectorChunkManager(cacheStorage ChunkManager, vectorStorage ChunkManager, schema *etcdpb.CollectionMeta, cacheLimit int64, cacheEnable bool) (*VectorChunkManager, error) {
return &VectorChunkManager{ insertCodec := NewInsertCodec(schema)
localChunkManager: localChunkManager, vcm := &VectorChunkManager{
remoteChunkManager: remoteChunkManager, cacheStorage: cacheStorage,
vectorStorage: vectorStorage,
schema: schema, insertCodec: insertCodec,
localCacheEnable: localCacheEnable, cacheEnable: cacheEnable,
cacheLimit: cacheLimit,
} }
if cacheEnable {
if cacheLimit <= 0 {
return nil, errors.New("cache limit must be positive if cacheEnable")
}
c, err := cache.NewLRU(defaultLocalCacheSize, func(k cache.Key, v cache.Value) {
r := v.(*mmap.ReaderAt)
size := r.Len()
err := r.Close()
if err != nil {
log.Error("Unmmap file failed", zap.Any("file", k))
}
err = cacheStorage.Remove(k.(string))
if err != nil {
log.Error("cache storage remove file failed", zap.Any("file", k))
}
vcm.cacheSizeMutex.Lock()
vcm.cacheSize -= int64(size)
vcm.cacheSizeMutex.Unlock()
})
if err != nil {
return nil, err
}
vcm.cache = c
}
return vcm, nil
} }
// For vector data, we will download vector file from storage. And we will // For vector data, we will download vector file from storage. And we will
// deserialize the file for it has binlog style. At last we store pure vector // deserialize the file for it has binlog style. At last we store pure vector
// data to local storage as cache. // data to local storage as cache.
func (vcm *VectorChunkManager) downloadFile(filePath string) ([]byte, error) { func (vcm *VectorChunkManager) deserializeVectorFile(filePath string, content []byte) ([]byte, error) {
if vcm.localChunkManager.Exist(filePath) {
return vcm.localChunkManager.Read(filePath)
}
content, err := vcm.remoteChunkManager.Read(filePath)
if err != nil {
return nil, err
}
insertCodec := NewInsertCodec(vcm.schema)
blob := &Blob{ blob := &Blob{
Key: filePath, Key: filePath,
Value: content, Value: content,
} }
_, _, data, err := insertCodec.Deserialize([]*Blob{blob}) _, _, data, err := vcm.insertCodec.Deserialize([]*Blob{blob})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -83,94 +117,151 @@ func (vcm *VectorChunkManager) downloadFile(filePath string) ([]byte, error) {
// GetPath returns the path of vector data. If cached, return local path. // GetPath returns the path of vector data. If cached, return local path.
// If not cached return remote path. // If not cached return remote path.
func (vcm *VectorChunkManager) GetPath(filePath string) (string, error) { func (vcm *VectorChunkManager) GetPath(filePath string) (string, error) {
if vcm.localChunkManager.Exist(filePath) && vcm.localCacheEnable { return vcm.vectorStorage.GetPath(filePath)
return vcm.localChunkManager.GetPath(filePath)
}
return vcm.remoteChunkManager.GetPath(filePath)
} }
func (vcm *VectorChunkManager) GetSize(filePath string) (int64, error) { func (vcm *VectorChunkManager) GetSize(filePath string) (int64, error) {
if vcm.localChunkManager.Exist(filePath) && vcm.localCacheEnable { return vcm.vectorStorage.GetSize(filePath)
return vcm.localChunkManager.GetSize(filePath)
}
return vcm.remoteChunkManager.GetSize(filePath)
} }
// Write writes the vector data to local cache if cache enabled. // Write writes the vector data to local cache if cache enabled.
func (vcm *VectorChunkManager) Write(filePath string, content []byte) error { func (vcm *VectorChunkManager) Write(filePath string, content []byte) error {
if !vcm.localCacheEnable { return vcm.vectorStorage.Write(filePath, content)
return errors.New("cannot write local file for local cache is not allowed")
}
return vcm.localChunkManager.Write(filePath, content)
} }
// MultiWrite writes the vector data to local cache if cache enabled. // MultiWrite writes the vector data to local cache if cache enabled.
func (vcm *VectorChunkManager) MultiWrite(contents map[string][]byte) error { func (vcm *VectorChunkManager) MultiWrite(contents map[string][]byte) error {
if !vcm.localCacheEnable { return vcm.vectorStorage.MultiWrite(contents)
return errors.New("cannot write local file for local cache is not allowed")
}
return vcm.localChunkManager.MultiWrite(contents)
} }
// Exist checks whether vector data is saved to local cache. // Exist checks whether vector data is saved to local cache.
func (vcm *VectorChunkManager) Exist(filePath string) bool { func (vcm *VectorChunkManager) Exist(filePath string) bool {
return vcm.localChunkManager.Exist(filePath) return vcm.vectorStorage.Exist(filePath)
}
func (vcm *VectorChunkManager) readWithCache(filePath string) ([]byte, error) {
contents, err := vcm.vectorStorage.Read(filePath)
if err != nil {
return nil, err
}
results, err := vcm.deserializeVectorFile(filePath, contents)
if err != nil {
return nil, err
}
err = vcm.cacheStorage.Write(filePath, results)
if err != nil {
return nil, err
}
r, err := vcm.cacheStorage.Mmap(filePath)
if err != nil {
return nil, err
}
size, err := vcm.cacheStorage.GetSize(filePath)
if err != nil {
return nil, err
}
vcm.cacheSizeMutex.Lock()
vcm.cacheSize += size
vcm.cacheSizeMutex.Unlock()
if !vcm.fixSize {
if vcm.cacheSize < vcm.cacheLimit {
if vcm.cache.Len() == vcm.cache.Capacity() {
newSize := float32(vcm.cache.Capacity()) * 1.25
vcm.cache.Resize(int(newSize))
}
} else {
// +1 is for add current value
vcm.cache.Resize(vcm.cache.Len() + 1)
vcm.fixSize = true
}
}
vcm.cache.Add(filePath, r)
return results, nil
} }
// Read reads the pure vector data. If cached, it reads from local. // Read reads the pure vector data. If cached, it reads from local.
func (vcm *VectorChunkManager) Read(filePath string) ([]byte, error) { func (vcm *VectorChunkManager) Read(filePath string) ([]byte, error) {
if vcm.localCacheEnable { if vcm.cacheEnable {
if vcm.localChunkManager.Exist(filePath) { if r, ok := vcm.cache.Get(filePath); ok {
return vcm.localChunkManager.Read(filePath) at := r.(*mmap.ReaderAt)
p := make([]byte, at.Len())
_, err := at.ReadAt(p, 0)
if err != nil {
return p, err
} }
contents, err := vcm.downloadFile(filePath) return p, nil
}
return vcm.readWithCache(filePath)
}
contents, err := vcm.vectorStorage.Read(filePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = vcm.localChunkManager.Write(filePath, contents) return vcm.deserializeVectorFile(filePath, contents)
if err != nil {
return nil, err
}
return vcm.localChunkManager.Read(filePath)
}
return vcm.downloadFile(filePath)
} }
// MultiRead reads the pure vector data. If cached, it reads from local. // MultiRead reads the pure vector data. If cached, it reads from local.
func (vcm *VectorChunkManager) MultiRead(filePaths []string) ([][]byte, error) { func (vcm *VectorChunkManager) MultiRead(filePaths []string) ([][]byte, error) {
var results [][]byte results := make([][]byte, len(filePaths))
for _, filePath := range filePaths { for i, filePath := range filePaths {
content, err := vcm.Read(filePath) content, err := vcm.Read(filePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
results = append(results, content) results[i] = content
} }
return results, nil return results, nil
} }
func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
panic("has not implemented yet") filePaths, err := vcm.ListWithPrefix(prefix)
if err != nil {
return nil, nil, err
}
results, err := vcm.MultiRead(filePaths)
if err != nil {
return nil, nil, err
}
return filePaths, results, nil
}
func (vcm *VectorChunkManager) ListWithPrefix(prefix string) ([]string, error) {
return vcm.vectorStorage.ListWithPrefix(prefix)
}
func (vcm *VectorChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) {
if vcm.cacheEnable && vcm.cache != nil {
if r, ok := vcm.cache.Get(filePath); ok {
return r.(*mmap.ReaderAt), nil
}
}
return nil, errors.New("the file mmap has not been cached")
} }
// ReadAt reads specific position data of vector. If cached, it reads from local. // ReadAt reads specific position data of vector. If cached, it reads from local.
func (vcm *VectorChunkManager) ReadAt(filePath string, off int64, length int64) ([]byte, error) { func (vcm *VectorChunkManager) ReadAt(filePath string, off int64, length int64) ([]byte, error) {
if vcm.localCacheEnable { if vcm.cacheEnable {
if vcm.localChunkManager.Exist(filePath) { if r, ok := vcm.cache.Get(filePath); ok {
return vcm.localChunkManager.ReadAt(filePath, off, length) at := r.(*mmap.ReaderAt)
} p := make([]byte, length)
results, err := vcm.downloadFile(filePath) _, err := at.ReadAt(p, off)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = vcm.localChunkManager.Write(filePath, results) return p, nil
}
results, err := vcm.readWithCache(filePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return vcm.localChunkManager.ReadAt(filePath, off, length) return results[off : off+length], nil
} }
results, err := vcm.downloadFile(filePath) contents, err := vcm.vectorStorage.Read(filePath)
if err != nil {
return nil, err
}
results, err := vcm.deserializeVectorFile(filePath, contents)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -184,46 +275,51 @@ func (vcm *VectorChunkManager) ReadAt(filePath string, off int64, length int64)
if n < len(p) { if n < len(p) {
return nil, io.EOF return nil, io.EOF
} }
return p, nil return p, nil
} }
func (vcm *VectorChunkManager) Remove(filePath string) error { func (vcm *VectorChunkManager) Remove(filePath string) error {
err := vcm.localChunkManager.Remove(filePath) err := vcm.vectorStorage.Remove(filePath)
if err != nil { if err != nil {
return err return err
} }
err = vcm.remoteChunkManager.Remove(filePath) if vcm.cacheEnable {
if err != nil { vcm.cache.Remove(filePath)
return err
} }
return nil return nil
} }
func (vcm *VectorChunkManager) MultiRemove(filePaths []string) error { func (vcm *VectorChunkManager) MultiRemove(filePaths []string) error {
err := vcm.localChunkManager.MultiRemove(filePaths) err := vcm.vectorStorage.MultiRemove(filePaths)
if err != nil { if err != nil {
return err return err
} }
err = vcm.remoteChunkManager.MultiRemove(filePaths) if vcm.cacheEnable {
if err != nil { for _, p := range filePaths {
return err vcm.cache.Remove(p)
}
} }
return nil return nil
} }
func (vcm *VectorChunkManager) RemoveWithPrefix(prefix string) error { func (vcm *VectorChunkManager) RemoveWithPrefix(prefix string) error {
err := vcm.localChunkManager.RemoveWithPrefix(prefix) err := vcm.vectorStorage.RemoveWithPrefix(prefix)
if err != nil { if err != nil {
return err return err
} }
err = vcm.remoteChunkManager.RemoveWithPrefix(prefix) if vcm.cacheEnable {
filePaths, err := vcm.ListWithPrefix(prefix)
if err != nil { if err != nil {
return err return err
} }
for _, p := range filePaths {
vcm.cache.Remove(p)
}
}
return nil return nil
} }
func (vcm *VectorChunkManager) Close() error { func (vcm *VectorChunkManager) Close() {
// TODOReplace the cache with the local chunk manager and clear the cache when closed if vcm.cache != nil && vcm.cacheEnable {
return vcm.localChunkManager.RemoveWithPrefix("") vcm.cache.Close()
}
} }

View File

@ -20,15 +20,13 @@ import (
"context" "context"
"errors" "errors"
"os" "os"
"path"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
) )
func initMeta() *etcdpb.CollectionMeta { func initMeta() *etcdpb.CollectionMeta {
@ -131,7 +129,10 @@ func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorCh
lcm := NewLocalChunkManager(RootPath(localPath)) lcm := NewLocalChunkManager(RootPath(localPath))
meta := initMeta() meta := initMeta()
vcm := NewVectorChunkManager(lcm, rcm, meta, localCacheEnable) vcm, err := NewVectorChunkManager(lcm, rcm, meta, 16, localCacheEnable)
if err != nil {
return nil, cancel, err
}
return vcm, cancel, nil return vcm, cancel, nil
} }
@ -149,21 +150,40 @@ func TestMain(m *testing.M) {
os.Exit(exitCode) os.Exit(exitCode)
} }
func TestVectorChunkManager_GetPath(t *testing.T) { func TestNewVectorChunkManager(t *testing.T) {
vcm, cancel, err := buildVectorChunkManager(localPath, true) ctx := context.Background()
bucketName := "vector-chunk-manager"
rcm, err := newMinIOChunkManager(ctx, bucketName)
assert.Nil(t, err)
assert.NotNil(t, rcm)
lcm := NewLocalChunkManager(RootPath(localPath))
meta := initMeta()
vcm, err := NewVectorChunkManager(lcm, rcm, meta, 16, true)
assert.Nil(t, err)
assert.NotNil(t, vcm) assert.NotNil(t, vcm)
vcm, err = NewVectorChunkManager(lcm, rcm, meta, -1, true)
assert.NotNil(t, err)
assert.Nil(t, vcm)
}
func TestVectorChunkManager_GetPath(t *testing.T) {
localCaches := []bool{true, false}
for _, localCache := range localCaches {
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1" key := "1"
err = vcm.Write(key, []byte{1}) err = vcm.Write(key, []byte{1})
assert.Nil(t, err) assert.Nil(t, err)
pathGet, err := vcm.GetPath(key) pathGet, err := vcm.GetPath(key)
assert.Nil(t, err) assert.Nil(t, err)
pathJoin := path.Join(localPath, key) assert.Equal(t, pathGet, key)
assert.Equal(t, pathGet, pathJoin)
vcm.localCacheEnable = false err = vcm.cacheStorage.Write(key, []byte{1})
err = vcm.remoteChunkManager.Write(key, []byte{1})
assert.Nil(t, err) assert.Nil(t, err)
pathGet, err = vcm.GetPath(key) pathGet, err = vcm.GetPath(key)
assert.Nil(t, err) assert.Nil(t, err)
@ -172,12 +192,16 @@ func TestVectorChunkManager_GetPath(t *testing.T) {
err = vcm.RemoveWithPrefix(localPath) err = vcm.RemoveWithPrefix(localPath)
assert.NoError(t, err) assert.NoError(t, err)
cancel() cancel()
vcm.Close()
}
} }
func TestVectorChunkManager_GetSize(t *testing.T) { func TestVectorChunkManager_GetSize(t *testing.T) {
vcm, cancel, err := buildVectorChunkManager(localPath, true) localCaches := []bool{true, false}
assert.NotNil(t, vcm) for _, localCache := range localCaches {
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1" key := "1"
err = vcm.Write(key, []byte{1}) err = vcm.Write(key, []byte{1})
@ -186,8 +210,7 @@ func TestVectorChunkManager_GetSize(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, sizeGet, 1) assert.EqualValues(t, sizeGet, 1)
vcm.localCacheEnable = false err = vcm.cacheStorage.Write(key, []byte{1})
err = vcm.remoteChunkManager.Write(key, []byte{1})
assert.Nil(t, err) assert.Nil(t, err)
sizeGet, err = vcm.GetSize(key) sizeGet, err = vcm.GetSize(key)
assert.Nil(t, err) assert.Nil(t, err)
@ -196,19 +219,19 @@ func TestVectorChunkManager_GetSize(t *testing.T) {
err = vcm.RemoveWithPrefix(localPath) err = vcm.RemoveWithPrefix(localPath)
assert.NoError(t, err) assert.NoError(t, err)
cancel() cancel()
vcm.Close()
}
} }
func TestVectorChunkManager_Write(t *testing.T) { func TestVectorChunkManager_Write(t *testing.T) {
vcm, cancel, err := buildVectorChunkManager(localPath, false) localCaches := []bool{true, false}
for _, localCache := range localCaches {
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, vcm) assert.NotNil(t, vcm)
key := "1" key := "1"
err = vcm.Write(key, []byte{1}) err = vcm.Write(key, []byte{1})
assert.Error(t, err)
vcm.localCacheEnable = true
err = vcm.Write(key, []byte{1})
assert.Nil(t, err) assert.Nil(t, err)
exist := vcm.Exist(key) exist := vcm.Exist(key)
@ -229,6 +252,8 @@ func TestVectorChunkManager_Write(t *testing.T) {
err = vcm.RemoveWithPrefix(localPath) err = vcm.RemoveWithPrefix(localPath)
assert.NoError(t, err) assert.NoError(t, err)
cancel() cancel()
vcm.Close()
}
} }
func TestVectorChunkManager_Remove(t *testing.T) { func TestVectorChunkManager_Remove(t *testing.T) {
@ -239,7 +264,7 @@ func TestVectorChunkManager_Remove(t *testing.T) {
assert.NotNil(t, vcm) assert.NotNil(t, vcm)
key := "1" key := "1"
err = vcm.remoteChunkManager.Write(key, []byte{1}) err = vcm.cacheStorage.Write(key, []byte{1})
assert.Nil(t, err) assert.Nil(t, err)
err = vcm.Remove(key) err = vcm.Remove(key)
@ -252,7 +277,7 @@ func TestVectorChunkManager_Remove(t *testing.T) {
"key_1": {111}, "key_1": {111},
"key_2": {222}, "key_2": {222},
} }
err = vcm.remoteChunkManager.MultiWrite(contents) err = vcm.cacheStorage.MultiWrite(contents)
assert.NoError(t, err) assert.NoError(t, err)
err = vcm.MultiRemove([]string{"key_1", "key_2"}) err = vcm.MultiRemove([]string{"key_1", "key_2"})
@ -266,6 +291,7 @@ func TestVectorChunkManager_Remove(t *testing.T) {
err = vcm.RemoveWithPrefix(localPath) err = vcm.RemoveWithPrefix(localPath)
assert.NoError(t, err) assert.NoError(t, err)
cancel() cancel()
vcm.Close()
} }
} }
@ -296,15 +322,8 @@ func (m *mockFailedChunkManager) MultiRemove(key []string) error {
func TestVectorChunkManager_Remove_Fail(t *testing.T) { func TestVectorChunkManager_Remove_Fail(t *testing.T) {
vcm := &VectorChunkManager{ vcm := &VectorChunkManager{
localChunkManager: &mockFailedChunkManager{fail: true}, vectorStorage: &mockFailedChunkManager{fail: true},
} cacheStorage: &mockFailedChunkManager{fail: true},
assert.Error(t, vcm.Remove("test"))
assert.Error(t, vcm.MultiRemove([]string{"test"}))
assert.Error(t, vcm.RemoveWithPrefix("test"))
vcm = &VectorChunkManager{
localChunkManager: &mockFailedChunkManager{fail: false},
remoteChunkManager: &mockFailedChunkManager{fail: true},
} }
assert.Error(t, vcm.Remove("test")) assert.Error(t, vcm.Remove("test"))
assert.Error(t, vcm.MultiRemove([]string{"test"})) assert.Error(t, vcm.MultiRemove([]string{"test"}))
@ -322,17 +341,11 @@ func TestVectorChunkManager_Read(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Nil(t, content) assert.Nil(t, content)
vcm.localCacheEnable = true
content, err = vcm.Read("9999")
assert.Error(t, err)
assert.Nil(t, content)
meta := initMeta() meta := initMeta()
binlogs := initBinlogFile(meta) binlogs := initBinlogFile(meta)
assert.NotNil(t, binlogs) assert.NotNil(t, binlogs)
for _, binlog := range binlogs { for _, binlog := range binlogs {
err := vcm.remoteChunkManager.Write(binlog.Key, binlog.Value) err := vcm.vectorStorage.Write(binlog.Key, binlog.Value)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -360,11 +373,21 @@ func TestVectorChunkManager_Read(t *testing.T) {
} }
assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult) assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
keys, contents, err := vcm.ReadWithPrefix("10")
assert.Nil(t, err)
assert.Equal(t, "101", keys[0])
assert.Equal(t, []byte{3, 4}, contents[0])
assert.Nil(t, err)
assert.Equal(t, "108", keys[1])
assert.Equal(t, []byte{0, 255}, contents[1])
floatResult = make([]float32, 0) floatResult = make([]float32, 0)
for i := 0; i < len(content)/4; i++ { for i := 0; i < len(content)/4; i++ {
singleData := typeutil.BytesToFloat32(contents[1][i*4 : i*4+4]) singleData := typeutil.BytesToFloat32(contents[2][i*4 : i*4+4])
floatResult = append(floatResult, singleData) floatResult = append(floatResult, singleData)
} }
assert.Equal(t, "109", keys[2])
assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult) assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
content, err = vcm.ReadAt("109", 8*4, 8*4) content, err = vcm.ReadAt("109", 8*4, 8*4)
@ -381,11 +404,23 @@ func TestVectorChunkManager_Read(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Nil(t, content) assert.Nil(t, content)
vcm.localCacheEnable = false
content, err = vcm.ReadAt("109", 8*4, 8*4) content, err = vcm.ReadAt("109", 8*4, 8*4)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 32, len(content)) assert.Equal(t, 32, len(content))
if localCache {
r, err := vcm.Mmap("109")
assert.Nil(t, err)
p := make([]byte, 32)
n, err := r.ReadAt(p, 32)
assert.Nil(t, err)
assert.Equal(t, n, 32)
r, err = vcm.Mmap("not exist")
assert.Error(t, err)
assert.Nil(t, nil)
}
content, err = vcm.ReadAt("109", 9999, 8*4) content, err = vcm.ReadAt("109", 9999, 8*4)
assert.Error(t, err) assert.Error(t, err)
assert.Nil(t, content) assert.Nil(t, content)
@ -396,6 +431,8 @@ func TestVectorChunkManager_Read(t *testing.T) {
err = vcm.RemoveWithPrefix(localPath) err = vcm.RemoveWithPrefix(localPath)
assert.NoError(t, err) assert.NoError(t, err)
cancel() cancel()
vcm.Close()
} }
} }

243
internal/util/cache/lru_cache.go vendored Normal file
View File

@ -0,0 +1,243 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"container/list"
"context"
"errors"
"fmt"
"sync"
)
type LRU struct {
ctx context.Context
cancel context.CancelFunc
evictList *list.List
items map[interface{}]*list.Element
capacity int
onEvicted func(k Key, v Value)
m sync.RWMutex
evictedCh chan *entry
stats *Stats
}
type Stats struct {
hitCount float32
evictedCount float32
readCount float32
writeCount float32
}
func (s *Stats) String() string {
var hitRatio float32
var evictedRatio float32
if s.readCount != 0 {
hitRatio = s.hitCount / s.readCount
evictedRatio = s.evictedCount / s.writeCount
}
return fmt.Sprintf("lru cache hit ratio = %f, evictedRatio = %f", hitRatio, evictedRatio)
}
type Key interface {
}
type Value interface {
}
type entry struct {
key Key
value Value
}
func NewLRU(capacity int, onEvicted func(k Key, v Value)) (*LRU, error) {
if capacity <= 0 {
return nil, errors.New("cache size must be positive")
}
ctx, cancel := context.WithCancel(context.Background())
c := &LRU{
ctx: ctx,
cancel: cancel,
capacity: capacity,
evictList: list.New(),
items: make(map[interface{}]*list.Element),
onEvicted: onEvicted,
evictedCh: make(chan *entry, 16),
stats: &Stats{},
}
go c.evictedWorker()
return c, nil
}
func (c *LRU) evictedWorker() {
for {
select {
case <-c.ctx.Done():
return
case e, ok := <-c.evictedCh:
if ok {
if c.onEvicted != nil {
c.onEvicted(e.key, e.value)
}
}
}
}
}
func (c *LRU) Add(key, value Value) {
c.m.Lock()
defer c.m.Unlock()
c.stats.writeCount++
if e, ok := c.items[key]; ok {
c.evictList.MoveToFront(e)
e.Value.(*entry).value = value
return
}
e := &entry{key: key, value: value}
listE := c.evictList.PushFront(e)
c.items[key] = listE
if c.evictList.Len() > c.capacity {
c.stats.evictedCount++
oldestE := c.evictList.Back()
if oldestE != nil {
c.evictList.Remove(oldestE)
kv := oldestE.Value.(*entry)
delete(c.items, kv.key)
if c.onEvicted != nil {
c.evictedCh <- kv
}
}
}
}
func (c *LRU) Get(key Key) (value Value, ok bool) {
c.m.RLock()
defer c.m.RUnlock()
c.stats.readCount++
if e, ok := c.items[key]; ok {
c.stats.hitCount++
c.evictList.MoveToFront(e)
kv := e.Value.(*entry)
return kv.value, true
}
return nil, false
}
func (c *LRU) Remove(key Key) {
c.m.Lock()
defer c.m.Unlock()
if e, ok := c.items[key]; ok {
c.evictList.Remove(e)
kv := e.Value.(*entry)
delete(c.items, kv.key)
if c.onEvicted != nil {
c.evictedCh <- kv
}
}
}
func (c *LRU) Contains(key Key) bool {
c.m.RLock()
defer c.m.RUnlock()
_, ok := c.items[key]
return ok
}
func (c *LRU) Keys() []Key {
c.m.RLock()
defer c.m.RUnlock()
keys := make([]Key, len(c.items))
i := 0
for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() {
keys[i] = ent.Value.(*entry).key
i++
}
return keys
}
func (c *LRU) Len() int {
c.m.RLock()
defer c.m.RUnlock()
return c.evictList.Len()
}
func (c *LRU) Capacity() int {
return c.capacity
}
func (c *LRU) Purge() {
c.m.Lock()
defer c.m.Unlock()
for k, v := range c.items {
if c.onEvicted != nil {
c.evictedCh <- v.Value.(*entry)
}
delete(c.items, k)
}
c.evictList.Init()
}
func (c *LRU) Resize(capacity int) int {
c.m.Lock()
defer c.m.Unlock()
c.capacity = capacity
if capacity >= c.evictList.Len() {
return 0
}
diff := c.evictList.Len() - c.capacity
for i := 0; i < diff; i++ {
oldestE := c.evictList.Back()
if oldestE != nil {
c.evictList.Remove(oldestE)
kv := oldestE.Value.(*entry)
delete(c.items, kv.key)
if c.onEvicted != nil {
c.evictedCh <- kv
}
}
}
return diff
}
func (c *LRU) GetOldest() (Key, Value, bool) {
c.m.RLock()
defer c.m.RUnlock()
ent := c.evictList.Back()
if ent != nil {
kv := ent.Value.(*entry)
return kv.key, kv.value, true
}
return nil, nil, false
}
func (c *LRU) Close() {
c.Purge()
c.cancel()
remain := len(c.evictedCh)
for i := 0; i < remain; i++ {
e, ok := <-c.evictedCh
if ok {
c.onEvicted(e.key, e.value)
}
}
close(c.evictedCh)
}
func (c *LRU) Stats() *Stats {
return c.stats
}

View File

@ -14,10 +14,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package storage package cache
import ( import (
"sync/atomic"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -37,8 +39,8 @@ func TestNewLRU(t *testing.T) {
} }
func TestLRU_Add(t *testing.T) { func TestLRU_Add(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
testKey1 := "test_key_1" testKey1 := "test_key_1"
@ -77,12 +79,14 @@ func TestLRU_Add(t *testing.T) {
assert.False(t, ok) assert.False(t, ok)
assert.Nil(t, v) assert.Nil(t, v)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Contains(t *testing.T) { func TestLRU_Contains(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(1, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
testKey1 := "test_key_1" testKey1 := "test_key_1"
@ -101,12 +105,14 @@ func TestLRU_Contains(t *testing.T) {
ok = c.Contains(testKey1) ok = c.Contains(testKey1)
assert.False(t, ok) assert.False(t, ok)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Get(t *testing.T) { func TestLRU_Get(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(1, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
testKey1 := "test_key_1" testKey1 := "test_key_1"
@ -128,11 +134,14 @@ func TestLRU_Get(t *testing.T) {
assert.False(t, ok) assert.False(t, ok)
assert.Nil(t, v) assert.Nil(t, v)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_GetOldest(t *testing.T) { func TestLRU_GetOldest(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
testKey1 := "test_key_1" testKey1 := "test_key_1"
@ -174,11 +183,14 @@ func TestLRU_GetOldest(t *testing.T) {
assert.EqualValues(t, testKey1, k) assert.EqualValues(t, testKey1, k)
assert.EqualValues(t, testValue1, v) assert.EqualValues(t, testValue1, v)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Keys(t *testing.T) { func TestLRU_Keys(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
testKey1 := "test_key_1" testKey1 := "test_key_1"
@ -204,8 +216,11 @@ func TestLRU_Keys(t *testing.T) {
keys = c.Keys() keys = c.Keys()
assert.ElementsMatch(t, []string{testKey3, testKey1}, keys) assert.ElementsMatch(t, []string{testKey3, testKey1}, keys)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Len(t *testing.T) { func TestLRU_Len(t *testing.T) {
c, err := NewLRU(2, nil) c, err := NewLRU(2, nil)
assert.Nil(t, err) assert.Nil(t, err)
@ -225,43 +240,31 @@ func TestLRU_Len(t *testing.T) {
c.Add(testKey3, testValue3) c.Add(testKey3, testValue3)
assert.EqualValues(t, c.Len(), 2) assert.EqualValues(t, c.Len(), 2)
} }
func TestLRU_Peek(t *testing.T) {
c, err := NewLRU(2, nil) func TestLRU_Capacity(t *testing.T) {
c, err := NewLRU(5, nil)
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, c.Len(), 0)
testKey1 := "test_key_1" testKey1 := "test_key_1"
testValue1 := "test_value_1" testValue1 := "test_value_1"
testKey2 := "test_key_2" testKey2 := "test_key_2"
testValue2 := "test_value_2" testValue2 := "test_value_2"
testKeyNotExist := "not_exist" testKey3 := "test_key_3"
testValue3 := "test_value_3"
c.Add(testKey1, testValue1) c.Add(testKey1, testValue1)
v, ok := c.Peek(testKey1) assert.EqualValues(t, c.Capacity(), 5)
assert.True(t, ok)
assert.EqualValues(t, testValue1, v)
c.Add(testKey2, testValue2) c.Add(testKey2, testValue2)
k, v, ok := c.GetOldest() assert.EqualValues(t, c.Capacity(), 5)
assert.True(t, ok)
assert.EqualValues(t, testKey1, k)
assert.EqualValues(t, testValue1, v)
v, ok = c.Peek(testKey1) c.Add(testKey3, testValue3)
assert.True(t, ok) assert.EqualValues(t, c.Capacity(), 5)
assert.EqualValues(t, testValue1, v)
k, v, ok = c.GetOldest()
assert.True(t, ok)
assert.EqualValues(t, testKey1, k)
assert.EqualValues(t, testValue1, v)
v, ok = c.Peek(testKeyNotExist)
assert.False(t, ok)
assert.Nil(t, v)
} }
func TestLRU_Purge(t *testing.T) { func TestLRU_Purge(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
@ -281,11 +284,14 @@ func TestLRU_Purge(t *testing.T) {
c.Purge() c.Purge()
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
assert.EqualValues(t, evicted, 3) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 3
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Remove(t *testing.T) { func TestLRU_Remove(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
@ -302,11 +308,14 @@ func TestLRU_Remove(t *testing.T) {
c.Remove(testKey2) c.Remove(testKey2)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
assert.EqualValues(t, evicted, 2) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 2
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_RemoveOldest(t *testing.T) { func TestLRU_RemoveOldest(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
@ -339,12 +348,15 @@ func TestLRU_RemoveOldest(t *testing.T) {
assert.Nil(t, v) assert.Nil(t, v)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
assert.EqualValues(t, evicted, 2) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 2
}, 1*time.Second, 100*time.Millisecond)
} }
func TestLRU_Resize(t *testing.T) { func TestLRU_Resize(t *testing.T) {
evicted := 0 evicted := int32(0)
c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, c.Len(), 0) assert.EqualValues(t, c.Len(), 0)
@ -376,10 +388,14 @@ func TestLRU_Resize(t *testing.T) {
assert.EqualValues(t, v, testValue2) assert.EqualValues(t, v, testValue2)
assert.EqualValues(t, c.Len(), 1) assert.EqualValues(t, c.Len(), 1)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
c.Resize(3) c.Resize(3)
assert.EqualValues(t, c.Len(), 1) assert.EqualValues(t, c.Len(), 1)
assert.EqualValues(t, evicted, 1) assert.Eventually(t, func() bool {
return atomic.LoadInt32(&evicted) == 1
}, 1*time.Second, 100*time.Millisecond)
} }

View File

@ -597,6 +597,9 @@ type queryNodeConfig struct {
// memory limit // memory limit
OverloadedMemoryThresholdPercentage float64 OverloadedMemoryThresholdPercentage float64
// cache limit
LocalFileCacheLimit int64
} }
func (p *queryNodeConfig) init(base *BaseTable) { func (p *queryNodeConfig) init(base *BaseTable) {
@ -617,6 +620,8 @@ func (p *queryNodeConfig) init(base *BaseTable) {
p.initSegcoreChunkRows() p.initSegcoreChunkRows()
p.initOverloadedMemoryThresholdPercentage() p.initOverloadedMemoryThresholdPercentage()
p.initLocalFileCacheLimit()
} }
// InitAlias initializes an alias for the QueryNode role. // InitAlias initializes an alias for the QueryNode role.
@ -691,6 +696,15 @@ func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() {
p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100 p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100
} }
func (p *queryNodeConfig) initLocalFileCacheLimit() {
overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("querynoe.chunkManager.localFileCacheLimit", "90")
localFileCacheLimit, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64)
if err != nil {
panic(err)
}
p.LocalFileCacheLimit = localFileCacheLimit
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// --- datacoord --- // --- datacoord ---
type dataCoordConfig struct { type dataCoordConfig struct {