Fix a regression of local storage (#20653)

Signed-off-by: yhmo <yihua.mo@zilliz.com>

Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
groot 2022-11-21 10:19:10 +08:00 committed by GitHub
parent c8022ea63c
commit bf2107ecf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 357 additions and 188 deletions

View File

@ -40,7 +40,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
defer cancel()
alloc := NewAllocatorFactory()
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
b := &binlogIO{cm, alloc}
t.Run("Test upload", func(t *testing.T) {
@ -245,7 +245,7 @@ func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
k := path.Join("test_prepare_blob", key)
k := path.Join(cm.RootPath(), "test_prepare_blob", key)
blob := []byte{1, 2, 3, 255, 188}
err := cm.Write(ctx, k, blob[:])
@ -260,7 +260,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
defer cancel()
alloc := NewAllocatorFactory()
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
b := &binlogIO{
cm,
alloc,

View File

@ -42,7 +42,7 @@ var channelMetaNodeTestDir = "/tmp/milvus_test/channel_meta"
func TestNewChannel(t *testing.T) {
rc := &RootCoordFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(context.Background(), "")
defer cm.RemoveWithPrefix(context.Background(), cm.RootPath())
channel := newChannel("channel", 0, nil, rc, cm)
assert.NotNil(t, channel)
}
@ -114,7 +114,7 @@ func TestChannelMeta_InnerFunction(t *testing.T) {
cm = storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
channel = newChannel("insert-01", collID, nil, rc, cm)
)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
require.False(t, channel.hasSegment(0, true))
require.False(t, channel.hasSegment(0, false))
@ -214,7 +214,7 @@ func TestChannelMeta_segmentFlushed(t *testing.T) {
}
collID := UniqueID(1)
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test coll mot match", func(t *testing.T) {
channel := newChannel("channel", collID, nil, rc, cm)
@ -281,7 +281,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
pkType: schemapb.DataType_Int64,
}
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test addFlushedSegmentWithPKs", func(t *testing.T) {
tests := []struct {
@ -705,7 +705,7 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
endPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(200)}
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("chanName", collID, nil, rc, cm)
channel.chunkManager = &mockDataCM{}
@ -764,7 +764,7 @@ func TestChannelMeta_ChannelCP(t *testing.T) {
collID := UniqueID(1)
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer func() {
err := cm.RemoveWithPrefix(ctx, "")
err := cm.RemoveWithPrefix(ctx, cm.RootPath())
assert.NoError(t, err)
}()
@ -878,7 +878,7 @@ func (s *ChannelMetaSuite) SetupSuite() {
}
func (s *ChannelMetaSuite) TearDownSuite() {
s.cm.RemoveWithPrefix(context.Background(), "")
s.cm.RemoveWithPrefix(context.Background(), s.cm.RootPath())
}
func (s *ChannelMetaSuite) SetupTest() {

View File

@ -45,7 +45,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test getSegmentMeta", func(t *testing.T) {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
@ -554,7 +554,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
notEmptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{{
SegmentID: 100,
FieldBinlogs: nil,

View File

@ -22,6 +22,7 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
@ -93,7 +94,7 @@ func TestDataNode(t *testing.T) {
assert.Nil(t, err)
defer node.Stop()
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus"))
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
paramtable.SetNodeID(1)
t.Run("Test WatchDmChannels ", func(t *testing.T) {
emptyNode := &DataNode{}
@ -471,7 +472,7 @@ func TestDataNode(t *testing.T) {
_, ok = node.flowgraphManager.getFlowgraphService(chName2)
assert.True(t, ok)
filePath := "import/rows_1.json"
filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json")
err = node.chunkManager.Write(ctx, filePath, content)
assert.NoError(t, err)
req := &datapb.ImportTaskRequest{
@ -547,7 +548,7 @@ func TestDataNode(t *testing.T) {
]
}`)
filePath := "import/rows_1.json"
filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json")
err = node.chunkManager.Write(ctx, filePath, content)
assert.NoError(t, err)
req := &datapb.ImportTaskRequest{
@ -581,7 +582,7 @@ func TestDataNode(t *testing.T) {
]
}`)
filePath := "import/rows_1.json"
filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json")
err = node.chunkManager.Write(ctx, filePath, content)
assert.NoError(t, err)
req := &datapb.ImportTaskRequest{
@ -757,7 +758,7 @@ func TestDataNode_AddSegment(t *testing.T) {
assert.Nil(t, err)
defer node.Stop()
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus"))
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
paramtable.SetNodeID(1)
t.Run("test AddSegment", func(t *testing.T) {

View File

@ -145,7 +145,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
"add un-flushed and flushed segments"},
}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
@ -211,7 +211,7 @@ func TestDataSyncService_Start(t *testing.T) {
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(insertChannelName, collectionID, collMeta.GetSchema(), mockRootCoord, cm)
allocFactory := NewAllocatorFactory(1)
@ -414,7 +414,7 @@ func TestClearGlobalFlushingCache(t *testing.T) {
defer cancel()
dataCoord := &DataCoordFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("channel", 1, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
var err error

View File

@ -178,7 +178,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
tss = []uint64{1, 1, 1, 1, 1}
)
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test get segment by varChar primary keys", func(te *testing.T) {
channel := genMockChannel(segIDs, varCharPks, chanName)
@ -466,7 +466,7 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
@ -510,7 +510,7 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)

View File

@ -64,7 +64,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-create"
testPath := "/test/datanode/root/meta"
@ -159,7 +159,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate"
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
@ -336,7 +336,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
wg := sync.WaitGroup{}
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
@ -574,7 +574,7 @@ func TestRollBF(t *testing.T) {
wg := sync.WaitGroup{}
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
@ -702,7 +702,7 @@ func (s *InsertBufferNodeSuit) SetupSuite() {
}
func (s *InsertBufferNodeSuit) TearDownSuite() {
s.cm.RemoveWithPrefix(context.Background(), "")
s.cm.RemoveWithPrefix(context.Background(), s.cm.RootPath())
Params.DataNodeCfg.FlushInsertBufferSize = s.originalConfig
}
@ -917,7 +917,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
}
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
for _, test := range tests {
collMeta := Factory.GetCollectionMeta(test.collID, "collection", test.pkType)
rcf := &RootCoordFactory{
@ -975,7 +975,7 @@ func TestInsertBufferNode_updateSegmentStates(te *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
invalideTests := []struct {
channelCollID UniqueID

View File

@ -150,7 +150,7 @@ func TestRendezvousFlushManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
size := 1000
var counter atomic.Int64
@ -189,7 +189,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
size := 1000
var counter atomic.Int64
@ -288,7 +288,11 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
}
func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newTestChannel()
channel.collSchema = &schemapb.CollectionSchema{}
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {
@ -315,7 +319,10 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
}
func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
size := 1000
var counter atomic.Int64
@ -384,8 +391,11 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
}
func TestRendezvousFlushManager_dropMode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test drop mode", func(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
var mut sync.Mutex
var result []*segmentFlushPack
@ -438,6 +448,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
t.Run("test drop mode with injection", func(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
var mut sync.Mutex
var result []*segmentFlushPack
@ -496,7 +507,10 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
}
func TestRendezvousFlushManager_close(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
size := 1000
var counter atomic.Int64
@ -536,7 +550,10 @@ func TestFlushNotifyFunc(t *testing.T) {
rcf := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("channel", 1, nil, rcf, cm)
@ -606,7 +623,11 @@ func TestDropVirtualChannelFunc(t *testing.T) {
}
vchanName := "vchan_01"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(vchanName, 1, nil, rcf, cm)
dataCoord := &DataCoordFactory{}

View File

@ -24,6 +24,7 @@ import (
"math"
"math/rand"
"path"
"path/filepath"
"runtime"
"strconv"
@ -414,9 +415,9 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy
indexPaths := make([]string, 0)
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := cm.Write(context.Background(), p, index.Value)
indexPath := filepath.Join(defaultLocalStorage, strconv.Itoa(int(segmentID)), index.Key)
indexPaths = append(indexPaths, indexPath)
err := cm.Write(context.Background(), indexPath, index.Value)
if err != nil {
return nil, err
}
@ -552,7 +553,7 @@ func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
}
func genLocalChunkManager() (storage.ChunkManager, error) {
p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data")
p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data")
lcm := storage.NewLocalChunkManager(storage.RootPath(p))
return lcm, nil
}
@ -569,7 +570,7 @@ func genRemoteChunkManager(ctx context.Context) (storage.ChunkManager, error) {
}
func genVectorChunkManager(ctx context.Context, col *Collection) (*storage.VectorChunkManager, error) {
p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data")
p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data")
lcm := storage.NewLocalChunkManager(storage.RootPath(p))
rcm, err := storage.NewMinioChunkManager(
@ -1000,7 +1001,7 @@ func saveBinLog(ctx context.Context,
}
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
key := path.Join("insert-log", k)
key := path.Join(defaultLocalStorage, "insert-log", k)
kvs[key] = blob.Value[:]
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: fieldID,
@ -1019,7 +1020,7 @@ func saveBinLog(ctx context.Context,
}
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
key := path.Join("delta-log", k)
key := path.Join(defaultLocalStorage, "delta-log", k)
kvs[key] = blob.Value[:]
statsBinlog = append(statsBinlog, &datapb.FieldBinlog{
FieldID: fieldID,
@ -1070,10 +1071,11 @@ func saveDeltaLog(collectionID UniqueID,
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID))
key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID)
key += "delta" // append suffix 'delta' to avoid conflicts against binlog
kvs[key] = blob.Value[:]
keyPath := path.Join(defaultLocalStorage, key)
kvs[keyPath] = blob.Value[:]
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: pkFieldID,
Binlogs: []*datapb.Binlog{{LogPath: key}},
Binlogs: []*datapb.Binlog{{LogPath: keyPath}},
})
log.Debug("[query node unittest] save delta log file to MinIO/S3")

View File

@ -68,8 +68,8 @@ func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string
if !exist {
return "", fmt.Errorf("local file cannot be found with filePath: %s", filePath)
}
absPath := path.Join(lcm.localPath, filePath)
return absPath, nil
return filePath, nil
}
func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) {
@ -80,14 +80,13 @@ func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (File
if !exist {
return nil, errors.New("local file cannot be found with filePath:" + filePath)
}
absPath := path.Join(lcm.localPath, filePath)
return os.Open(absPath)
return os.Open(filePath)
}
// Write writes the data to local storage.
func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, content []byte) error {
absPath := path.Join(lcm.localPath, filePath)
dir := path.Dir(absPath)
dir := path.Dir(filePath)
exist, err := lcm.Exist(ctx, dir)
if err != nil {
return err
@ -98,7 +97,7 @@ func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, conten
return err
}
}
return ioutil.WriteFile(absPath, content, os.ModePerm)
return ioutil.WriteFile(filePath, content, os.ModePerm)
}
// MultiWrite writes the data to local storage.
@ -118,8 +117,7 @@ func (lcm *LocalChunkManager) MultiWrite(ctx context.Context, contents map[strin
// Exist checks whether chunk is saved to local storage.
func (lcm *LocalChunkManager) Exist(ctx context.Context, filePath string) (bool, error) {
absPath := path.Join(lcm.localPath, filePath)
_, err := os.Stat(absPath)
_, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return false, nil
@ -138,8 +136,8 @@ func (lcm *LocalChunkManager) Read(ctx context.Context, filePath string) ([]byte
if !exist {
return nil, fmt.Errorf("file not exist: %s", filePath)
}
absPath := path.Join(lcm.localPath, filePath)
return ioutil.ReadFile(absPath)
return ioutil.ReadFile(filePath)
}
// MultiRead reads the local storage data if exists.
@ -163,11 +161,10 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string,
var filePaths []string
var modTimes []time.Time
if recursive {
absPrefix := path.Join(lcm.localPath, prefix)
dir := filepath.Dir(absPrefix)
dir := filepath.Dir(prefix)
err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
if strings.HasPrefix(filePath, absPrefix) && !f.IsDir() {
filePaths = append(filePaths, strings.TrimPrefix(filePath, lcm.localPath))
if strings.HasPrefix(filePath, prefix) && !f.IsDir() {
filePaths = append(filePaths, filePath)
}
return nil
})
@ -183,14 +180,12 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string,
}
return filePaths, modTimes, nil
}
absPrefix := path.Join(lcm.localPath, prefix+"*")
absPaths, err := filepath.Glob(absPrefix)
globPaths, err := filepath.Glob(prefix + "*")
if err != nil {
return nil, nil, err
}
for _, absPath := range absPaths {
filePaths = append(filePaths, strings.TrimPrefix(absPath, lcm.localPath))
}
filePaths = append(filePaths, globPaths...)
for _, filePath := range filePaths {
modTime, err2 := lcm.getModTime(filePath)
if err2 != nil {
@ -198,6 +193,7 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string,
}
modTimes = append(modTimes, modTime)
}
return filePaths, modTimes, nil
}
@ -215,8 +211,8 @@ func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off i
if off < 0 || length < 0 {
return nil, io.EOF
}
absPath := path.Join(lcm.localPath, filePath)
file, err := os.Open(path.Clean(absPath))
file, err := os.Open(path.Clean(filePath))
if err != nil {
return nil, err
}
@ -229,13 +225,11 @@ func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off i
}
func (lcm *LocalChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
absPath := path.Join(lcm.localPath, filePath)
return mmap.Open(path.Clean(absPath))
return mmap.Open(path.Clean(filePath))
}
func (lcm *LocalChunkManager) Size(ctx context.Context, filePath string) (int64, error) {
absPath := path.Join(lcm.localPath, filePath)
fi, err := os.Stat(absPath)
fi, err := os.Stat(filePath)
if err != nil {
return 0, err
}
@ -250,8 +244,7 @@ func (lcm *LocalChunkManager) Remove(ctx context.Context, filePath string) error
return err
}
if exist {
absPath := path.Join(lcm.localPath, filePath)
err := os.RemoveAll(absPath)
err := os.RemoveAll(filePath)
if err != nil {
return err
}
@ -274,16 +267,24 @@ func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []strin
}
func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
// If the prefix is empty string, the ListWithPrefix() will return all files under current process work folder,
// MultiRemove() will delete all these files. This is a danger behavior, empty prefix is not allowed.
if len(prefix) == 0 {
errMsg := "empty prefix is not allowed for ChunkManager remove operation"
log.Error(errMsg)
return errors.New(errMsg)
}
filePaths, _, err := lcm.ListWithPrefix(ctx, prefix, true)
if err != nil {
return err
}
return lcm.MultiRemove(ctx, filePaths)
}
func (lcm *LocalChunkManager) getModTime(filepath string) (time.Time, error) {
absPath := path.Join(lcm.localPath, filepath)
fi, err := os.Stat(absPath)
fi, err := os.Stat(filepath)
if err != nil {
log.Error("stat fileinfo error", zap.String("relative filepath", filepath))
return time.Time{}, err

View File

@ -18,8 +18,8 @@ package storage
import (
"context"
"fmt"
"path"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@ -28,6 +28,7 @@ import (
func TestLocalCM(t *testing.T) {
ctx := context.Background()
t.Run("test RootPath", func(t *testing.T) {
testCM := NewLocalChunkManager(RootPath(localPath))
assert.Equal(t, localPath, testCM.RootPath())
@ -37,7 +38,7 @@ func TestLocalCM(t *testing.T) {
testLoadRoot := "test_load"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testLoadRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
prepareTests := []struct {
key string
@ -51,7 +52,7 @@ func TestLocalCM(t *testing.T) {
}
for _, test := range prepareTests {
err := testCM.Write(ctx, path.Join(testLoadRoot, test.key), test.value)
err := testCM.Write(ctx, path.Join(localPath, testLoadRoot, test.key), test.value)
require.NoError(t, err)
}
@ -74,17 +75,17 @@ func TestLocalCM(t *testing.T) {
for _, test := range loadTests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey))
got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey))
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, got)
} else {
if test.loadKey == "/" {
got, err := testCM.Read(ctx, test.loadKey)
got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey))
assert.Error(t, err)
assert.Empty(t, got)
return
}
got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey))
got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey))
assert.Error(t, err)
assert.Empty(t, got)
}
@ -105,7 +106,7 @@ func TestLocalCM(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(testLoadRoot, test.prefix))
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(localPath, testLoadRoot, test.prefix))
assert.Nil(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
@ -127,7 +128,7 @@ func TestLocalCM(t *testing.T) {
for _, test := range multiLoadTests {
t.Run(test.description, func(t *testing.T) {
for i := range test.multiKeys {
test.multiKeys[i] = path.Join(testLoadRoot, test.multiKeys[i])
test.multiKeys[i] = path.Join(localPath, testLoadRoot, test.multiKeys[i])
}
if test.isvalid {
got, err := testCM.MultiRead(ctx, test.multiKeys)
@ -146,50 +147,53 @@ func TestLocalCM(t *testing.T) {
testMultiSaveRoot := "test_write"
testCM := NewLocalChunkManager(RootPath(localPath))
//defer testCM.RemoveWithPrefix(testMultiSaveRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
err := testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1"), []byte("111"))
key1 := path.Join(localPath, testMultiSaveRoot, "key_1")
err := testCM.Write(ctx, key1, []byte("111"))
assert.Nil(t, err)
err = testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_2"), []byte("222"))
key2 := path.Join(localPath, testMultiSaveRoot, "key_2")
err = testCM.Write(ctx, key2, []byte("222"))
assert.Nil(t, err)
val, err := testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_1"))
val, err := testCM.Read(ctx, key1)
assert.Nil(t, err)
assert.Equal(t, []byte("111"), val)
val, err = testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_2"))
val, err = testCM.Read(ctx, key2)
assert.Nil(t, err)
assert.Equal(t, []byte("222"), val)
err = testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1/key_1"), []byte("111"))
// localPath/testMultiSaveRoot/key_1 is a file already exist, use its path as directory is not allowed
key3 := path.Join(localPath, testMultiSaveRoot, "key_1/key_1")
err = testCM.Write(ctx, key3, []byte("111"))
assert.Error(t, err)
})
t.Run("test MultiSave", func(t *testing.T) {
testMultiSaveRoot := "test_multisave"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testMultiSaveRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
err := testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1"), []byte("111"))
err := testCM.Write(ctx, path.Join(localPath, testMultiSaveRoot, "key_1"), []byte("111"))
assert.Nil(t, err)
kvs := map[string][]byte{
path.Join(testMultiSaveRoot, "key_1"): []byte("123"),
path.Join(testMultiSaveRoot, "key_2"): []byte("456"),
path.Join(localPath, testMultiSaveRoot, "key_1"): []byte("123"),
path.Join(localPath, testMultiSaveRoot, "key_2"): []byte("456"),
}
err = testCM.MultiWrite(ctx, kvs)
assert.Nil(t, err)
val, err := testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_1"))
val, err := testCM.Read(ctx, path.Join(localPath, testMultiSaveRoot, "key_1"))
assert.Nil(t, err)
assert.Equal(t, []byte("123"), val)
kvs = map[string][]byte{
path.Join(testMultiSaveRoot, "key_1/key_1"): []byte("123"),
path.Join(testMultiSaveRoot, "key_2/key_2"): []byte("456"),
path.Join(localPath, testMultiSaveRoot, "key_1/key_1"): []byte("123"),
path.Join(localPath, testMultiSaveRoot, "key_2/key_2"): []byte("456"),
}
err = testCM.MultiWrite(ctx, kvs)
@ -200,7 +204,16 @@ func TestLocalCM(t *testing.T) {
testRemoveRoot := "test_remove"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testRemoveRoot)
// empty prefix is not allowed
err := testCM.RemoveWithPrefix(ctx, "")
assert.Error(t, err)
// prefix ".", nothing deleted
err = testCM.RemoveWithPrefix(ctx, ".")
assert.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
prepareTests := []struct {
k string
@ -217,7 +230,7 @@ func TestLocalCM(t *testing.T) {
}
for _, test := range prepareTests {
k := path.Join(testRemoveRoot, test.k)
k := path.Join(localPath, testRemoveRoot, test.k)
err := testCM.Write(ctx, k, test.v)
require.NoError(t, err)
}
@ -234,7 +247,7 @@ func TestLocalCM(t *testing.T) {
for _, test := range removeTests {
t.Run(test.description, func(t *testing.T) {
k := path.Join(testRemoveRoot, test.removeKey)
k := path.Join(localPath, testRemoveRoot, test.removeKey)
v, err := testCM.Read(ctx, k)
require.NoError(t, err)
require.Equal(t, test.valueBeforeRemove, v)
@ -249,9 +262,9 @@ func TestLocalCM(t *testing.T) {
}
multiRemoveTest := []string{
path.Join(testRemoveRoot, "mkey_1"),
path.Join(testRemoveRoot, "mkey_2"),
path.Join(testRemoveRoot, "mkey_3"),
path.Join(localPath, testRemoveRoot, "mkey_1"),
path.Join(localPath, testRemoveRoot, "mkey_2"),
path.Join(localPath, testRemoveRoot, "mkey_3"),
}
lv, err := testCM.MultiRead(ctx, multiRemoveTest)
@ -268,11 +281,11 @@ func TestLocalCM(t *testing.T) {
}
removeWithPrefixTest := []string{
path.Join(testRemoveRoot, "key_prefix_1"),
path.Join(testRemoveRoot, "key_prefix_2"),
path.Join(testRemoveRoot, "key_prefix_3"),
path.Join(localPath, testRemoveRoot, "key_prefix_1"),
path.Join(localPath, testRemoveRoot, "key_prefix_2"),
path.Join(localPath, testRemoveRoot, "key_prefix_3"),
}
removePrefix := path.Join(testRemoveRoot, "key_prefix")
removePrefix := path.Join(localPath, testRemoveRoot, "key_prefix")
lv, err = testCM.MultiRead(ctx, removeWithPrefixTest)
require.NoError(t, err)
@ -292,9 +305,9 @@ func TestLocalCM(t *testing.T) {
testLoadPartialRoot := "read_at"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testLoadPartialRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
key := path.Join(testLoadPartialRoot, "TestMinIOKV_LoadPartial_key")
key := path.Join(localPath, testLoadPartialRoot, "TestMinIOKV_LoadPartial_key")
value := []byte("TestMinIOKV_LoadPartial_value")
err := testCM.Write(ctx, key, value)
@ -337,9 +350,9 @@ func TestLocalCM(t *testing.T) {
testGetSizeRoot := "get_size"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testGetSizeRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetSize_key")
key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetSize_value")
err := testCM.Write(ctx, key, value)
@ -349,20 +362,44 @@ func TestLocalCM(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, size, int64(len(value)))
key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2")
key2 := path.Join(localPath, testGetSizeRoot, "TestMemoryKV_GetSize_key2")
size, err = testCM.Size(ctx, key2)
assert.Error(t, err)
assert.Equal(t, int64(0), size)
})
t.Run("test read", func(t *testing.T) {
testGetSizeRoot := "get_path"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetPath_key")
value := []byte("TestMinIOKV_GetPath_value")
reader, err := testCM.Reader(ctx, key)
assert.Nil(t, reader)
assert.Error(t, err)
_, err = testCM.getModTime(key)
assert.Error(t, err)
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
reader, err = testCM.Reader(ctx, key)
assert.NoError(t, err)
assert.NotNil(t, reader)
})
t.Run("test Path", func(t *testing.T) {
testGetSizeRoot := "get_path"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testGetSizeRoot)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetPath_key")
key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetPath_key")
value := []byte("TestMinIOKV_GetPath_value")
err := testCM.Write(ctx, key, value)
@ -370,9 +407,9 @@ func TestLocalCM(t *testing.T) {
p, err := testCM.Path(ctx, key)
assert.NoError(t, err)
assert.Equal(t, p, path.Join(localPath, key))
assert.Equal(t, p, key)
key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2")
key2 := path.Join(localPath, testGetSizeRoot, "TestMemoryKV_GetSize_key2")
p, err = testCM.Path(ctx, key2)
assert.Error(t, err)
@ -383,82 +420,187 @@ func TestLocalCM(t *testing.T) {
testPrefix := "prefix"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testPrefix)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
pathB := path.Join("a", "b")
key := path.Join(testPrefix, pathB)
// write 2 files:
// localPath/testPrefix/a/b
// localPath/testPrefix/a/c
value := []byte("a")
pathB := path.Join("a", "b")
key1 := path.Join(localPath, testPrefix, pathB)
err := testCM.Write(ctx, key, value)
err := testCM.Write(ctx, key1, value)
assert.NoError(t, err)
pathC := path.Join("a", "c")
key = path.Join(testPrefix, pathC)
err = testCM.Write(ctx, key, value)
key2 := path.Join(localPath, testPrefix, pathC)
err = testCM.Write(ctx, key2, value)
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
// recursive find localPath/testPrefix/a*
// return:
// localPath/testPrefix/a/b
// localPath/testPrefix/a/c
pathPrefix := path.Join(localPath, testPrefix, "a")
dirs, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(m))
assert.Contains(t, dirs, key1)
assert.Contains(t, dirs, key2)
testCM.RemoveWithPrefix(ctx, testPrefix)
r, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
// remove files of localPath/testPrefix
err = testCM.RemoveWithPrefix(ctx, path.Join(localPath, testPrefix))
assert.NoError(t, err)
assert.Equal(t, len(r), 0)
assert.Equal(t, len(m), 0)
// no file returned
dirs, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(m))
})
t.Run("test ListWithPrefix", func(t *testing.T) {
testPrefix := "prefix-ListWithPrefix"
testCM := NewLocalChunkManager(RootPath(localPath))
defer testCM.RemoveWithPrefix(ctx, testPrefix)
defer testCM.RemoveWithPrefix(ctx, testCM.RootPath())
key := path.Join(testPrefix, "abc", "def")
// write 4 files:
// localPath/testPrefix/abc/def
// localPath/testPrefix/abc/deg
// localPath/testPrefix/abd
// localPath/testPrefix/bcd
key1 := path.Join(localPath, testPrefix, "abc", "def")
value := []byte("a")
err := testCM.Write(ctx, key, value)
err := testCM.Write(ctx, key1, value)
assert.NoError(t, err)
key = path.Join(testPrefix, "abc", "deg")
err = testCM.Write(ctx, key, value)
key2 := path.Join(localPath, testPrefix, "abc", "deg")
err = testCM.Write(ctx, key2, value)
assert.NoError(t, err)
key = path.Join(testPrefix, "abd")
err = testCM.Write(ctx, key, value)
key3 := path.Join(localPath, testPrefix, "abd")
err = testCM.Write(ctx, key3, value)
assert.NoError(t, err)
key = path.Join(testPrefix, "bcd")
err = testCM.Write(ctx, key, value)
key4 := path.Join(localPath, testPrefix, "bcd")
err = testCM.Write(ctx, key4, value)
assert.NoError(t, err)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix+"/", false)
// non-recursive find localPath/testPrefix/*
// return:
// localPath/testPrefix/abc
// localPath/testPrefix/abd
// localPath/testPrefix/bcd
testPrefix1 := path.Join(localPath, testPrefix)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix1+"/", false)
assert.Nil(t, err)
fmt.Println(dirs)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
assert.Contains(t, dirs, filepath.Dir(key1))
assert.Contains(t, dirs, key3)
assert.Contains(t, dirs, key4)
testPrefix2 := path.Join(testPrefix, "a")
// recursive find localPath/testPrefix/*
// return:
// localPath/testPrefix/abc/def
// localPath/testPrefix/abc/deg
// localPath/testPrefix/abd
// localPath/testPrefix/bcd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1+"/", true)
assert.Nil(t, err)
assert.Equal(t, 4, len(dirs))
assert.Equal(t, 4, len(mods))
assert.Contains(t, dirs, key1)
assert.Contains(t, dirs, key2)
assert.Contains(t, dirs, key3)
assert.Contains(t, dirs, key4)
// non-recursive find localPath/testPrefix/a*
// return:
// localPath/testPrefix/abc
// localPath/testPrefix/abd
testPrefix2 := path.Join(localPath, testPrefix, "a")
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false)
assert.Nil(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
assert.Contains(t, dirs, filepath.Dir(key1))
assert.Contains(t, dirs, key3)
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false)
// recursive find localPath/testPrefix/a*
// return:
// localPath/testPrefix/abc/def
// localPath/testPrefix/abc/deg
// localPath/testPrefix/abd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true)
assert.Nil(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
assert.Contains(t, dirs, key1)
assert.Contains(t, dirs, key2)
assert.Contains(t, dirs, key3)
err = testCM.RemoveWithPrefix(ctx, testPrefix)
// remove files of localPath/testPrefix/a*, one file left
// localPath/testPrefix/bcd
err = testCM.RemoveWithPrefix(ctx, testPrefix2)
assert.NoError(t, err)
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix, false)
assert.NoError(t, err)
fmt.Println(dirs)
// dir still exist
// non-recursive find localPath/testPrefix
// return:
// localPath/testPrefix
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false)
assert.Nil(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
assert.Contains(t, dirs, filepath.Dir(key4))
// recursive find localPath/testPrefix
// return:
// localPath/testPrefix/bcd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
assert.Contains(t, dirs, key4)
// non-recursive find localPath/testPrefix/a*
// return:
// localPath/testPrefix/abc
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false)
assert.Nil(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
assert.Contains(t, dirs, filepath.Dir(key1))
// recursive find localPath/testPrefix/a*
// no file returned
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true)
assert.Nil(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(mods))
// remove the folder localPath/testPrefix
// the file localPath/testPrefix/bcd is removed, but the folder testPrefix still exist
err = testCM.RemoveWithPrefix(ctx, testPrefix1)
assert.NoError(t, err)
// recursive find localPath/testPrefix
// no file returned
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(mods))
// recursive find localPath/testPrefix
// return
// localPath/testPrefix
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
assert.Contains(t, dirs, filepath.Dir(key4))
})
}

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
"path"
"testing"
"github.com/stretchr/testify/assert"
@ -139,7 +140,7 @@ func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorCh
}
var Params paramtable.BaseTable
var localPath = "/tmp/milvus/test_data/"
var localPath = "/tmp/milvus_test/chunkmanager/"
func TestMain(m *testing.M) {
Params.Init()
@ -180,7 +181,7 @@ func TestVectorChunkManager_GetPath(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1"
key := path.Join(localPath, "1")
err = vcm.Write(ctx, key, []byte{1})
assert.Nil(t, err)
pathGet, err := vcm.Path(ctx, key)
@ -209,7 +210,7 @@ func TestVectorChunkManager_GetSize(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1"
key := path.Join(localPath, "1")
err = vcm.Write(ctx, key, []byte{1})
assert.Nil(t, err)
sizeGet, err := vcm.Size(ctx, key)
@ -239,25 +240,26 @@ func TestVectorChunkManager_Write(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1"
err = vcm.Write(ctx, key, []byte{1})
key1 := path.Join(localPath, "key_1")
key2 := path.Join(localPath, "key_2")
err = vcm.Write(ctx, key1, []byte{1})
assert.Nil(t, err)
exist, err := vcm.Exist(ctx, key)
exist, err := vcm.Exist(ctx, key1)
assert.True(t, exist)
assert.NoError(t, err)
contents := map[string][]byte{
"key_1": {111},
"key_2": {222},
key1: {111},
key2: {222},
}
err = vcm.MultiWrite(ctx, contents)
assert.NoError(t, err)
exist, err = vcm.Exist(ctx, "key_1")
exist, err = vcm.Exist(ctx, key1)
assert.True(t, exist)
assert.NoError(t, err)
exist, err = vcm.Exist(ctx, "key_2")
exist, err = vcm.Exist(ctx, key2)
assert.True(t, exist)
assert.NoError(t, err)
@ -278,31 +280,32 @@ func TestVectorChunkManager_Remove(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, vcm)
key := "1"
err = vcm.cacheStorage.Write(ctx, key, []byte{1})
key1 := path.Join(localPath, "key_1")
key2 := path.Join(localPath, "key_2")
err = vcm.cacheStorage.Write(ctx, key1, []byte{1})
assert.Nil(t, err)
err = vcm.Remove(ctx, key)
err = vcm.Remove(ctx, key1)
assert.Nil(t, err)
exist, err := vcm.Exist(ctx, key)
exist, err := vcm.Exist(ctx, key1)
assert.False(t, exist)
assert.NoError(t, err)
contents := map[string][]byte{
"key_1": {111},
"key_2": {222},
key1: {111},
key2: {222},
}
err = vcm.cacheStorage.MultiWrite(ctx, contents)
assert.NoError(t, err)
err = vcm.MultiRemove(ctx, []string{"key_1", "key_2"})
err = vcm.MultiRemove(ctx, []string{key1, key2})
assert.NoError(t, err)
exist, err = vcm.Exist(ctx, "key_1")
exist, err = vcm.Exist(ctx, key1)
assert.False(t, exist)
assert.NoError(t, err)
exist, err = vcm.Exist(ctx, "key_2")
exist, err = vcm.Exist(ctx, key2)
assert.False(t, exist)
assert.NoError(t, err)

View File

@ -24,6 +24,7 @@ import (
"errors"
"math"
"os"
"path"
"strconv"
"testing"
"time"
@ -249,7 +250,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
filePath := TempFilesPath + "rows_1.json"
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
rowCounter := &rowCounterTest{}
assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter)
@ -313,70 +314,70 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string {
ctx := context.Background()
files := make([]string, 0)
filePath := "FieldBool.npy"
filePath := path.Join(cm.RootPath(), "FieldBool.npy")
content, err := CreateNumpyData([]bool{true, false, true, true, true})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldInt8.npy"
filePath = path.Join(cm.RootPath(), "FieldInt8.npy")
content, err = CreateNumpyData([]int8{10, 11, 12, 13, 14})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldInt16.npy"
filePath = path.Join(cm.RootPath(), "FieldInt16.npy")
content, err = CreateNumpyData([]int16{100, 101, 102, 103, 104})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldInt32.npy"
filePath = path.Join(cm.RootPath(), "FieldInt32.npy")
content, err = CreateNumpyData([]int32{1000, 1001, 1002, 1003, 1004})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldInt64.npy"
filePath = path.Join(cm.RootPath(), "FieldInt64.npy")
content, err = CreateNumpyData([]int64{10000, 10001, 10002, 10003, 10004})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldFloat.npy"
filePath = path.Join(cm.RootPath(), "FieldFloat.npy")
content, err = CreateNumpyData([]float32{3.14, 3.15, 3.16, 3.17, 3.18})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldDouble.npy"
filePath = path.Join(cm.RootPath(), "FieldDouble.npy")
content, err = CreateNumpyData([]float64{5.1, 5.2, 5.3, 5.4, 5.5})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldString.npy"
filePath = path.Join(cm.RootPath(), "FieldString.npy")
content, err = CreateNumpyData([]string{"a", "bb", "ccc", "dd", "e"})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldBinaryVector.npy"
filePath = path.Join(cm.RootPath(), "FieldBinaryVector.npy")
content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = "FieldFloatVector.npy"
filePath = path.Join(cm.RootPath(), "FieldFloatVector.npy")
content, err = CreateNumpyData([][4]float32{{1, 2, 3, 4}, {3, 4, 5, 6}, {5, 6, 7, 8}, {7, 8, 9, 10}, {9, 10, 11, 12}})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
@ -397,7 +398,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
idAllocator := newIDAllocator(ctx, t, nil)
@ -430,7 +431,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// row count of fields not equal
filePath := "FieldInt8.npy"
filePath := path.Join(cm.RootPath(), "FieldInt8.npy")
content, err := CreateNumpyData([]int8{10})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
@ -493,7 +494,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
idAllocator := newIDAllocator(ctx, t, nil)
@ -532,7 +533,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
tr.Record("generate " + strconv.Itoa(rowCount) + " rows")
// generate a json file
filePath := "row_perf.json"
filePath := path.Join(cm.RootPath(), "row_perf.json")
func() {
var b bytes.Buffer
bw := bufio.NewWriter(&b)
@ -545,7 +546,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
err = cm.Write(ctx, filePath, b.Bytes())
assert.NoError(t, err)
}()
tr.Record("generate large json file " + filePath)
tr.Record("generate large json file: " + filePath)
rowCounter := &rowCounterTest{}
assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter)
@ -787,10 +788,10 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
]
}`)
filePath := "rows_1.json"
filePath := path.Join(cm.RootPath(), "rows_1.json")
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
rowCounter := &rowCounterTest{}
assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter)
@ -813,9 +814,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
files := []string{filePath}
wrapper.reportImportAttempts = 2
wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
return errors.New("mock error")
@ -837,7 +836,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
defer cm.RemoveWithPrefix(ctx, "")
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
idAllocator := newIDAllocator(ctx, t, nil)