fix: [StorageV2] index/stats task level storage v2 fs (#42191)

related: #39173

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang 2025-06-10 11:06:35 +08:00 committed by GitHub
parent fd6e2b52ff
commit 89c3afb12e
GPG Key ID: B5690EEEBB952194
16 changed files with 259 additions and 36 deletions

View File

@@ -239,8 +239,6 @@ CreateIndex(CIndex* res_index,
        milvus::storage::FileManagerContext fileManagerContext(
            field_meta, index_meta, chunk_manager);
-       auto fs = milvus::storage::InitArrowFileSystem(storage_config);
        auto index =
            milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
                field_type, config, fileManagerContext);
@@ -254,13 +252,11 @@ CreateIndex(CIndex* res_index,
        auto status = CStatus();
        status.error_code = e.get_error_code();
        status.error_msg = strdup(e.what());
-       milvus_storage::ArrowFileSystemSingleton::GetInstance().Release();
        return status;
    } catch (std::exception& e) {
        auto status = CStatus();
        status.error_code = UnexpectedError;
        status.error_msg = strdup(e.what());
-       milvus_storage::ArrowFileSystemSingleton::GetInstance().Release();
        return status;
    }
}

View File

@@ -25,6 +25,52 @@
#include "common/EasyAssert.h"
#include "common/type_c.h"
+
+CStatus
+NewPackedReaderWithStorageConfig(char** paths,
+                                 int64_t num_paths,
+                                 struct ArrowSchema* schema,
+                                 const int64_t buffer_size,
+                                 CStorageConfig c_storage_config,
+                                 CPackedReader* c_packed_reader) {
+    try {
+        auto truePaths = std::vector<std::string>(paths, paths + num_paths);
+        milvus_storage::ArrowFileSystemConfig conf;
+        conf.address = std::string(c_storage_config.address);
+        conf.bucket_name = std::string(c_storage_config.bucket_name);
+        conf.access_key_id = std::string(c_storage_config.access_key_id);
+        conf.access_key_value = std::string(c_storage_config.access_key_value);
+        conf.root_path = std::string(c_storage_config.root_path);
+        conf.storage_type = std::string(c_storage_config.storage_type);
+        conf.cloud_provider = std::string(c_storage_config.cloud_provider);
+        conf.iam_endpoint = std::string(c_storage_config.iam_endpoint);
+        conf.log_level = std::string(c_storage_config.log_level);
+        conf.region = std::string(c_storage_config.region);
+        conf.useSSL = c_storage_config.useSSL;
+        conf.sslCACert = std::string(c_storage_config.sslCACert);
+        conf.useIAM = c_storage_config.useIAM;
+        conf.useVirtualHost = c_storage_config.useVirtualHost;
+        conf.requestTimeoutMs = c_storage_config.requestTimeoutMs;
+        conf.gcp_credential_json =
+            std::string(c_storage_config.gcp_credential_json);
+        conf.use_custom_part_upload = c_storage_config.use_custom_part_upload;
+        milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
+        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
+                          .GetArrowFileSystem();
+        if (!trueFs) {
+            return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
+                                          "Failed to get filesystem");
+        }
+        auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
+        auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(
+            trueFs, truePaths, trueSchema, buffer_size);
+        *c_packed_reader = reader.release();
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
CStatus
NewPackedReader(char** paths,
                int64_t num_paths,

View File

@@ -25,6 +25,14 @@ typedef void* CPackedReader;
typedef void* CArrowArray;
typedef void* CArrowSchema;
+
+CStatus
+NewPackedReaderWithStorageConfig(char** paths,
+                                 int64_t num_paths,
+                                 struct ArrowSchema* schema,
+                                 const int64_t buffer_size,
+                                 CStorageConfig c_storage_config,
+                                 CPackedReader* c_packed_reader);
+
/**
 * @brief Open a packed reader to read needed columns in the specified path.
 *

View File

@@ -24,6 +24,69 @@
#include "common/EasyAssert.h"
#include "common/type_c.h"
+
+CStatus
+NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
+                                 const int64_t buffer_size,
+                                 char** paths,
+                                 int64_t num_paths,
+                                 int64_t part_upload_size,
+                                 CColumnGroups column_groups,
+                                 CStorageConfig c_storage_config,
+                                 CPackedWriter* c_packed_writer) {
+    try {
+        auto truePaths = std::vector<std::string>(paths, paths + num_paths);
+        auto storage_config = milvus_storage::StorageConfig();
+        storage_config.part_size = part_upload_size;
+        milvus_storage::ArrowFileSystemConfig conf;
+        conf.address = std::string(c_storage_config.address);
+        conf.bucket_name = std::string(c_storage_config.bucket_name);
+        conf.access_key_id = std::string(c_storage_config.access_key_id);
+        conf.access_key_value = std::string(c_storage_config.access_key_value);
+        conf.root_path = std::string(c_storage_config.root_path);
+        conf.storage_type = std::string(c_storage_config.storage_type);
+        conf.cloud_provider = std::string(c_storage_config.cloud_provider);
+        conf.iam_endpoint = std::string(c_storage_config.iam_endpoint);
+        conf.log_level = std::string(c_storage_config.log_level);
+        conf.region = std::string(c_storage_config.region);
+        conf.useSSL = c_storage_config.useSSL;
+        conf.sslCACert = std::string(c_storage_config.sslCACert);
+        conf.useIAM = c_storage_config.useIAM;
+        conf.useVirtualHost = c_storage_config.useVirtualHost;
+        conf.requestTimeoutMs = c_storage_config.requestTimeoutMs;
+        conf.gcp_credential_json =
+            std::string(c_storage_config.gcp_credential_json);
+        conf.use_custom_part_upload = c_storage_config.use_custom_part_upload;
+        milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
+        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
+                          .GetArrowFileSystem();
+        if (!trueFs) {
+            return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
+                                          "Failed to get filesystem");
+        }
+        auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
+        auto columnGroups =
+            *static_cast<std::vector<std::vector<int>>*>(column_groups);
+        auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
+            trueFs,
+            truePaths,
+            trueSchema,
+            storage_config,
+            columnGroups,
+            buffer_size);
+        AssertInfo(writer, "write must not be null");
+        *c_packed_writer = writer.release();
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
CStatus
NewPackedWriter(struct ArrowSchema* schema,
                const int64_t buffer_size,

View File

@ -24,6 +24,16 @@ extern "C" {
typedef void* CPackedWriter; typedef void* CPackedWriter;
CStatus
NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
const int64_t buffer_size,
char** paths,
int64_t num_paths,
int64_t part_upload_size,
CColumnGroups column_groups,
CStorageConfig c_storage_config,
CPackedWriter* c_packed_writer);
CStatus CStatus
NewPackedWriter(struct ArrowSchema* schema, NewPackedWriter(struct ArrowSchema* schema,
const int64_t buffer_size, const int64_t buffer_size,

View File

@@ -20,6 +20,7 @@
#include "common/Common.h"
#include "common/FieldData.h"
+#include "common/Types.h"
#include "log/Log.h"
#include "storage/Util.h"
#include "storage/FileManager.h"
@@ -137,13 +138,13 @@ MemFileManagerImpl::CacheRawDataToMemory(const Config& config) {
        index::GetValueFromConfig<int64_t>(config, STORAGE_VERSION_KEY)
            .value_or(0);
    if (storage_version == STORAGE_V2) {
-        return cache_row_data_to_memory_storage_v2(config);
+        return cache_raw_data_to_memory_storage_v2(config);
    }
-    return cache_row_data_to_memory_internal(config);
+    return cache_raw_data_to_memory_internal(config);
}

std::vector<FieldDataPtr>
-MemFileManagerImpl::cache_row_data_to_memory_internal(const Config& config) {
+MemFileManagerImpl::cache_raw_data_to_memory_internal(const Config& config) {
    auto insert_files = index::GetValueFromConfig<std::vector<std::string>>(
        config, INSERT_FILES_KEY);
    AssertInfo(insert_files.has_value(),
@@ -180,7 +181,7 @@ MemFileManagerImpl::cache_row_data_to_memory_internal(const Config& config) {
}

std::vector<FieldDataPtr>
-MemFileManagerImpl::cache_row_data_to_memory_storage_v2(const Config& config) {
+MemFileManagerImpl::cache_raw_data_to_memory_storage_v2(const Config& config) {
    auto data_type = index::GetValueFromConfig<DataType>(config, DATA_TYPE_KEY);
    AssertInfo(data_type.has_value(), "data type is empty when build index");
    auto dim = index::GetValueFromConfig<int64_t>(config, DIM_KEY).value_or(0);

View File

@@ -81,10 +81,10 @@ class MemFileManagerImpl : public FileManagerImpl {
    AddBinarySet(const BinarySet& binary_set, const std::string& prefix);

    std::vector<FieldDataPtr>
-    cache_row_data_to_memory_internal(const Config& config);
+    cache_raw_data_to_memory_internal(const Config& config);

    std::vector<FieldDataPtr>
-    cache_row_data_to_memory_storage_v2(const Config& config);
+    cache_raw_data_to_memory_storage_v2(const Config& config);

    std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>>
    cache_opt_field_memory(const Config& config);

View File

@@ -301,7 +301,8 @@ TEST(CStringIndexTest, All) {
#endif

TEST(CreateIndexTest, StorageV2) {
-    GTEST_SKIP() << "TODO: after index/stats task level fs is finished, should fix shutdown sdk api in test";
+    GTEST_SKIP() << "TODO: after index/stats task level fs is finished, should "
+                    "fix shutdown sdk api in test";
    auto build_index_info =
        std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();

View File

@@ -163,6 +163,7 @@ func (st *statsTask) PreExecute(ctx context.Context) error {
        zap.Int64("partitionID", st.req.GetPartitionID()),
        zap.Int64("segmentID", st.req.GetSegmentID()),
        zap.Int64("preExecuteRecordSpan(ms)", preExecuteRecordSpan.Milliseconds()),
+       zap.Any("storageConfig", st.req.StorageConfig),
    )
    return nil
}
@@ -189,6 +190,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
            return st.binlogIO.Upload(ctx, kvs)
        }),
        storage.WithVersion(st.req.GetStorageVersion()),
+       storage.WithStorageConfig(st.req.GetStorageConfig()),
    )
    if err != nil {
        log.Ctx(ctx).Warn("sort segment wrong, unable to init segment writer",
@@ -234,6 +236,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
        storage.WithVersion(st.req.StorageVersion),
        storage.WithDownloader(st.binlogIO.Download),
        storage.WithBucketName(st.req.StorageConfig.BucketName),
+       storage.WithStorageConfig(st.req.GetStorageConfig()),
    )
    if err != nil {
        log.Warn("error creating insert binlog reader", zap.Error(err))

View File

@@ -139,7 +139,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
    bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
-   w, err := storage.NewPackedRecordWriter(bucketName, paths, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups)
+   w, err := storage.NewPackedRecordWriter(bucketName, paths, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups, nil)
    if err != nil {
        return nil, err
    }

View File

@@ -30,6 +30,7 @@ import (
    "github.com/milvus-io/milvus/internal/storagecommon"
    "github.com/milvus-io/milvus/internal/storagev2/packed"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+   "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
    "github.com/milvus-io/milvus/pkg/v2/util/merr"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@@ -52,6 +53,7 @@ type rwOptions struct {
    multiPartUploadSize int64
    columnGroups        []storagecommon.ColumnGroup
    bucketName          string
+   storageConfig       *indexpb.StorageConfig
}

type RwOption func(*rwOptions)
@@ -105,6 +107,12 @@ func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
    }
}

+func WithStorageConfig(storageConfig *indexpb.StorageConfig) RwOption {
+   return func(options *rwOptions) {
+       options.storageConfig = storageConfig
+   }
+}
+
func GuessStorageVersion(binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema) int64 {
    if len(binlogs) == len(schema.Fields) {
        return StorageV1
@@ -212,7 +220,7 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
                paths[j] = append(paths[j], logPath)
            }
        }
-       return newPackedRecordReader(paths, schema, rwOptions.bufferSize)
+       return newPackedRecordReader(paths, schema, rwOptions.bufferSize, rwOptions.storageConfig)
    }
    return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
}
@@ -241,6 +249,7 @@ func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segme
        return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema,
            blobsWriter, allocator, chunkSize, bucketName, rootPath, maxRowNum,
            rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups,
+           rwOptions.storageConfig,
        )
    }
    return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
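Taken together, the new WithStorageConfig option binds a reader or writer to task-specific storage settings instead of the process-wide defaults. A rough usage sketch, under the assumption that ctx, binlogs, schema and req come from the surrounding task code (they are not part of this diff) and that StorageV2 is the package's version constant:

reader, err := storage.NewBinlogRecordReader(ctx, binlogs, schema,
    storage.WithVersion(storage.StorageV2),
    storage.WithBucketName(req.GetStorageConfig().GetBucketName()),
    storage.WithStorageConfig(req.GetStorageConfig()), // added by this commit
)
if err != nil {
    return err
}
defer reader.Close()

If the option is omitted, rwOptions.storageConfig stays nil and the packed reader below falls back to the previously initialized Arrow filesystem singleton, matching the old behavior.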

View File

@@ -34,6 +34,7 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
+   "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
    "github.com/milvus-io/milvus/pkg/v2/util/merr"
    "github.com/milvus-io/milvus/pkg/v2/util/metautil"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@@ -45,9 +46,10 @@ type packedRecordReader struct {
    chunk         int
    reader        *packed.PackedReader
    bufferSize    int64
    arrowSchema   *arrow.Schema
    field2Col     map[FieldID]int
+   storageConfig *indexpb.StorageConfig
}

var _ RecordReader = (*packedRecordReader)(nil)
@@ -63,7 +65,7 @@ func (pr *packedRecordReader) iterateNextBatch() error {
        return io.EOF
    }
-   reader, err := packed.NewPackedReader(pr.paths[pr.chunk], pr.arrowSchema, pr.bufferSize)
+   reader, err := packed.NewPackedReader(pr.paths[pr.chunk], pr.arrowSchema, pr.bufferSize, pr.storageConfig)
    pr.chunk++
    if err != nil {
        return merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error())
@@ -100,7 +102,7 @@ func (pr *packedRecordReader) Close() error {
    return nil
}

-func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema, bufferSize int64,
+func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema, bufferSize int64, storageConfig *indexpb.StorageConfig,
) (*packedRecordReader, error) {
    arrowSchema, err := ConvertToArrowSchema(schema.Fields)
    if err != nil {
@@ -111,17 +113,18 @@ func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema,
        field2Col[field.FieldID] = i
    }
    return &packedRecordReader{
        paths:         paths,
        bufferSize:    bufferSize,
        arrowSchema:   arrowSchema,
        field2Col:     field2Col,
+       storageConfig: storageConfig,
    }, nil
}

func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSchema,
    bufferSize int64,
) (*DeserializeReaderImpl[*Value], error) {
-   reader, err := newPackedRecordReader(paths, schema, bufferSize)
+   reader, err := newPackedRecordReader(paths, schema, bufferSize, nil)
    if err != nil {
        return nil, err
    }
@@ -144,6 +147,7 @@ type packedRecordWriter struct {
    rowNum                  int64
    writtenUncompressed     uint64
    columnGroupUncompressed []uint64
+   storageConfig           *indexpb.StorageConfig
}

func (pw *packedRecordWriter) Write(r Record) error {
@@ -196,7 +200,7 @@ func (pw *packedRecordWriter) Close() error {
    return nil
}

-func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*packedRecordWriter, error) {
+func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, storageConfig *indexpb.StorageConfig) (*packedRecordWriter, error) {
    arrowSchema, err := ConvertToArrowSchema(schema.Fields)
    if err != nil {
        return nil, merr.WrapErrServiceInternal(
@@ -210,7 +214,7 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
        }
        return path.Join(bucketName, p)
    })
-   writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups)
+   writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig)
    if err != nil {
        return nil, merr.WrapErrServiceInternal(
            fmt.Sprintf("can not new packed record writer %s", err.Error()))
@@ -225,13 +229,14 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
        paths:                   paths,
        columnGroups:            columnGroups,
        columnGroupUncompressed: columnGroupUncompressed,
+       storageConfig:           storageConfig,
    }, nil
}

func NewPackedSerializeWriter(bucketName string, paths []string, schema *schemapb.CollectionSchema, bufferSize int64,
    multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int,
) (*SerializeWriterImpl[*Value], error) {
-   PackedBinlogRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups)
+   PackedBinlogRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups, nil)
    if err != nil {
        return nil, merr.WrapErrServiceInternal(
            fmt.Sprintf("can not new packed record writer %s", err.Error()))
@@ -259,6 +264,7 @@ type PackedBinlogRecordWriter struct {
    bufferSize          int64
    multiPartUploadSize int64
    columnGroups        []storagecommon.ColumnGroup
+   storageConfig       *indexpb.StorageConfig

    // writer and stats generated at runtime
    writer *packedRecordWriter
@@ -358,7 +364,7 @@ func (pw *PackedBinlogRecordWriter) initWriters(r Record) error {
        paths = append(paths, path)
        logIdStart++
    }
-   pw.writer, err = NewPackedRecordWriter(pw.bucketName, paths, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups)
+   pw.writer, err = NewPackedRecordWriter(pw.bucketName, paths, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups, pw.storageConfig)
    if err != nil {
        return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error()))
    }
@@ -525,6 +531,7 @@ func (pw *PackedBinlogRecordWriter) GetBufferUncompressed() uint64 {

func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema,
    blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, bucketName, rootPath string, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup,
+   storageConfig *indexpb.StorageConfig,
) (*PackedBinlogRecordWriter, error) {
    arrowSchema, err := ConvertToArrowSchema(schema.Fields)
    if err != nil {
@@ -567,5 +574,6 @@ func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID,
        columnGroups:  columnGroups,
        pkstats:       stats,
        bm25Stats:     bm25Stats,
+       storageConfig: storageConfig,
    }, nil
}

View File

@@ -31,9 +31,11 @@ import (
    "github.com/apache/arrow/go/v17/arrow"
    "github.com/apache/arrow/go/v17/arrow/cdata"
+
+   "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)

-func NewPackedReader(filePaths []string, schema *arrow.Schema, bufferSize int64) (*PackedReader, error) {
+func NewPackedReader(filePaths []string, schema *arrow.Schema, bufferSize int64, storageConfig *indexpb.StorageConfig) (*PackedReader, error) {
    cFilePaths := make([]*C.char, len(filePaths))
    for i, path := range filePaths {
        cFilePaths[i] = C.CString(path)
@@ -50,7 +52,45 @@ func NewPackedReader(filePaths []string, schema *arrow.Schema, bufferSize int64)
    cBufferSize := C.int64_t(bufferSize)

    var cPackedReader C.CPackedReader
-   status := C.NewPackedReader(cFilePathsArray, cNumPaths, cSchema, cBufferSize, &cPackedReader)
+   var status C.CStatus
+   if storageConfig != nil {
+       cStorageConfig := C.CStorageConfig{
+           address:                C.CString(storageConfig.GetAddress()),
+           bucket_name:            C.CString(storageConfig.GetBucketName()),
+           access_key_id:          C.CString(storageConfig.GetAccessKeyID()),
+           access_key_value:       C.CString(storageConfig.GetSecretAccessKey()),
+           root_path:              C.CString(storageConfig.GetRootPath()),
+           storage_type:           C.CString(storageConfig.GetStorageType()),
+           cloud_provider:         C.CString(storageConfig.GetCloudProvider()),
+           iam_endpoint:           C.CString(storageConfig.GetIAMEndpoint()),
+           log_level:              C.CString("warn"),
+           useSSL:                 C.bool(storageConfig.GetUseSSL()),
+           sslCACert:              C.CString(storageConfig.GetSslCACert()),
+           useIAM:                 C.bool(storageConfig.GetUseIAM()),
+           region:                 C.CString(storageConfig.GetRegion()),
+           useVirtualHost:         C.bool(storageConfig.GetUseVirtualHost()),
+           requestTimeoutMs:       C.int64_t(storageConfig.GetRequestTimeoutMs()),
+           gcp_credential_json:    C.CString(storageConfig.GetGcpCredentialJSON()),
+           use_custom_part_upload: true,
+       }
+       defer C.free(unsafe.Pointer(cStorageConfig.address))
+       defer C.free(unsafe.Pointer(cStorageConfig.bucket_name))
+       defer C.free(unsafe.Pointer(cStorageConfig.access_key_id))
+       defer C.free(unsafe.Pointer(cStorageConfig.access_key_value))
+       defer C.free(unsafe.Pointer(cStorageConfig.root_path))
+       defer C.free(unsafe.Pointer(cStorageConfig.storage_type))
+       defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider))
+       defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint))
+       defer C.free(unsafe.Pointer(cStorageConfig.log_level))
+       defer C.free(unsafe.Pointer(cStorageConfig.sslCACert))
+       defer C.free(unsafe.Pointer(cStorageConfig.region))
+       defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json))
+       status = C.NewPackedReaderWithStorageConfig(cFilePathsArray, cNumPaths, cSchema, cBufferSize, cStorageConfig, &cPackedReader)
+   } else {
+       status = C.NewPackedReader(cFilePathsArray, cNumPaths, cSchema, cBufferSize, &cPackedReader)
+   }
    if err := ConsumeCStatusIntoError(&status); err != nil {
        return nil, err
    }
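A hedged usage sketch for the updated wrapper; the paths slice, arrowSchema and record handling are assumed, only the final argument is new. A nil config keeps the old singleton-backed filesystem, while a non-nil indexpb.StorageConfig makes the reader build its Arrow filesystem from the task's own settings:

cfg := &indexpb.StorageConfig{BucketName: "a-bucket", RootPath: "files", StorageType: "minio"} // placeholder values
reader, err := packed.NewPackedReader(paths, arrowSchema, 16*1024*1024, cfg)
if err != nil {
    return err
}
rec, err := reader.ReadNext()
if err != nil {
    return err
}
_ = rec // consume the arrow.Record as needed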

View File

@@ -82,7 +82,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
    columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}}}
    bufferSize := int64(10 * 1024 * 1024) // 10MB
    multiPartUploadSize := int64(0)
-   pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups)
+   pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil)
    suite.NoError(err)
    for i := 0; i < batches; i++ {
        err = pw.WriteRecordBatch(suite.rec)
@@ -91,7 +91,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
    err = pw.Close()
    suite.NoError(err)

-   reader, err := NewPackedReader(paths, suite.schema, bufferSize)
+   reader, err := NewPackedReader(paths, suite.schema, bufferSize, nil)
    suite.NoError(err)
    rr, err := reader.ReadNext()
    suite.NoError(err)
@@ -134,7 +134,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
    columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}}, {Columns: []int{0, 1}}}
    bufferSize := int64(10 * 1024 * 1024) // 10MB
    multiPartUploadSize := int64(0)
-   pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups)
+   pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil)
    suite.NoError(err)
    for i := 0; i < batches; i++ {
        err = pw.WriteRecordBatch(rec)
@@ -143,7 +143,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
    err = pw.Close()
    suite.NoError(err)

-   reader, err := NewPackedReader(paths, suite.schema, bufferSize)
+   reader, err := NewPackedReader(paths, suite.schema, bufferSize, nil)
    suite.NoError(err)
    var rows int64 = 0
    var rr arrow.Record

View File

@@ -33,9 +33,10 @@ import (
    "github.com/cockroachdb/errors"

    "github.com/milvus-io/milvus/internal/storagecommon"
+   "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)

-func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*PackedWriter, error) {
+func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, storageConfig *indexpb.StorageConfig) (*PackedWriter, error) {
    cFilePaths := make([]*C.char, len(filePaths))
    for i, path := range filePaths {
        cFilePaths[i] = C.CString(path)
@@ -68,7 +69,44 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64,
    }

    var cPackedWriter C.CPackedWriter
-   status := C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, &cPackedWriter)
+   var status C.CStatus
+   if storageConfig != nil {
+       cStorageConfig := C.CStorageConfig{
+           address:                C.CString(storageConfig.GetAddress()),
+           bucket_name:            C.CString(storageConfig.GetBucketName()),
+           access_key_id:          C.CString(storageConfig.GetAccessKeyID()),
+           access_key_value:       C.CString(storageConfig.GetSecretAccessKey()),
+           root_path:              C.CString(storageConfig.GetRootPath()),
+           storage_type:           C.CString(storageConfig.GetStorageType()),
+           cloud_provider:         C.CString(storageConfig.GetCloudProvider()),
+           iam_endpoint:           C.CString(storageConfig.GetIAMEndpoint()),
+           log_level:              C.CString("warn"),
+           useSSL:                 C.bool(storageConfig.GetUseSSL()),
+           sslCACert:              C.CString(storageConfig.GetSslCACert()),
+           useIAM:                 C.bool(storageConfig.GetUseIAM()),
+           region:                 C.CString(storageConfig.GetRegion()),
+           useVirtualHost:         C.bool(storageConfig.GetUseVirtualHost()),
+           requestTimeoutMs:       C.int64_t(storageConfig.GetRequestTimeoutMs()),
+           gcp_credential_json:    C.CString(storageConfig.GetGcpCredentialJSON()),
+           use_custom_part_upload: true,
+       }
+       defer C.free(unsafe.Pointer(cStorageConfig.address))
+       defer C.free(unsafe.Pointer(cStorageConfig.bucket_name))
+       defer C.free(unsafe.Pointer(cStorageConfig.access_key_id))
+       defer C.free(unsafe.Pointer(cStorageConfig.access_key_value))
+       defer C.free(unsafe.Pointer(cStorageConfig.root_path))
+       defer C.free(unsafe.Pointer(cStorageConfig.storage_type))
+       defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider))
+       defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint))
+       defer C.free(unsafe.Pointer(cStorageConfig.log_level))
+       defer C.free(unsafe.Pointer(cStorageConfig.sslCACert))
+       defer C.free(unsafe.Pointer(cStorageConfig.region))
+       defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json))
+       status = C.NewPackedWriterWithStorageConfig(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, cStorageConfig, &cPackedWriter)
+   } else {
+       status = C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, &cPackedWriter)
+   }
    if err := ConsumeCStatusIntoError(&status); err != nil {
        return nil, err
    }
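The writer wrapper mirrors the reader: the C strings built for the config are freed by the deferred calls once the constructor returns, use_custom_part_upload is always set to true, and a nil config falls back to the original NewPackedWriter path. A minimal hedged sketch, reusing the assumed paths, arrowSchema, columnGroups and cfg from the earlier examples plus an arrow.Record rec to write:

pw, err := packed.NewPackedWriter(paths, arrowSchema, 10*1024*1024, 0, columnGroups, cfg)
if err != nil {
    return err
}
if err := pw.WriteRecordBatch(rec); err != nil {
    return err
}
return pw.Close()

Passing the config as an explicit argument, rather than through another process-wide singleton, is what lets each index/stats task target its own bucket and credentials, which is the point of the task-level filesystem named in the commit title.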

View File

@@ -187,7 +187,7 @@ func InitRemoteArrowFileSystem(params *paramtable.ComponentParam) error {
    }
    status := C.InitRemoteArrowFileSystemSingleton(storageConfig)
-   return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed")
+   return HandleCStatus(&status, "InitRemoteArrowFileSystemSingleton failed")
}

func InitRemoteChunkManager(params *paramtable.ComponentParam) error {