From db23a256c24fc6308d0e014eb509955ca940eae5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 8 Sep 2022 14:58:34 +0800 Subject: [PATCH] Make minioChunkManager ListObject level by level when recursive is true (#19096) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/storage/minio_chunk_manager.go | 48 +++++++++++++++++--- internal/storage/minio_chunk_manager_test.go | 18 ++++---- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 4375b42532..52afe03cff 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "container/list" "context" "errors" "fmt" @@ -319,18 +320,51 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error { return nil } +// ListWithPrefix returns objects with provided prefix. +// by default, if `recursive`=false, list object with return object with path under save level +// say minio has followinng objects: [a, ab, a/b, ab/c] +// calling `ListWithPrefix` with `prefix` = a && `recursive` = false will only returns [a, ab] +// If caller needs all objects without level limitation, `recursive` shall be true. func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { - objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive}) + + // cannot use ListObjects(ctx, bucketName, Opt{Prefix:prefix, Recursive:true}) + // if minio has lots of objects under the provided path + // recursive = true may timeout during the recursive browsing the objects. + // See also: https://github.com/milvus-io/milvus/issues/19095 + var objectsKeys []string var modTimes []time.Time - for object := range objects { - if object.Err != nil { - log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err)) - return nil, nil, object.Err + tasks := list.New() + tasks.PushBack(prefix) + for tasks.Len() > 0 { + e := tasks.Front() + pre := e.Value.(string) + tasks.Remove(e) + + // TODO add concurrent call if performance matters + // only return current level per call + objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: pre, Recursive: false}) + + for object := range objects { + if object.Err != nil { + log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err)) + return nil, nil, object.Err + } + + // with tailing "/", object is a "directory" + if strings.HasSuffix(object.Key, "/") { + // enqueue when recursive is true + if recursive && object.Key != pre { + log.Warn("add next", zap.String("key", object.Key)) + tasks.PushBack(object.Key) + } + continue + } + objectsKeys = append(objectsKeys, object.Key) + modTimes = append(modTimes, object.LastModified) } - objectsKeys = append(objectsKeys, object.Key) - modTimes = append(modTimes, object.LastModified) } + return objectsKeys, modTimes, nil } diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 4427d4b62a..832824faa4 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -485,21 +485,21 @@ func TestMinIOCM(t *testing.T) { key = path.Join(testPrefix, "bc", "a", "b") err = testCM.Write(key, value) assert.NoError(t, err) - dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false) + dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", true) + assert.NoError(t, err) + assert.Equal(t, 5, len(dirs)) + assert.Equal(t, 5, len(mods)) + + dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), true) assert.NoError(t, err) assert.Equal(t, 3, len(dirs)) assert.Equal(t, 3, len(mods)) - dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false) - assert.NoError(t, err) - assert.Equal(t, 2, len(dirs)) - assert.Equal(t, 2, len(mods)) - testCM.RemoveWithPrefix(testPrefix) - r, m, err = testCM.ListWithPrefix(pathPrefix, false) + r, m, err = testCM.ListWithPrefix(pathPrefix, true) assert.NoError(t, err) - assert.Equal(t, len(r), 0) - assert.Equal(t, len(m), 0) + assert.Equal(t, 0, len(r)) + assert.Equal(t, 0, len(m)) // test wrong prefix b := make([]byte, 2048)