Make minioChunkManager ListObject level by level when recursive is true (#19096)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-09-08 14:58:34 +08:00 committed by GitHub
parent ebc263f440
commit db23a256c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 16 deletions

View File

@ -18,6 +18,7 @@ package storage
import (
"bytes"
"container/list"
"context"
"errors"
"fmt"
@ -319,18 +320,51 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error {
return nil
}
// ListWithPrefix returns objects with provided prefix.
// by default, if `recursive`=false, list object with return object with path under save level
// say minio has followinng objects: [a, ab, a/b, ab/c]
// calling `ListWithPrefix` with `prefix` = a && `recursive` = false will only returns [a, ab]
// If caller needs all objects without level limitation, `recursive` shall be true.
func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive})
// cannot use ListObjects(ctx, bucketName, Opt{Prefix:prefix, Recursive:true})
// if minio has lots of objects under the provided path
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
var objectsKeys []string
var modTimes []time.Time
for object := range objects {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err))
return nil, nil, object.Err
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
// TODO add concurrent call if performance matters
// only return current level per call
objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: pre, Recursive: false})
for object := range objects {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err))
return nil, nil, object.Err
}
// with tailing "/", object is a "directory"
if strings.HasSuffix(object.Key, "/") {
// enqueue when recursive is true
if recursive && object.Key != pre {
log.Warn("add next", zap.String("key", object.Key))
tasks.PushBack(object.Key)
}
continue
}
objectsKeys = append(objectsKeys, object.Key)
modTimes = append(modTimes, object.LastModified)
}
objectsKeys = append(objectsKeys, object.Key)
modTimes = append(modTimes, object.LastModified)
}
return objectsKeys, modTimes, nil
}

View File

@ -485,21 +485,21 @@ func TestMinIOCM(t *testing.T) {
key = path.Join(testPrefix, "bc", "a", "b")
err = testCM.Write(key, value)
assert.NoError(t, err)
dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false)
dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", true)
assert.NoError(t, err)
assert.Equal(t, 5, len(dirs))
assert.Equal(t, 5, len(mods))
dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), true)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false)
assert.NoError(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
testCM.RemoveWithPrefix(testPrefix)
r, m, err = testCM.ListWithPrefix(pathPrefix, false)
r, m, err = testCM.ListWithPrefix(pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 0)
assert.Equal(t, len(m), 0)
assert.Equal(t, 0, len(r))
assert.Equal(t, 0, len(m))
// test wrong prefix
b := make([]byte, 2048)