diff --git a/.github/workflows/mac.yaml b/.github/workflows/mac.yaml index 0d9e1e1013..5fa6fb368b 100644 --- a/.github/workflows/mac.yaml +++ b/.github/workflows/mac.yaml @@ -47,7 +47,7 @@ jobs: - name: Setup Go environment uses: actions/setup-go@v2.2.0 with: - go-version: '1.20' + go-version: '~1.20.7' - name: Mac Cache Go Mod Volumes uses: actions/cache@v3 with: diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index c44e685db6..0ce0bda398 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -2007,8 +2007,10 @@ func TestSearchTask_Requery(t *testing.T) { err := qt.Requery() assert.NoError(t, err) assert.Len(t, qt.result.Results.FieldsData, 2) - assert.Equal(t, pkField, qt.result.Results.FieldsData[0].GetFieldName()) - assert.Equal(t, vecField, qt.result.Results.FieldsData[1].GetFieldName()) + for _, field := range qt.result.Results.FieldsData { + fieldName := field.GetFieldName() + assert.Contains(t, []string{pkField, vecField}, fieldName) + } }) t.Run("Test no primary key", func(t *testing.T) { diff --git a/internal/storage/azure_object_storage.go b/internal/storage/azure_object_storage.go index 6fa147c95d..286733229c 100644 --- a/internal/storage/azure_object_storage.go +++ b/internal/storage/azure_object_storage.go @@ -117,21 +117,43 @@ func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bu return *info.ContentLength, nil } -func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) { - pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ - Prefix: &prefix, - }) - objects := map[string]time.Time{} - if pager.More() { - pageResp, err := pager.NextPage(context.Background()) - if err != nil { - return nil, checkObjectStorageError(prefix, err) +func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) { + var objectsKeys []string + var modTimes []time.Time + if recursive { + pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ + Prefix: &prefix, + }) + if pager.More() { + pageResp, err := pager.NextPage(context.Background()) + if err != nil { + return []string{}, []time.Time{}, checkObjectStorageError(prefix, err) + } + for _, blob := range pageResp.Segment.BlobItems { + objectsKeys = append(objectsKeys, *blob.Name) + modTimes = append(modTimes, *blob.Properties.LastModified) + } } - for _, blob := range pageResp.Segment.BlobItems { - objects[*blob.Name] = *blob.Properties.LastModified + } else { + pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ + Prefix: &prefix, + }) + if pager.More() { + pageResp, err := pager.NextPage(context.Background()) + if err != nil { + return []string{}, []time.Time{}, checkObjectStorageError(prefix, err) + } + for _, blob := range pageResp.Segment.BlobItems { + objectsKeys = append(objectsKeys, *blob.Name) + modTimes = append(modTimes, *blob.Properties.LastModified) + } + for _, blob := range pageResp.Segment.BlobPrefixes { + objectsKeys = append(objectsKeys, *blob.Name) + modTimes = append(modTimes, time.Now()) + } } } - return objects, nil + return objectsKeys, modTimes, nil } func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error { diff --git a/internal/storage/azure_object_storage_test.go b/internal/storage/azure_object_storage_test.go index 4af7eff35c..05483f5f97 100644 --- a/internal/storage/azure_object_storage_test.go +++ b/internal/storage/azure_object_storage_test.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "context" + "fmt" "io" "os" "testing" @@ -127,10 +128,10 @@ func TestAzureObjectStorage(t *testing.T) { for _, test := range loadWithPrefixTests { t.Run(test.description, func(t *testing.T) { - gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false) + gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false) assert.NoError(t, err) assert.Equal(t, len(test.expectedValue), len(gotk)) - for key := range gotk { + for _, key := range gotk { err := testCM.RemoveObject(ctx, config.bucketName, key) assert.NoError(t, err) } @@ -138,6 +139,58 @@ func TestAzureObjectStorage(t *testing.T) { } }) + t.Run("test list", func(t *testing.T) { + testCM, err := newAzureObjectStorageWithConfig(ctx, &config) + assert.Equal(t, err, nil) + defer testCM.DeleteContainer(ctx, config.bucketName, &azblob.DeleteContainerOptions{}) + + prepareTests := []struct { + valid bool + key string + value []byte + }{ + {false, "abc/", []byte("123")}, + {true, "abc/d", []byte("1234")}, + {false, "abc/d/e", []byte("12345")}, + {true, "abc/e/d", []byte("12354")}, + {true, "key_/1/1", []byte("111")}, + {true, "key_/1/2", []byte("222")}, + {false, "key_/1/2/3", []byte("333")}, + {true, "key_/2/3", []byte("333")}, + } + + for _, test := range prepareTests { + err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value))) + require.Nil(t, err) + if !test.valid { + err := testCM.RemoveObject(ctx, config.bucketName, test.key) + require.Nil(t, err) + } + } + + insertWithPrefixTests := []struct { + recursive bool + prefix string + expectedValue []string + }{ + {true, "abc/", []string{"abc/d", "abc/e/d"}}, + {true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}}, + {false, "abc/", []string{"abc/d", "abc/e/"}}, + {false, "key_/", []string{"key_/1/", "key_/2/"}}, + } + + for _, test := range insertWithPrefixTests { + t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) { + gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive) + assert.NoError(t, err) + assert.Equal(t, len(test.expectedValue), len(gotk)) + for _, key := range gotk { + assert.Contains(t, test.expectedValue, key) + } + }) + } + }) + t.Run("test useIAM", func(t *testing.T) { var err error config.useIAM = true diff --git a/internal/storage/minio_object_storage.go b/internal/storage/minio_object_storage.go index 8e208d84b6..639d7bbce0 100644 --- a/internal/storage/minio_object_storage.go +++ b/internal/storage/minio_object_storage.go @@ -17,6 +17,7 @@ package storage import ( + "container/list" "context" "fmt" "io" @@ -170,20 +171,43 @@ func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bu return info.Size, checkObjectStorageError(objectName, err) } -func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) { - res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ - Prefix: prefix, - Recursive: recursive, - }) +func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) { + var objectsKeys []string + var modTimes []time.Time + tasks := list.New() + tasks.PushBack(prefix) + for tasks.Len() > 0 { + e := tasks.Front() + pre := e.Value.(string) + tasks.Remove(e) - objects := map[string]time.Time{} - for object := range res { - if !recursive && object.Err != nil { - return map[string]time.Time{}, object.Err + res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Prefix: pre, + Recursive: false, + }) + + objects := map[string]time.Time{} + for object := range res { + if object.Err != nil { + log.Warn("failed to list with prefix", zap.String("bucket", bucketName), zap.String("prefix", prefix), zap.Error(object.Err)) + return []string{}, []time.Time{}, object.Err + } + objects[object.Key] = object.LastModified + } + for object, lastModified := range objects { + // with tailing "/", object is a "directory" + if strings.HasSuffix(object, "/") && recursive { + // enqueue when recursive is true + if object != pre { + tasks.PushBack(object) + } + continue + } + objectsKeys = append(objectsKeys, object) + modTimes = append(modTimes, lastModified) } - objects[object.Key] = object.LastModified } - return objects, nil + return objectsKeys, modTimes, nil } func (minioObjectStorage *MinioObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error { diff --git a/internal/storage/minio_object_storage_test.go b/internal/storage/minio_object_storage_test.go index 5e7968ddc3..75209ff3d9 100644 --- a/internal/storage/minio_object_storage_test.go +++ b/internal/storage/minio_object_storage_test.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "context" + "fmt" "io" "testing" @@ -131,10 +132,10 @@ func TestMinioObjectStorage(t *testing.T) { for _, test := range loadWithPrefixTests { t.Run(test.description, func(t *testing.T) { - gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false) + gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false) assert.NoError(t, err) assert.Equal(t, len(test.expectedValue), len(gotk)) - for key := range gotk { + for _, key := range gotk { err := testCM.RemoveObject(ctx, config.bucketName, key) assert.NoError(t, err) } @@ -142,6 +143,54 @@ func TestMinioObjectStorage(t *testing.T) { } }) + t.Run("test list", func(t *testing.T) { + testCM, err := newMinioObjectStorageWithConfig(ctx, &config) + assert.Equal(t, err, nil) + defer testCM.RemoveBucket(ctx, config.bucketName) + + prepareTests := []struct { + valid bool + key string + value []byte + }{ + {false, "abc/", []byte("123")}, + {true, "abc/d", []byte("1234")}, + {false, "abc/d/e", []byte("12345")}, + {true, "abc/e/d", []byte("12354")}, + {true, "key_/1/1", []byte("111")}, + {true, "key_/1/2", []byte("222")}, + {false, "key_/1/2/3", []byte("333")}, + {true, "key_/2/3", []byte("333")}, + } + + for _, test := range prepareTests { + err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value))) + require.Equal(t, test.valid, err == nil) + } + + insertWithPrefixTests := []struct { + recursive bool + prefix string + expectedValue []string + }{ + {true, "abc/", []string{"abc/d", "abc/e/d"}}, + {true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}}, + {false, "abc/", []string{"abc/d", "abc/e/"}}, + {false, "key_/", []string{"key_/1/", "key_/2/"}}, + } + + for _, test := range insertWithPrefixTests { + t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) { + gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive) + assert.NoError(t, err) + assert.Equal(t, len(test.expectedValue), len(gotk)) + for _, key := range gotk { + assert.Contains(t, test.expectedValue, key) + } + }) + } + }) + t.Run("test useIAM", func(t *testing.T) { var err error config.useIAM = true diff --git a/internal/storage/remote_chunk_manager.go b/internal/storage/remote_chunk_manager.go index e224f124ce..bbdcf6a3d0 100644 --- a/internal/storage/remote_chunk_manager.go +++ b/internal/storage/remote_chunk_manager.go @@ -18,7 +18,6 @@ package storage import ( "bytes" - "container/list" "context" "io" "strings" @@ -50,7 +49,7 @@ type ObjectStorage interface { GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error StatObject(ctx context.Context, bucketName, objectName string) (int64, error) - ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) + ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) RemoveObject(ctx context.Context, bucketName, objectName string) error } @@ -270,14 +269,10 @@ func (mcm *RemoteChunkManager) MultiRemove(ctx context.Context, keys []string) e // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. func (mcm *RemoteChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { - objects, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true) + removeKeys, _, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true) if err != nil { return err } - removeKeys := make([]string, 0) - for key := range objects { - removeKeys = append(removeKeys, key) - } i := 0 maxGoroutine := 10 for i < len(removeKeys) { @@ -312,38 +307,9 @@ func (mcm *RemoteChunkManager) ListWithPrefix(ctx context.Context, prefix string // recursive = true may timeout during the recursive browsing the objects. // See also: https://github.com/milvus-io/milvus/issues/19095 - var objectsKeys []string - var modTimes []time.Time - - tasks := list.New() - tasks.PushBack(prefix) - for tasks.Len() > 0 { - e := tasks.Front() - pre := e.Value.(string) - tasks.Remove(e) - - // TODO add concurrent call if performance matters - // only return current level per call - objects, err := mcm.listObjects(ctx, mcm.bucketName, pre, false) - if err != nil { - return nil, nil, err - } - - for object, lastModified := range objects { - // with tailing "/", object is a "directory" - if strings.HasSuffix(object, "/") && recursive { - // enqueue when recursive is true - if object != pre { - tasks.PushBack(object) - } - continue - } - objectsKeys = append(objectsKeys, object) - modTimes = append(modTimes, lastModified) - } - } - - return objectsKeys, modTimes, nil + // TODO add concurrent call if performance matters + // only return current level per call + return mcm.listObjects(ctx, mcm.bucketName, prefix, recursive) } func (mcm *RemoteChunkManager) getObject(ctx context.Context, bucketName, objectName string, @@ -396,10 +362,10 @@ func (mcm *RemoteChunkManager) getObjectSize(ctx context.Context, bucketName, ob return info, err } -func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) { +func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) { start := timerecord.NewTimeRecorder("listObjects") - res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive) + blobNames, lastModifiedTime, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive) metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc() if err == nil { metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel). @@ -409,7 +375,7 @@ func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName strin log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(err)) metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.FailLabel).Inc() } - return res, err + return blobNames, lastModifiedTime, err } func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {