diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index 31ff0b5bf1..7d8f1bc11e 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -40,7 +40,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { defer cancel() alloc := NewAllocatorFactory() cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) b := &binlogIO{cm, alloc} t.Run("Test upload", func(t *testing.T) { @@ -245,7 +245,7 @@ func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - k := path.Join("test_prepare_blob", key) + k := path.Join(cm.RootPath(), "test_prepare_blob", key) blob := []byte{1, 2, 3, 255, 188} err := cm.Write(ctx, k, blob[:]) @@ -260,7 +260,7 @@ func TestBinlogIOInnerMethods(t *testing.T) { defer cancel() alloc := NewAllocatorFactory() cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) b := &binlogIO{ cm, alloc, diff --git a/internal/datanode/channel_meta_test.go b/internal/datanode/channel_meta_test.go index 45b19c2f10..1bcde9d64f 100644 --- a/internal/datanode/channel_meta_test.go +++ b/internal/datanode/channel_meta_test.go @@ -42,7 +42,7 @@ var channelMetaNodeTestDir = "/tmp/milvus_test/channel_meta" func TestNewChannel(t *testing.T) { rc := &RootCoordFactory{} cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) - defer cm.RemoveWithPrefix(context.Background(), "") + defer cm.RemoveWithPrefix(context.Background(), cm.RootPath()) channel := newChannel("channel", 0, nil, rc, cm) assert.NotNil(t, channel) } @@ -114,7 +114,7 @@ func TestChannelMeta_InnerFunction(t *testing.T) { cm = storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) channel = newChannel("insert-01", collID, nil, rc, cm) ) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) require.False(t, channel.hasSegment(0, true)) require.False(t, channel.hasSegment(0, false)) @@ -214,7 +214,7 @@ func TestChannelMeta_segmentFlushed(t *testing.T) { } collID := UniqueID(1) cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) t.Run("Test coll mot match", func(t *testing.T) { channel := newChannel("channel", collID, nil, rc, cm) @@ -281,7 +281,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) { pkType: schemapb.DataType_Int64, } cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) t.Run("Test addFlushedSegmentWithPKs", func(t *testing.T) { tests := []struct { @@ -705,7 +705,7 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) { endPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(200)} cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) channel := newChannel("chanName", collID, nil, rc, cm) channel.chunkManager = &mockDataCM{} @@ -764,7 +764,7 @@ func TestChannelMeta_ChannelCP(t *testing.T) { collID := UniqueID(1) cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir)) defer func() { - err := cm.RemoveWithPrefix(ctx, "") + err := cm.RemoveWithPrefix(ctx, cm.RootPath()) assert.NoError(t, err) }() @@ -878,7 +878,7 @@ func (s *ChannelMetaSuite) SetupSuite() { } func (s *ChannelMetaSuite) TearDownSuite() { - s.cm.RemoveWithPrefix(context.Background(), "") + s.cm.RemoveWithPrefix(context.Background(), s.cm.RootPath()) } func (s *ChannelMetaSuite) SetupTest() { diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 71ed408125..69cb8cdb16 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -45,7 +45,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) t.Run("Test getSegmentMeta", func(t *testing.T) { rc := &RootCoordFactory{ pkType: schemapb.DataType_Int64, @@ -554,7 +554,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) notEmptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{{ SegmentID: 100, FieldBinlogs: nil, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 811472a65b..f1371b54ee 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -22,6 +22,7 @@ import ( "math" "math/rand" "os" + "path/filepath" "strconv" "strings" "sync" @@ -93,7 +94,7 @@ func TestDataNode(t *testing.T) { assert.Nil(t, err) defer node.Stop() - node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus")) + node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) paramtable.SetNodeID(1) t.Run("Test WatchDmChannels ", func(t *testing.T) { emptyNode := &DataNode{} @@ -471,7 +472,7 @@ func TestDataNode(t *testing.T) { _, ok = node.flowgraphManager.getFlowgraphService(chName2) assert.True(t, ok) - filePath := "import/rows_1.json" + filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") err = node.chunkManager.Write(ctx, filePath, content) assert.NoError(t, err) req := &datapb.ImportTaskRequest{ @@ -547,7 +548,7 @@ func TestDataNode(t *testing.T) { ] }`) - filePath := "import/rows_1.json" + filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") err = node.chunkManager.Write(ctx, filePath, content) assert.NoError(t, err) req := &datapb.ImportTaskRequest{ @@ -581,7 +582,7 @@ func TestDataNode(t *testing.T) { ] }`) - filePath := "import/rows_1.json" + filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") err = node.chunkManager.Write(ctx, filePath, content) assert.NoError(t, err) req := &datapb.ImportTaskRequest{ @@ -757,7 +758,7 @@ func TestDataNode_AddSegment(t *testing.T) { assert.Nil(t, err) defer node.Stop() - node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus")) + node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) paramtable.SetNodeID(1) t.Run("test AddSegment", func(t *testing.T) { diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index fa5019ffcd..7507bdaee0 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -145,7 +145,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { "add un-flushed and flushed segments"}, } cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) for _, test := range tests { te.Run(test.description, func(t *testing.T) { @@ -211,7 +211,7 @@ func TestDataSyncService_Start(t *testing.T) { flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) channel := newChannel(insertChannelName, collectionID, collMeta.GetSchema(), mockRootCoord, cm) allocFactory := NewAllocatorFactory(1) @@ -414,7 +414,7 @@ func TestClearGlobalFlushingCache(t *testing.T) { defer cancel() dataCoord := &DataCoordFactory{} cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) channel := newChannel("channel", 1, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm) var err error diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index e7cf7c465e..effe2b4763 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -178,7 +178,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { tss = []uint64{1, 1, 1, 1, 1} ) cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) t.Run("Test get segment by varChar primary keys", func(te *testing.T) { channel := genMockChannel(segIDs, varCharPks, chanName) @@ -466,7 +466,7 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) @@ -510,7 +510,7 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 242da89055..2535f115d5 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -64,7 +64,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-create" testPath := "/test/datanode/root/meta" @@ -159,7 +159,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate" cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) @@ -336,7 +336,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { wg := sync.WaitGroup{} cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) { fpMut.Lock() flushPacks = append(flushPacks, pack) @@ -574,7 +574,7 @@ func TestRollBF(t *testing.T) { wg := sync.WaitGroup{} cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) { fpMut.Lock() flushPacks = append(flushPacks, pack) @@ -702,7 +702,7 @@ func (s *InsertBufferNodeSuit) SetupSuite() { } func (s *InsertBufferNodeSuit) TearDownSuite() { - s.cm.RemoveWithPrefix(context.Background(), "") + s.cm.RemoveWithPrefix(context.Background(), s.cm.RootPath()) Params.DataNodeCfg.FlushInsertBufferSize = s.originalConfig } @@ -917,7 +917,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { } cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) for _, test := range tests { collMeta := Factory.GetCollectionMeta(test.collID, "collection", test.pkType) rcf := &RootCoordFactory{ @@ -975,7 +975,7 @@ func TestInsertBufferNode_updateSegmentStates(te *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) invalideTests := []struct { channelCollID UniqueID diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 9854c05259..714ab87443 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -150,7 +150,7 @@ func TestRendezvousFlushManager(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) size := 1000 var counter atomic.Int64 @@ -189,7 +189,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) size := 1000 var counter atomic.Int64 @@ -288,7 +288,11 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { } func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + channel := newTestChannel() channel.collSchema = &schemapb.CollectionSchema{} fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) { @@ -315,7 +319,10 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) { } func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) size := 1000 var counter atomic.Int64 @@ -384,8 +391,11 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { } func TestRendezvousFlushManager_dropMode(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() t.Run("test drop mode", func(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) var mut sync.Mutex var result []*segmentFlushPack @@ -438,6 +448,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { }) t.Run("test drop mode with injection", func(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) var mut sync.Mutex var result []*segmentFlushPack @@ -496,7 +507,10 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { } func TestRendezvousFlushManager_close(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) size := 1000 var counter atomic.Int64 @@ -536,7 +550,10 @@ func TestFlushNotifyFunc(t *testing.T) { rcf := &RootCoordFactory{ pkType: schemapb.DataType_Int64, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) channel := newChannel("channel", 1, nil, rcf, cm) @@ -606,7 +623,11 @@ func TestDropVirtualChannelFunc(t *testing.T) { } vchanName := "vchan_01" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + channel := newChannel(vchanName, 1, nil, rcf, cm) dataCoord := &DataCoordFactory{} diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 6b81c40e0a..b45bb9cc6a 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -24,6 +24,7 @@ import ( "math" "math/rand" "path" + "path/filepath" "runtime" "strconv" @@ -414,9 +415,9 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy indexPaths := make([]string, 0) for _, index := range serializedIndexBlobs { - p := strconv.Itoa(int(segmentID)) + "/" + index.Key - indexPaths = append(indexPaths, p) - err := cm.Write(context.Background(), p, index.Value) + indexPath := filepath.Join(defaultLocalStorage, strconv.Itoa(int(segmentID)), index.Key) + indexPaths = append(indexPaths, indexPath) + err := cm.Write(context.Background(), indexPath, index.Value) if err != nil { return nil, err } @@ -552,7 +553,7 @@ func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { } func genLocalChunkManager() (storage.ChunkManager, error) { - p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data") + p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data") lcm := storage.NewLocalChunkManager(storage.RootPath(p)) return lcm, nil } @@ -569,7 +570,7 @@ func genRemoteChunkManager(ctx context.Context) (storage.ChunkManager, error) { } func genVectorChunkManager(ctx context.Context, col *Collection) (*storage.VectorChunkManager, error) { - p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data") + p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data") lcm := storage.NewLocalChunkManager(storage.RootPath(p)) rcm, err := storage.NewMinioChunkManager( @@ -1000,7 +1001,7 @@ func saveBinLog(ctx context.Context, } k := JoinIDPath(collectionID, partitionID, segmentID, fieldID) - key := path.Join("insert-log", k) + key := path.Join(defaultLocalStorage, "insert-log", k) kvs[key] = blob.Value[:] fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{ FieldID: fieldID, @@ -1019,7 +1020,7 @@ func saveBinLog(ctx context.Context, } k := JoinIDPath(collectionID, partitionID, segmentID, fieldID) - key := path.Join("delta-log", k) + key := path.Join(defaultLocalStorage, "delta-log", k) kvs[key] = blob.Value[:] statsBinlog = append(statsBinlog, &datapb.FieldBinlog{ FieldID: fieldID, @@ -1070,10 +1071,11 @@ func saveDeltaLog(collectionID UniqueID, log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID)) key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID) key += "delta" // append suffix 'delta' to avoid conflicts against binlog - kvs[key] = blob.Value[:] + keyPath := path.Join(defaultLocalStorage, key) + kvs[keyPath] = blob.Value[:] fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{ FieldID: pkFieldID, - Binlogs: []*datapb.Binlog{{LogPath: key}}, + Binlogs: []*datapb.Binlog{{LogPath: keyPath}}, }) log.Debug("[query node unittest] save delta log file to MinIO/S3") diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index 78a488d59d..4b0d4c66e4 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -68,8 +68,8 @@ func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string if !exist { return "", fmt.Errorf("local file cannot be found with filePath: %s", filePath) } - absPath := path.Join(lcm.localPath, filePath) - return absPath, nil + + return filePath, nil } func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) { @@ -80,14 +80,13 @@ func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (File if !exist { return nil, errors.New("local file cannot be found with filePath:" + filePath) } - absPath := path.Join(lcm.localPath, filePath) - return os.Open(absPath) + + return os.Open(filePath) } // Write writes the data to local storage. func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, content []byte) error { - absPath := path.Join(lcm.localPath, filePath) - dir := path.Dir(absPath) + dir := path.Dir(filePath) exist, err := lcm.Exist(ctx, dir) if err != nil { return err @@ -98,7 +97,7 @@ func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, conten return err } } - return ioutil.WriteFile(absPath, content, os.ModePerm) + return ioutil.WriteFile(filePath, content, os.ModePerm) } // MultiWrite writes the data to local storage. @@ -118,8 +117,7 @@ func (lcm *LocalChunkManager) MultiWrite(ctx context.Context, contents map[strin // Exist checks whether chunk is saved to local storage. func (lcm *LocalChunkManager) Exist(ctx context.Context, filePath string) (bool, error) { - absPath := path.Join(lcm.localPath, filePath) - _, err := os.Stat(absPath) + _, err := os.Stat(filePath) if err != nil { if os.IsNotExist(err) { return false, nil @@ -138,8 +136,8 @@ func (lcm *LocalChunkManager) Read(ctx context.Context, filePath string) ([]byte if !exist { return nil, fmt.Errorf("file not exist: %s", filePath) } - absPath := path.Join(lcm.localPath, filePath) - return ioutil.ReadFile(absPath) + + return ioutil.ReadFile(filePath) } // MultiRead reads the local storage data if exists. @@ -163,11 +161,10 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string, var filePaths []string var modTimes []time.Time if recursive { - absPrefix := path.Join(lcm.localPath, prefix) - dir := filepath.Dir(absPrefix) + dir := filepath.Dir(prefix) err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error { - if strings.HasPrefix(filePath, absPrefix) && !f.IsDir() { - filePaths = append(filePaths, strings.TrimPrefix(filePath, lcm.localPath)) + if strings.HasPrefix(filePath, prefix) && !f.IsDir() { + filePaths = append(filePaths, filePath) } return nil }) @@ -183,14 +180,12 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string, } return filePaths, modTimes, nil } - absPrefix := path.Join(lcm.localPath, prefix+"*") - absPaths, err := filepath.Glob(absPrefix) + + globPaths, err := filepath.Glob(prefix + "*") if err != nil { return nil, nil, err } - for _, absPath := range absPaths { - filePaths = append(filePaths, strings.TrimPrefix(absPath, lcm.localPath)) - } + filePaths = append(filePaths, globPaths...) for _, filePath := range filePaths { modTime, err2 := lcm.getModTime(filePath) if err2 != nil { @@ -198,6 +193,7 @@ func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string, } modTimes = append(modTimes, modTime) } + return filePaths, modTimes, nil } @@ -215,8 +211,8 @@ func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off i if off < 0 || length < 0 { return nil, io.EOF } - absPath := path.Join(lcm.localPath, filePath) - file, err := os.Open(path.Clean(absPath)) + + file, err := os.Open(path.Clean(filePath)) if err != nil { return nil, err } @@ -229,13 +225,11 @@ func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off i } func (lcm *LocalChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) { - absPath := path.Join(lcm.localPath, filePath) - return mmap.Open(path.Clean(absPath)) + return mmap.Open(path.Clean(filePath)) } func (lcm *LocalChunkManager) Size(ctx context.Context, filePath string) (int64, error) { - absPath := path.Join(lcm.localPath, filePath) - fi, err := os.Stat(absPath) + fi, err := os.Stat(filePath) if err != nil { return 0, err } @@ -250,8 +244,7 @@ func (lcm *LocalChunkManager) Remove(ctx context.Context, filePath string) error return err } if exist { - absPath := path.Join(lcm.localPath, filePath) - err := os.RemoveAll(absPath) + err := os.RemoveAll(filePath) if err != nil { return err } @@ -274,16 +267,24 @@ func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []strin } func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { + // If the prefix is empty string, the ListWithPrefix() will return all files under current process work folder, + // MultiRemove() will delete all these files. This is a danger behavior, empty prefix is not allowed. + if len(prefix) == 0 { + errMsg := "empty prefix is not allowed for ChunkManager remove operation" + log.Error(errMsg) + return errors.New(errMsg) + } + filePaths, _, err := lcm.ListWithPrefix(ctx, prefix, true) if err != nil { return err } + return lcm.MultiRemove(ctx, filePaths) } func (lcm *LocalChunkManager) getModTime(filepath string) (time.Time, error) { - absPath := path.Join(lcm.localPath, filepath) - fi, err := os.Stat(absPath) + fi, err := os.Stat(filepath) if err != nil { log.Error("stat fileinfo error", zap.String("relative filepath", filepath)) return time.Time{}, err diff --git a/internal/storage/local_chunk_manager_test.go b/internal/storage/local_chunk_manager_test.go index 98062e7428..59c80defee 100644 --- a/internal/storage/local_chunk_manager_test.go +++ b/internal/storage/local_chunk_manager_test.go @@ -18,8 +18,8 @@ package storage import ( "context" - "fmt" "path" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -28,6 +28,7 @@ import ( func TestLocalCM(t *testing.T) { ctx := context.Background() + t.Run("test RootPath", func(t *testing.T) { testCM := NewLocalChunkManager(RootPath(localPath)) assert.Equal(t, localPath, testCM.RootPath()) @@ -37,7 +38,7 @@ func TestLocalCM(t *testing.T) { testLoadRoot := "test_load" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testLoadRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) prepareTests := []struct { key string @@ -51,7 +52,7 @@ func TestLocalCM(t *testing.T) { } for _, test := range prepareTests { - err := testCM.Write(ctx, path.Join(testLoadRoot, test.key), test.value) + err := testCM.Write(ctx, path.Join(localPath, testLoadRoot, test.key), test.value) require.NoError(t, err) } @@ -74,17 +75,17 @@ func TestLocalCM(t *testing.T) { for _, test := range loadTests { t.Run(test.description, func(t *testing.T) { if test.isvalid { - got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey)) + got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey)) assert.NoError(t, err) assert.Equal(t, test.expectedValue, got) } else { if test.loadKey == "/" { - got, err := testCM.Read(ctx, test.loadKey) + got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey)) assert.Error(t, err) assert.Empty(t, got) return } - got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey)) + got, err := testCM.Read(ctx, path.Join(localPath, testLoadRoot, test.loadKey)) assert.Error(t, err) assert.Empty(t, got) } @@ -105,7 +106,7 @@ func TestLocalCM(t *testing.T) { for _, test := range loadWithPrefixTests { t.Run(test.description, func(t *testing.T) { - gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(testLoadRoot, test.prefix)) + gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(localPath, testLoadRoot, test.prefix)) assert.Nil(t, err) assert.Equal(t, len(test.expectedValue), len(gotk)) assert.Equal(t, len(test.expectedValue), len(gotv)) @@ -127,7 +128,7 @@ func TestLocalCM(t *testing.T) { for _, test := range multiLoadTests { t.Run(test.description, func(t *testing.T) { for i := range test.multiKeys { - test.multiKeys[i] = path.Join(testLoadRoot, test.multiKeys[i]) + test.multiKeys[i] = path.Join(localPath, testLoadRoot, test.multiKeys[i]) } if test.isvalid { got, err := testCM.MultiRead(ctx, test.multiKeys) @@ -146,50 +147,53 @@ func TestLocalCM(t *testing.T) { testMultiSaveRoot := "test_write" testCM := NewLocalChunkManager(RootPath(localPath)) - //defer testCM.RemoveWithPrefix(testMultiSaveRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - err := testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1"), []byte("111")) + key1 := path.Join(localPath, testMultiSaveRoot, "key_1") + err := testCM.Write(ctx, key1, []byte("111")) assert.Nil(t, err) - err = testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_2"), []byte("222")) + key2 := path.Join(localPath, testMultiSaveRoot, "key_2") + err = testCM.Write(ctx, key2, []byte("222")) assert.Nil(t, err) - val, err := testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_1")) + val, err := testCM.Read(ctx, key1) assert.Nil(t, err) assert.Equal(t, []byte("111"), val) - val, err = testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_2")) + val, err = testCM.Read(ctx, key2) assert.Nil(t, err) assert.Equal(t, []byte("222"), val) - err = testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1/key_1"), []byte("111")) + // localPath/testMultiSaveRoot/key_1 is a file already exist, use its path as directory is not allowed + key3 := path.Join(localPath, testMultiSaveRoot, "key_1/key_1") + err = testCM.Write(ctx, key3, []byte("111")) assert.Error(t, err) - }) t.Run("test MultiSave", func(t *testing.T) { testMultiSaveRoot := "test_multisave" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testMultiSaveRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - err := testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1"), []byte("111")) + err := testCM.Write(ctx, path.Join(localPath, testMultiSaveRoot, "key_1"), []byte("111")) assert.Nil(t, err) kvs := map[string][]byte{ - path.Join(testMultiSaveRoot, "key_1"): []byte("123"), - path.Join(testMultiSaveRoot, "key_2"): []byte("456"), + path.Join(localPath, testMultiSaveRoot, "key_1"): []byte("123"), + path.Join(localPath, testMultiSaveRoot, "key_2"): []byte("456"), } err = testCM.MultiWrite(ctx, kvs) assert.Nil(t, err) - val, err := testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_1")) + val, err := testCM.Read(ctx, path.Join(localPath, testMultiSaveRoot, "key_1")) assert.Nil(t, err) assert.Equal(t, []byte("123"), val) kvs = map[string][]byte{ - path.Join(testMultiSaveRoot, "key_1/key_1"): []byte("123"), - path.Join(testMultiSaveRoot, "key_2/key_2"): []byte("456"), + path.Join(localPath, testMultiSaveRoot, "key_1/key_1"): []byte("123"), + path.Join(localPath, testMultiSaveRoot, "key_2/key_2"): []byte("456"), } err = testCM.MultiWrite(ctx, kvs) @@ -200,7 +204,16 @@ func TestLocalCM(t *testing.T) { testRemoveRoot := "test_remove" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testRemoveRoot) + + // empty prefix is not allowed + err := testCM.RemoveWithPrefix(ctx, "") + assert.Error(t, err) + + // prefix ".", nothing deleted + err = testCM.RemoveWithPrefix(ctx, ".") + assert.NoError(t, err) + + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) prepareTests := []struct { k string @@ -217,7 +230,7 @@ func TestLocalCM(t *testing.T) { } for _, test := range prepareTests { - k := path.Join(testRemoveRoot, test.k) + k := path.Join(localPath, testRemoveRoot, test.k) err := testCM.Write(ctx, k, test.v) require.NoError(t, err) } @@ -234,7 +247,7 @@ func TestLocalCM(t *testing.T) { for _, test := range removeTests { t.Run(test.description, func(t *testing.T) { - k := path.Join(testRemoveRoot, test.removeKey) + k := path.Join(localPath, testRemoveRoot, test.removeKey) v, err := testCM.Read(ctx, k) require.NoError(t, err) require.Equal(t, test.valueBeforeRemove, v) @@ -249,9 +262,9 @@ func TestLocalCM(t *testing.T) { } multiRemoveTest := []string{ - path.Join(testRemoveRoot, "mkey_1"), - path.Join(testRemoveRoot, "mkey_2"), - path.Join(testRemoveRoot, "mkey_3"), + path.Join(localPath, testRemoveRoot, "mkey_1"), + path.Join(localPath, testRemoveRoot, "mkey_2"), + path.Join(localPath, testRemoveRoot, "mkey_3"), } lv, err := testCM.MultiRead(ctx, multiRemoveTest) @@ -268,11 +281,11 @@ func TestLocalCM(t *testing.T) { } removeWithPrefixTest := []string{ - path.Join(testRemoveRoot, "key_prefix_1"), - path.Join(testRemoveRoot, "key_prefix_2"), - path.Join(testRemoveRoot, "key_prefix_3"), + path.Join(localPath, testRemoveRoot, "key_prefix_1"), + path.Join(localPath, testRemoveRoot, "key_prefix_2"), + path.Join(localPath, testRemoveRoot, "key_prefix_3"), } - removePrefix := path.Join(testRemoveRoot, "key_prefix") + removePrefix := path.Join(localPath, testRemoveRoot, "key_prefix") lv, err = testCM.MultiRead(ctx, removeWithPrefixTest) require.NoError(t, err) @@ -292,9 +305,9 @@ func TestLocalCM(t *testing.T) { testLoadPartialRoot := "read_at" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testLoadPartialRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - key := path.Join(testLoadPartialRoot, "TestMinIOKV_LoadPartial_key") + key := path.Join(localPath, testLoadPartialRoot, "TestMinIOKV_LoadPartial_key") value := []byte("TestMinIOKV_LoadPartial_value") err := testCM.Write(ctx, key, value) @@ -337,9 +350,9 @@ func TestLocalCM(t *testing.T) { testGetSizeRoot := "get_size" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testGetSizeRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - key := path.Join(testGetSizeRoot, "TestMinIOKV_GetSize_key") + key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetSize_key") value := []byte("TestMinIOKV_GetSize_value") err := testCM.Write(ctx, key, value) @@ -349,20 +362,44 @@ func TestLocalCM(t *testing.T) { assert.NoError(t, err) assert.Equal(t, size, int64(len(value))) - key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2") + key2 := path.Join(localPath, testGetSizeRoot, "TestMemoryKV_GetSize_key2") size, err = testCM.Size(ctx, key2) assert.Error(t, err) assert.Equal(t, int64(0), size) }) + t.Run("test read", func(t *testing.T) { + testGetSizeRoot := "get_path" + + testCM := NewLocalChunkManager(RootPath(localPath)) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) + + key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetPath_key") + value := []byte("TestMinIOKV_GetPath_value") + + reader, err := testCM.Reader(ctx, key) + assert.Nil(t, reader) + assert.Error(t, err) + + _, err = testCM.getModTime(key) + assert.Error(t, err) + + err = testCM.Write(ctx, key, value) + assert.NoError(t, err) + + reader, err = testCM.Reader(ctx, key) + assert.NoError(t, err) + assert.NotNil(t, reader) + }) + t.Run("test Path", func(t *testing.T) { testGetSizeRoot := "get_path" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testGetSizeRoot) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - key := path.Join(testGetSizeRoot, "TestMinIOKV_GetPath_key") + key := path.Join(localPath, testGetSizeRoot, "TestMinIOKV_GetPath_key") value := []byte("TestMinIOKV_GetPath_value") err := testCM.Write(ctx, key, value) @@ -370,9 +407,9 @@ func TestLocalCM(t *testing.T) { p, err := testCM.Path(ctx, key) assert.NoError(t, err) - assert.Equal(t, p, path.Join(localPath, key)) + assert.Equal(t, p, key) - key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2") + key2 := path.Join(localPath, testGetSizeRoot, "TestMemoryKV_GetSize_key2") p, err = testCM.Path(ctx, key2) assert.Error(t, err) @@ -383,82 +420,187 @@ func TestLocalCM(t *testing.T) { testPrefix := "prefix" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testPrefix) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - pathB := path.Join("a", "b") - - key := path.Join(testPrefix, pathB) + // write 2 files: + // localPath/testPrefix/a/b + // localPath/testPrefix/a/c value := []byte("a") + pathB := path.Join("a", "b") + key1 := path.Join(localPath, testPrefix, pathB) - err := testCM.Write(ctx, key, value) + err := testCM.Write(ctx, key1, value) assert.NoError(t, err) pathC := path.Join("a", "c") - key = path.Join(testPrefix, pathC) - err = testCM.Write(ctx, key, value) + key2 := path.Join(localPath, testPrefix, pathC) + + err = testCM.Write(ctx, key2, value) assert.NoError(t, err) - pathPrefix := path.Join(testPrefix, "a") - r, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true) + // recursive find localPath/testPrefix/a* + // return: + // localPath/testPrefix/a/b + // localPath/testPrefix/a/c + pathPrefix := path.Join(localPath, testPrefix, "a") + dirs, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true) assert.NoError(t, err) - assert.Equal(t, len(r), 2) - assert.Equal(t, len(m), 2) + assert.Equal(t, 2, len(dirs)) + assert.Equal(t, 2, len(m)) + assert.Contains(t, dirs, key1) + assert.Contains(t, dirs, key2) - testCM.RemoveWithPrefix(ctx, testPrefix) - r, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true) + // remove files of localPath/testPrefix + err = testCM.RemoveWithPrefix(ctx, path.Join(localPath, testPrefix)) assert.NoError(t, err) - assert.Equal(t, len(r), 0) - assert.Equal(t, len(m), 0) + + // no file returned + dirs, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true) + assert.NoError(t, err) + assert.Equal(t, 0, len(dirs)) + assert.Equal(t, 0, len(m)) }) t.Run("test ListWithPrefix", func(t *testing.T) { testPrefix := "prefix-ListWithPrefix" testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(ctx, testPrefix) + defer testCM.RemoveWithPrefix(ctx, testCM.RootPath()) - key := path.Join(testPrefix, "abc", "def") + // write 4 files: + // localPath/testPrefix/abc/def + // localPath/testPrefix/abc/deg + // localPath/testPrefix/abd + // localPath/testPrefix/bcd + key1 := path.Join(localPath, testPrefix, "abc", "def") value := []byte("a") - err := testCM.Write(ctx, key, value) + err := testCM.Write(ctx, key1, value) assert.NoError(t, err) - key = path.Join(testPrefix, "abc", "deg") - err = testCM.Write(ctx, key, value) + key2 := path.Join(localPath, testPrefix, "abc", "deg") + err = testCM.Write(ctx, key2, value) assert.NoError(t, err) - key = path.Join(testPrefix, "abd") - err = testCM.Write(ctx, key, value) + key3 := path.Join(localPath, testPrefix, "abd") + err = testCM.Write(ctx, key3, value) assert.NoError(t, err) - key = path.Join(testPrefix, "bcd") - err = testCM.Write(ctx, key, value) + key4 := path.Join(localPath, testPrefix, "bcd") + err = testCM.Write(ctx, key4, value) assert.NoError(t, err) - dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix+"/", false) + // non-recursive find localPath/testPrefix/* + // return: + // localPath/testPrefix/abc + // localPath/testPrefix/abd + // localPath/testPrefix/bcd + testPrefix1 := path.Join(localPath, testPrefix) + dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix1+"/", false) assert.Nil(t, err) - fmt.Println(dirs) assert.Equal(t, 3, len(dirs)) assert.Equal(t, 3, len(mods)) + assert.Contains(t, dirs, filepath.Dir(key1)) + assert.Contains(t, dirs, key3) + assert.Contains(t, dirs, key4) - testPrefix2 := path.Join(testPrefix, "a") + // recursive find localPath/testPrefix/* + // return: + // localPath/testPrefix/abc/def + // localPath/testPrefix/abc/deg + // localPath/testPrefix/abd + // localPath/testPrefix/bcd + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1+"/", true) + assert.Nil(t, err) + assert.Equal(t, 4, len(dirs)) + assert.Equal(t, 4, len(mods)) + assert.Contains(t, dirs, key1) + assert.Contains(t, dirs, key2) + assert.Contains(t, dirs, key3) + assert.Contains(t, dirs, key4) + + // non-recursive find localPath/testPrefix/a* + // return: + // localPath/testPrefix/abc + // localPath/testPrefix/abd + testPrefix2 := path.Join(localPath, testPrefix, "a") dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false) assert.Nil(t, err) assert.Equal(t, 2, len(dirs)) assert.Equal(t, 2, len(mods)) + assert.Contains(t, dirs, filepath.Dir(key1)) + assert.Contains(t, dirs, key3) - dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false) + // recursive find localPath/testPrefix/a* + // return: + // localPath/testPrefix/abc/def + // localPath/testPrefix/abc/deg + // localPath/testPrefix/abd + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true) assert.Nil(t, err) - assert.Equal(t, 2, len(dirs)) - assert.Equal(t, 2, len(mods)) + assert.Equal(t, 3, len(dirs)) + assert.Equal(t, 3, len(mods)) + assert.Contains(t, dirs, key1) + assert.Contains(t, dirs, key2) + assert.Contains(t, dirs, key3) - err = testCM.RemoveWithPrefix(ctx, testPrefix) + // remove files of localPath/testPrefix/a*, one file left + // localPath/testPrefix/bcd + err = testCM.RemoveWithPrefix(ctx, testPrefix2) assert.NoError(t, err) - dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix, false) - assert.NoError(t, err) - fmt.Println(dirs) - // dir still exist + // non-recursive find localPath/testPrefix + // return: + // localPath/testPrefix + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false) + assert.Nil(t, err) assert.Equal(t, 1, len(dirs)) assert.Equal(t, 1, len(mods)) + assert.Contains(t, dirs, filepath.Dir(key4)) + + // recursive find localPath/testPrefix + // return: + // localPath/testPrefix/bcd + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true) + assert.NoError(t, err) + assert.Equal(t, 1, len(dirs)) + assert.Equal(t, 1, len(mods)) + assert.Contains(t, dirs, key4) + + // non-recursive find localPath/testPrefix/a* + // return: + // localPath/testPrefix/abc + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false) + assert.Nil(t, err) + assert.Equal(t, 1, len(dirs)) + assert.Equal(t, 1, len(mods)) + assert.Contains(t, dirs, filepath.Dir(key1)) + + // recursive find localPath/testPrefix/a* + // no file returned + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true) + assert.Nil(t, err) + assert.Equal(t, 0, len(dirs)) + assert.Equal(t, 0, len(mods)) + + // remove the folder localPath/testPrefix + // the file localPath/testPrefix/bcd is removed, but the folder testPrefix still exist + err = testCM.RemoveWithPrefix(ctx, testPrefix1) + assert.NoError(t, err) + + // recursive find localPath/testPrefix + // no file returned + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true) + assert.NoError(t, err) + assert.Equal(t, 0, len(dirs)) + assert.Equal(t, 0, len(mods)) + + // recursive find localPath/testPrefix + // return + // localPath/testPrefix + dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false) + assert.NoError(t, err) + assert.Equal(t, 1, len(dirs)) + assert.Equal(t, 1, len(mods)) + assert.Contains(t, dirs, filepath.Dir(key4)) }) } diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index 1ba875848e..6b03de1d7f 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "os" + "path" "testing" "github.com/stretchr/testify/assert" @@ -139,7 +140,7 @@ func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorCh } var Params paramtable.BaseTable -var localPath = "/tmp/milvus/test_data/" +var localPath = "/tmp/milvus_test/chunkmanager/" func TestMain(m *testing.M) { Params.Init() @@ -180,7 +181,7 @@ func TestVectorChunkManager_GetPath(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, vcm) - key := "1" + key := path.Join(localPath, "1") err = vcm.Write(ctx, key, []byte{1}) assert.Nil(t, err) pathGet, err := vcm.Path(ctx, key) @@ -209,7 +210,7 @@ func TestVectorChunkManager_GetSize(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, vcm) - key := "1" + key := path.Join(localPath, "1") err = vcm.Write(ctx, key, []byte{1}) assert.Nil(t, err) sizeGet, err := vcm.Size(ctx, key) @@ -239,25 +240,26 @@ func TestVectorChunkManager_Write(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, vcm) - key := "1" - err = vcm.Write(ctx, key, []byte{1}) + key1 := path.Join(localPath, "key_1") + key2 := path.Join(localPath, "key_2") + err = vcm.Write(ctx, key1, []byte{1}) assert.Nil(t, err) - exist, err := vcm.Exist(ctx, key) + exist, err := vcm.Exist(ctx, key1) assert.True(t, exist) assert.NoError(t, err) contents := map[string][]byte{ - "key_1": {111}, - "key_2": {222}, + key1: {111}, + key2: {222}, } err = vcm.MultiWrite(ctx, contents) assert.NoError(t, err) - exist, err = vcm.Exist(ctx, "key_1") + exist, err = vcm.Exist(ctx, key1) assert.True(t, exist) assert.NoError(t, err) - exist, err = vcm.Exist(ctx, "key_2") + exist, err = vcm.Exist(ctx, key2) assert.True(t, exist) assert.NoError(t, err) @@ -278,31 +280,32 @@ func TestVectorChunkManager_Remove(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, vcm) - key := "1" - err = vcm.cacheStorage.Write(ctx, key, []byte{1}) + key1 := path.Join(localPath, "key_1") + key2 := path.Join(localPath, "key_2") + err = vcm.cacheStorage.Write(ctx, key1, []byte{1}) assert.Nil(t, err) - err = vcm.Remove(ctx, key) + err = vcm.Remove(ctx, key1) assert.Nil(t, err) - exist, err := vcm.Exist(ctx, key) + exist, err := vcm.Exist(ctx, key1) assert.False(t, exist) assert.NoError(t, err) contents := map[string][]byte{ - "key_1": {111}, - "key_2": {222}, + key1: {111}, + key2: {222}, } err = vcm.cacheStorage.MultiWrite(ctx, contents) assert.NoError(t, err) - err = vcm.MultiRemove(ctx, []string{"key_1", "key_2"}) + err = vcm.MultiRemove(ctx, []string{key1, key2}) assert.NoError(t, err) - exist, err = vcm.Exist(ctx, "key_1") + exist, err = vcm.Exist(ctx, key1) assert.False(t, exist) assert.NoError(t, err) - exist, err = vcm.Exist(ctx, "key_2") + exist, err = vcm.Exist(ctx, key2) assert.False(t, exist) assert.NoError(t, err) diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 07078ebb48..7712201556 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -24,6 +24,7 @@ import ( "errors" "math" "os" + "path" "strconv" "testing" "time" @@ -249,7 +250,7 @@ func Test_ImportWrapperRowBased(t *testing.T) { filePath := TempFilesPath + "rows_1.json" err = cm.Write(ctx, filePath, content) assert.NoError(t, err) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) rowCounter := &rowCounterTest{} assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter) @@ -313,70 +314,70 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string { ctx := context.Background() files := make([]string, 0) - filePath := "FieldBool.npy" + filePath := path.Join(cm.RootPath(), "FieldBool.npy") content, err := CreateNumpyData([]bool{true, false, true, true, true}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldInt8.npy" + filePath = path.Join(cm.RootPath(), "FieldInt8.npy") content, err = CreateNumpyData([]int8{10, 11, 12, 13, 14}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldInt16.npy" + filePath = path.Join(cm.RootPath(), "FieldInt16.npy") content, err = CreateNumpyData([]int16{100, 101, 102, 103, 104}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldInt32.npy" + filePath = path.Join(cm.RootPath(), "FieldInt32.npy") content, err = CreateNumpyData([]int32{1000, 1001, 1002, 1003, 1004}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldInt64.npy" + filePath = path.Join(cm.RootPath(), "FieldInt64.npy") content, err = CreateNumpyData([]int64{10000, 10001, 10002, 10003, 10004}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldFloat.npy" + filePath = path.Join(cm.RootPath(), "FieldFloat.npy") content, err = CreateNumpyData([]float32{3.14, 3.15, 3.16, 3.17, 3.18}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldDouble.npy" + filePath = path.Join(cm.RootPath(), "FieldDouble.npy") content, err = CreateNumpyData([]float64{5.1, 5.2, 5.3, 5.4, 5.5}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldString.npy" + filePath = path.Join(cm.RootPath(), "FieldString.npy") content, err = CreateNumpyData([]string{"a", "bb", "ccc", "dd", "e"}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldBinaryVector.npy" + filePath = path.Join(cm.RootPath(), "FieldBinaryVector.npy") content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "FieldFloatVector.npy" + filePath = path.Join(cm.RootPath(), "FieldFloatVector.npy") content, err = CreateNumpyData([][4]float32{{1, 2, 3, 4}, {3, 4, 5, 6}, {5, 6, 7, 8}, {7, 8, 9, 10}, {9, 10, 11, 12}}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) @@ -397,7 +398,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) { ctx := context.Background() cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) idAllocator := newIDAllocator(ctx, t, nil) @@ -430,7 +431,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) { assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State) // row count of fields not equal - filePath := "FieldInt8.npy" + filePath := path.Join(cm.RootPath(), "FieldInt8.npy") content, err := CreateNumpyData([]int8{10}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) @@ -493,7 +494,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) { ctx := context.Background() cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) idAllocator := newIDAllocator(ctx, t, nil) @@ -532,7 +533,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) { tr.Record("generate " + strconv.Itoa(rowCount) + " rows") // generate a json file - filePath := "row_perf.json" + filePath := path.Join(cm.RootPath(), "row_perf.json") func() { var b bytes.Buffer bw := bufio.NewWriter(&b) @@ -545,7 +546,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) { err = cm.Write(ctx, filePath, b.Bytes()) assert.NoError(t, err) }() - tr.Record("generate large json file " + filePath) + tr.Record("generate large json file: " + filePath) rowCounter := &rowCounterTest{} assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter) @@ -787,10 +788,10 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) { ] }`) - filePath := "rows_1.json" + filePath := path.Join(cm.RootPath(), "rows_1.json") err = cm.Write(ctx, filePath, content) assert.NoError(t, err) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) rowCounter := &rowCounterTest{} assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter) @@ -813,9 +814,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) { wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, importResult, reportFunc) wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc) - files := make([]string, 0) - files = append(files, filePath) - + files := []string{filePath} wrapper.reportImportAttempts = 2 wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error { return errors.New("mock error") @@ -837,7 +836,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) { ctx := context.Background() cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) - defer cm.RemoveWithPrefix(ctx, "") + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) idAllocator := newIDAllocator(ctx, t, nil)