enhance: bump milvus-storage and use subtree fs to remove bucket name concatenation (#46866)

Related to #46617

Bump milvus-storage version from 839a8e5 to 2ab2904, which introduces
subtree filesystem support. This allows removing manual bucket name
concatenation logic across the codebase as the storage layer now handles
path prefixing internally.

Changes:
- Remove bucket name prefix logic in datanode, querynode, and storage
layers
- Simplify FileManager::GetRemoteIndexFilePrefixV2()
- Rename CColumnGroups API to CColumnSplits to align with upstream
- Update DiskFileManagerTest paths for new directory structure
- Add FFI packed reader/writer unit tests


Co-authored-by: Wei Liu <wei.liu@zilliz.com>

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
congqixia 2026-01-09 14:51:25 +08:00 committed by GitHub
parent 6b2076e00d
commit 6a1ce9eee6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 588 additions and 124 deletions

View File

@ -244,6 +244,8 @@ class TestVectorArrayStorageV2 : public testing::Test {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
fs->DeleteDir("/tmp/test_vector_array_for_storage_v2");
milvus_storage::ArrowFileSystemSingleton::GetInstance().Release();
}
protected:

View File

@ -757,7 +757,7 @@ JsonKeyStats::BuildWithFieldData(const std::vector<FieldDataPtr>& field_datas,
// for storage v2, the subtree filesystem handles bucket/path prefixing internally, so no manual bucket name concatenation is needed
auto remote_prefix =
AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix());
disk_file_manager_->GetRemoteJsonStatsShreddingPrefix();
LOG_INFO(
"init parquet writer with shredding remote prefix: {} for segment {}",
remote_prefix,
@ -905,7 +905,7 @@ JsonKeyStats::LoadShreddingMeta(
}
auto remote_prefix =
AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix());
disk_file_manager_->GetRemoteJsonStatsShreddingPrefix();
// load common meta from parquet only if key_field_map_ is not already populated
// (for backward compatibility with old data that doesn't have separate meta file)
@ -938,7 +938,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
int64_t num_rows = 0;
auto remote_prefix =
AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix());
disk_file_manager_->GetRemoteJsonStatsShreddingPrefix();
std::vector<std::string> files;
for (const auto& file_id : file_ids) {

View File

@ -19,8 +19,8 @@
#include <cstring>
#include "segcore/column_groups_c.h"
TEST(CColumnGroups, TestCColumnGroups) {
CColumnGroups cgs = NewCColumnGroups();
TEST(CColumnSplits, TestCColumnSplits) {
CColumnSplits cgs = NewCColumnSplits();
int group1[] = {2, 4, 5};
int group2[] = {0, 1};
int group3[] = {3, 6, 7, 8};
@ -29,10 +29,10 @@ TEST(CColumnGroups, TestCColumnGroups) {
int group_sizes[] = {3, 2, 4};
for (int i = 0; i < 3; i++) {
AddCColumnGroup(cgs, test_groups[i], group_sizes[i]);
AddCColumnSplit(cgs, test_groups[i], group_sizes[i]);
}
ASSERT_EQ(CColumnGroupsSize(cgs), 3);
ASSERT_EQ(CColumnSplitsSize(cgs), 3);
auto vv = static_cast<std::vector<std::vector<int>>*>(cgs);
for (int i = 0; i < 3; i++) {
@ -42,5 +42,5 @@ TEST(CColumnGroups, TestCColumnGroups) {
}
}
FreeCColumnGroups(cgs);
FreeCColumnSplits(cgs);
}

View File

@ -60,9 +60,9 @@ TEST(CPackedTest, PackedWriterAndReader) {
char* paths[] = {const_cast<char*>("/tmp/0")};
int64_t part_upload_size = 0;
CColumnGroups cgs = NewCColumnGroups();
CColumnSplits cgs = NewCColumnSplits();
int group[] = {0};
AddCColumnGroup(cgs, group, 1);
AddCColumnSplit(cgs, group, 1);
auto c_status = InitLocalArrowFileSystemSingleton(path);
EXPECT_EQ(c_status.error_code, 0);
@ -102,5 +102,5 @@ TEST(CPackedTest, PackedWriterAndReader) {
c_status = CloseReader(c_packed_reader);
EXPECT_EQ(c_status.error_code, 0);
FreeCColumnGroups(cgs);
FreeCColumnSplits(cgs);
}

View File

@ -23,8 +23,8 @@ using VecVecInt = std::vector<std::vector<int>>;
extern "C" {
CColumnGroups
NewCColumnGroups() {
CColumnSplits
NewCColumnSplits() {
SCOPE_CGO_CALL_METRIC();
auto vv = std::make_unique<VecVecInt>();
@ -32,7 +32,7 @@ NewCColumnGroups() {
}
void
AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
AddCColumnSplit(CColumnSplits cgs, int* group, int group_size) {
SCOPE_CGO_CALL_METRIC();
if (!cgs || !group)
@ -44,7 +44,7 @@ AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
}
int
CColumnGroupsSize(CColumnGroups cgs) {
CColumnSplitsSize(CColumnSplits cgs) {
SCOPE_CGO_CALL_METRIC();
if (!cgs)
@ -55,7 +55,7 @@ CColumnGroupsSize(CColumnGroups cgs) {
}
void
FreeCColumnGroups(CColumnGroups cgs) {
FreeCColumnSplits(CColumnSplits cgs) {
SCOPE_CGO_CALL_METRIC();
delete static_cast<VecVecInt*>(cgs);

View File

@ -20,19 +20,18 @@
extern "C" {
#endif
typedef void* CColumnGroups;
CColumnGroups
NewCColumnGroups();
typedef void* CColumnSplits;
CColumnSplits
NewCColumnSplits();
void
AddCColumnGroup(CColumnGroups cgs, int* group, int group_size);
AddCColumnSplit(CColumnSplits cgs, int* group, int group_size);
int
CColumnGroupsSize(CColumnGroups cgs);
CColumnSplitsSize(CColumnSplits cgs);
void
FreeCColumnGroups(CColumnGroups cgs);
FreeCColumnSplits(CColumnSplits cgs);
#ifdef __cplusplus
}

View File

@ -41,7 +41,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
char** paths,
int64_t num_paths,
int64_t part_upload_size,
CColumnGroups column_groups,
CColumnSplits column_splits,
CStorageConfig c_storage_config,
CPackedWriter* c_packed_writer,
CPluginContext* c_plugin_context) {
@ -83,7 +83,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto columnGroups =
*static_cast<std::vector<std::vector<int>>*>(column_groups);
*static_cast<std::vector<std::vector<int>>*>(column_splits);
parquet::WriterProperties::Builder builder;
auto plugin_ptr =
@ -135,7 +135,7 @@ NewPackedWriter(struct ArrowSchema* schema,
char** paths,
int64_t num_paths,
int64_t part_upload_size,
CColumnGroups column_groups,
CColumnSplits column_splits,
CPackedWriter* c_packed_writer,
CPluginContext* c_plugin_context) {
SCOPE_CGO_CALL_METRIC();
@ -157,7 +157,7 @@ NewPackedWriter(struct ArrowSchema* schema,
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto columnGroups =
*static_cast<std::vector<std::vector<int>>*>(column_groups);
*static_cast<std::vector<std::vector<int>>*>(column_splits);
parquet::WriterProperties::Builder builder;
auto plugin_ptr =

View File

@ -31,7 +31,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
char** paths,
int64_t num_paths,
int64_t part_upload_size,
CColumnGroups column_groups,
CColumnSplits column_splits,
CStorageConfig c_storage_config,
CPackedWriter* c_packed_writer,
CPluginContext* c_plugin_context);
@ -42,7 +42,7 @@ NewPackedWriter(struct ArrowSchema* schema,
char** paths,
int64_t num_paths,
int64_t part_upload_size,
CColumnGroups column_groups,
CColumnSplits column_splits,
CPackedWriter* c_packed_writer,
CPluginContext* c_plugin_context);

View File

@ -125,20 +125,21 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) {
TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) {
auto conf = milvus_storage::ArrowFileSystemConfig();
conf.storage_type = "local";
conf.root_path = "/tmp";
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
conf.root_path = "/tmp/diskann";
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto result = milvus_storage::CreateArrowFileSystem(conf);
EXPECT_TRUE(result.ok());
auto fs = result.ValueOrDie();
auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager();
std::string small_index_file_path =
"/tmp/diskann/index_files/1000/small_index_file";
"/tmp/diskann/index_files/1000/1/2/3/small_index_file";
std::string large_index_file_path =
"/tmp/diskann/index_files/1000/large_index_file";
"/tmp/diskann/index_files/1000/1/2/3/large_index_file";
auto exist = lcm->Exist(large_index_file_path);
std::string index_file_path = "/tmp/diskann/index_files/1000/index_file";
std::string index_file_path =
"/tmp/diskann/index_files/1000/1/2/3/index_file";
boost::filesystem::path localPath(index_file_path);
auto local_file_name = localPath.filename().string();
@ -165,7 +166,7 @@ TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) {
IndexMeta index_meta = {3, 100, 1000, 1, "index"};
auto diskAnnFileManager = std::make_shared<DiskFileManagerImpl>(
storage::FileManagerContext(filed_data_meta, index_meta, cm_, fs_));
storage::FileManagerContext(filed_data_meta, index_meta, cm_, fs));
auto os = diskAnnFileManager->OpenOutputStream(index_file_path);
size_t write_offset = 0;
@ -207,7 +208,7 @@ TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) {
EXPECT_EQ(read_small_index_size, small_index_size);
EXPECT_EQ(is->Tell(), read_offset);
std::string small_index_file_path_read =
"/tmp/diskann/index_files/1000/small_index_file_read";
"/tmp/diskann/index_files/1000/1/2/3/small_index_file_read";
lcm->CreateFile(small_index_file_path_read);
int fd_read = open(small_index_file_path_read.c_str(), O_WRONLY);
ASSERT_NE(fd_read, -1);

View File

@ -207,13 +207,7 @@ class FileManagerImpl : public milvus::FileManager {
virtual std::string
GetRemoteIndexFilePrefixV2() const {
boost::filesystem::path bucket = rcm_->GetBucketName();
std::string v1_prefix = GetRemoteIndexObjectPrefix();
if (bucket.empty()) {
return v1_prefix;
} else {
return NormalizePath(bucket / v1_prefix);
}
return GetRemoteIndexObjectPrefixV2();
}
virtual std::string

View File

@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION 839a8e5)
set( milvus-storage_VERSION 2ab2904)
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@ -24,6 +24,7 @@ import (
"strings"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
@ -195,20 +196,13 @@ func transformFieldBinlogs(
var totalRows int64
for _, srcFieldBinlog := range srcFieldBinlogs {
dstFieldBinlog := &datapb.FieldBinlog{
FieldID: srcFieldBinlog.GetFieldID(),
Binlogs: make([]*datapb.Binlog, 0, len(srcFieldBinlog.GetBinlogs())),
}
dstFieldBinlog := proto.Clone(srcFieldBinlog).(*datapb.FieldBinlog)
dstFieldBinlog.Binlogs = make([]*datapb.Binlog, 0, len(srcFieldBinlog.GetBinlogs()))
for _, srcBinlog := range srcFieldBinlog.GetBinlogs() {
if srcPath := srcBinlog.GetLogPath(); srcPath != "" {
dstBinlog := &datapb.Binlog{
EntriesNum: srcBinlog.GetEntriesNum(),
TimestampFrom: srcBinlog.GetTimestampFrom(),
TimestampTo: srcBinlog.GetTimestampTo(),
LogPath: mappings[srcPath],
LogSize: srcBinlog.GetLogSize(),
}
dstBinlog := proto.Clone(srcBinlog).(*datapb.Binlog)
dstBinlog.LogPath = mappings[srcPath]
dstFieldBinlog.Binlogs = append(dstFieldBinlog.Binlogs, dstBinlog)
if countRows {
@ -524,14 +518,9 @@ func buildIndexInfoFromSource(
}
}
textIndexInfos[fieldID] = &datapb.TextIndexStats{
FieldID: srcText.GetFieldID(),
Version: srcText.GetVersion(),
BuildID: srcText.GetBuildID(),
Files: targetFiles,
LogSize: srcText.GetLogSize(),
MemorySize: srcText.GetMemorySize(),
}
dstText := proto.Clone(srcText).(*datapb.TextIndexStats)
dstText.Files = targetFiles
textIndexInfos[fieldID] = dstText
}
// Process JSON Key indexes - transform file paths
@ -545,14 +534,9 @@ func buildIndexInfoFromSource(
}
}
jsonKeyIndexInfos[fieldID] = &datapb.JsonKeyStats{
FieldID: srcJson.GetFieldID(),
Version: srcJson.GetVersion(),
BuildID: srcJson.GetBuildID(),
Files: targetFiles,
JsonKeyStatsDataFormat: srcJson.GetJsonKeyStatsDataFormat(),
MemorySize: srcJson.GetMemorySize(),
}
dstJson := proto.Clone(srcJson).(*datapb.JsonKeyStats)
dstJson.Files = targetFiles
jsonKeyIndexInfos[fieldID] = dstJson
}
return indexInfos, textIndexInfos, jsonKeyIndexInfos

View File

@ -204,6 +204,7 @@ func TestTransformFieldBinlogs(t *testing.T) {
TimestampTo: 200,
LogPath: "files/insert_log/111/222/333/100/log1.log",
LogSize: 1024,
MemorySize: 1024,
},
},
},
@ -216,6 +217,7 @@ func TestTransformFieldBinlogs(t *testing.T) {
TimestampTo: 250,
LogPath: "files/insert_log/111/222/333/101/log2.log",
LogSize: 2048,
MemorySize: 2048,
},
},
},
@ -233,6 +235,7 @@ func TestTransformFieldBinlogs(t *testing.T) {
assert.Equal(t, int64(1000), result[0].Binlogs[0].EntriesNum)
assert.Equal(t, "files/insert_log/444/555/666/100/log1.log", result[0].Binlogs[0].LogPath)
assert.Equal(t, int64(1024), result[0].Binlogs[0].LogSize)
assert.Equal(t, int64(1024), result[0].Binlogs[0].MemorySize)
// Verify second field binlog
assert.Equal(t, int64(101), result[1].FieldID)

View File

@ -1,8 +1,6 @@
package util
import (
"path"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -28,9 +26,6 @@ func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, storageConfig *in
columnGroupID := insertLog.GetFieldID()
for _, binlog := range insertLog.GetBinlogs() {
filePath := metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID())
if storageConfig.StorageType != "local" {
filePath = path.Join(storageConfig.GetBucketName(), filePath)
}
filePaths = append(filePaths, filePath)
}
insertLogs = append(insertLogs, &indexcgopb.FieldInsertFiles{

View File

@ -244,17 +244,6 @@ type segmentLoader struct {
var _ Loader = (*segmentLoader)(nil)
func addBucketNameStorageV2(segmentInfo *querypb.SegmentLoadInfo) {
if segmentInfo.GetStorageVersion() == 2 && paramtable.Get().CommonCfg.StorageType.GetValue() != "local" {
bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
for _, fieldBinlog := range segmentInfo.GetBinlogPaths() {
for _, binlog := range fieldBinlog.GetBinlogs() {
binlog.LogPath = path.Join(bucketName, binlog.LogPath)
}
}
}
}
func (loader *segmentLoader) Load(ctx context.Context,
collectionID int64,
segmentType SegmentType,
@ -270,9 +259,6 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("no segment to load")
return nil, nil
}
for _, segmentInfo := range segments {
addBucketNameStorageV2(segmentInfo)
}
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {

View File

@ -394,9 +394,6 @@ func (pw *PackedManifestRecordWriter) initWriters(r Record) error {
var err error
k := metautil.JoinIDPath(pw.collectionID, pw.partitionID, pw.segmentID)
basePath := path.Join(pw.storageConfig.GetRootPath(), common.SegmentInsertLogPath, k)
if pw.storageConfig.StorageType != "local" {
basePath = path.Join(pw.storageConfig.GetBucketName(), basePath)
}
pw.writer, err = NewPackedRecordManifestWriter(pw.storageConfig.GetBucketName(), basePath, -1, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups, pw.storageConfig, pw.storagePluginContext)
if err != nil {
return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error()))

View File

@ -32,7 +32,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -124,8 +123,7 @@ func (pw *packedRecordWriter) Close() error {
return err
}
for id, fpath := range pw.pathsMap {
truePath := path.Join(pw.bucketName, fpath)
size, err := packed.GetFileSize(truePath, pw.storageConfig)
size, err := packed.GetFileSize(fpath, pw.storageConfig)
if err != nil {
return err
}
@ -156,20 +154,7 @@ func NewPackedRecordWriter(
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error()))
}
// if storage config is not passed, use common config
storageType := paramtable.Get().CommonCfg.StorageType.GetValue()
if storageConfig != nil {
storageType = storageConfig.GetStorageType()
}
// compose true path before create packed writer here
// and returned writtenPaths shall remain untouched
truePaths := lo.Map(paths, func(p string, _ int) string {
if storageType == "local" {
return p
}
return path.Join(bucketName, p)
})
writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
writer, err := packed.NewPackedWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))

View File

@ -21,7 +21,6 @@ import (
"encoding/base64"
"fmt"
sio "io"
"path"
"sort"
"github.com/samber/lo"
@ -299,14 +298,10 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog {
return fieldBinlog.GetBinlogs()
})
bucketName := rwOptions.storageConfig.BucketName
paths := make([][]string, len(binlogLists[0]))
for _, binlogs := range binlogLists {
for j, binlog := range binlogs {
logPath := binlog.GetLogPath()
if rwOptions.storageConfig.StorageType != "local" {
logPath = path.Join(bucketName, logPath)
}
paths[j] = append(paths[j], logPath)
}
}

View File

@ -177,9 +177,6 @@ func (r *FFIPackedReader) Schema() *arrow.Schema {
// Retain increases the reference count
func (r *FFIPackedReader) Retain() {
// if r.recordReader != nil {
// r.recordReader.Retain()
// }
}
// Release decreases the reference count

View File

@ -0,0 +1,410 @@
package packed
import (
"fmt"
"io"
"math"
"testing"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// TestPackedFFIReader is a round-trip test: it writes numRows rows
// (int64 pk + fixed-size-binary vector) through the FFI packed writer,
// then reads them back via the FFI packed reader using the returned
// manifest and verifies every value matches what was written.
func TestPackedFFIReader(t *testing.T) {
// Force local storage rooted at a per-test temp dir; reset on cleanup
// so other tests see the original paramtable values.
paramtable.Init()
pt := paramtable.Get()
pt.Save(pt.CommonCfg.StorageType.Key, "local")
dir := t.TempDir()
t.Log("Case temp dir: ", dir)
pt.Save(pt.LocalStorageCfg.Path.Key, dir)
t.Cleanup(func() {
pt.Reset(pt.CommonCfg.StorageType.Key)
pt.Reset(pt.LocalStorageCfg.Path.Key)
})
const (
numRows = 1000
dim = 128
)
// Create schema: int64 primary key + 128-dim float vector
schema := arrow.NewSchema([]arrow.Field{
{
Name: "pk",
Type: arrow.PrimitiveTypes.Int64,
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}),
},
{
Name: "vector",
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, // float32 = 4 bytes
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}),
},
}, nil)
// basePath is relative; the subtree fs resolves it under the local root dir.
basePath := "files/packed_reader_test/1"
version := int64(0)
// Build record batch
b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
defer b.Release()
pkBuilder := b.Field(0).(*array.Int64Builder)
vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder)
// Store expected values for verification
expectedPks := make([]int64, numRows)
expectedVectors := make([][]byte, numRows)
for i := 0; i < numRows; i++ {
// Append primary key
expectedPks[i] = int64(i)
pkBuilder.Append(expectedPks[i])
// Generate random float vector and convert to bytes
vectorBytes := make([]byte, dim*4)
for j := 0; j < dim; j++ {
floatVal := rand.Float32()
bits := math.Float32bits(floatVal)
common.Endian.PutUint32(vectorBytes[j*4:], bits)
}
expectedVectors[i] = vectorBytes
vectorBuilder.Append(vectorBytes)
}
rec := b.NewRecord()
defer rec.Release()
require.Equal(t, int64(numRows), rec.NumRows())
// Define column groups: both columns in the single default short group.
columnGroups := []storagecommon.ColumnGroup{
{Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID},
}
// Create FFI packed writer and write data
pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil)
require.NoError(t, err)
err = pw.WriteRecordBatch(rec)
require.NoError(t, err)
// Close returns the manifest path used by the reader below.
manifest, err := pw.Close()
require.NoError(t, err)
require.NotEmpty(t, manifest)
t.Logf("Successfully wrote %d rows with %d-dim float vectors, manifest: %s", numRows, dim, manifest)
// Create storage config for reader
storageConfig := &indexpb.StorageConfig{
RootPath: dir,
StorageType: "local",
}
// Create FFI packed reader
neededColumns := []string{"pk", "vector"}
reader, err := NewFFIPackedReader(manifest, schema, neededColumns, 8192, storageConfig, nil)
require.NoError(t, err)
require.NotNil(t, reader)
// Verify schema
assert.Equal(t, schema, reader.Schema())
// Read all records and verify data; ReadNext returns io.EOF when exhausted.
totalRowsRead := int64(0)
for {
record, err := reader.ReadNext()
if err == io.EOF {
break
}
require.NoError(t, err)
require.NotNil(t, record)
// Verify column count
assert.Equal(t, int64(2), record.NumCols())
// Verify pk column
pkCol := record.Column(0).(*array.Int64)
for i := 0; i < pkCol.Len(); i++ {
expectedIdx := int(totalRowsRead) + i
assert.Equal(t, expectedPks[expectedIdx], pkCol.Value(i), fmt.Sprintf("pk mismatch at row %d", expectedIdx))
}
// Verify vector column
vectorCol := record.Column(1).(*array.FixedSizeBinary)
for i := 0; i < vectorCol.Len(); i++ {
expectedIdx := int(totalRowsRead) + i
assert.Equal(t, expectedVectors[expectedIdx], vectorCol.Value(i), fmt.Sprintf("vector mismatch at row %d", expectedIdx))
}
totalRowsRead += record.NumRows()
}
// Verify total rows read
assert.Equal(t, int64(numRows), totalRowsRead)
t.Logf("Successfully read %d rows", totalRowsRead)
// Close reader
err = reader.Close()
require.NoError(t, err)
}
// TestPackedFFIReaderPartialColumns writes a 3-column dataset
// (pk, score, vector) and then reads back only pk and score, verifying
// that column projection through the FFI reader returns exactly the
// requested columns with correct values.
func TestPackedFFIReaderPartialColumns(t *testing.T) {
// Local storage rooted at a per-test temp dir; restored on cleanup.
paramtable.Init()
pt := paramtable.Get()
pt.Save(pt.CommonCfg.StorageType.Key, "local")
dir := t.TempDir()
pt.Save(pt.LocalStorageCfg.Path.Key, dir)
t.Cleanup(func() {
pt.Reset(pt.CommonCfg.StorageType.Key)
pt.Reset(pt.LocalStorageCfg.Path.Key)
})
const (
numRows = 500
dim = 64
)
// Create schema with 3 columns
schema := arrow.NewSchema([]arrow.Field{
{
Name: "pk",
Type: arrow.PrimitiveTypes.Int64,
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}),
},
{
Name: "score",
Type: arrow.PrimitiveTypes.Float64,
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}),
},
{
Name: "vector",
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4},
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"102"}),
},
}, nil)
basePath := "files/packed_reader_partial_test/1"
version := int64(0)
// Build record batch
b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
defer b.Release()
pkBuilder := b.Field(0).(*array.Int64Builder)
scoreBuilder := b.Field(1).(*array.Float64Builder)
vectorBuilder := b.Field(2).(*array.FixedSizeBinaryBuilder)
// Only pk and score are checked later; vector content is written but
// intentionally not tracked since it is projected away on read.
expectedPks := make([]int64, numRows)
expectedScores := make([]float64, numRows)
for i := 0; i < numRows; i++ {
expectedPks[i] = int64(i)
pkBuilder.Append(expectedPks[i])
expectedScores[i] = rand.Float64() * 100
scoreBuilder.Append(expectedScores[i])
vectorBytes := make([]byte, dim*4)
for j := 0; j < dim; j++ {
floatVal := rand.Float32()
bits := math.Float32bits(floatVal)
common.Endian.PutUint32(vectorBytes[j*4:], bits)
}
vectorBuilder.Append(vectorBytes)
}
rec := b.NewRecord()
defer rec.Release()
// Define column groups
columnGroups := []storagecommon.ColumnGroup{
{Columns: []int{0, 1, 2}, GroupID: storagecommon.DefaultShortColumnGroupID},
}
// Write data
pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil)
require.NoError(t, err)
err = pw.WriteRecordBatch(rec)
require.NoError(t, err)
manifest, err := pw.Close()
require.NoError(t, err)
// Create storage config
storageConfig := &indexpb.StorageConfig{
RootPath: dir,
StorageType: "local",
}
// Read only pk and score columns (skip vector)
neededColumns := []string{"pk", "score"}
partialSchema := arrow.NewSchema([]arrow.Field{
schema.Field(0),
schema.Field(1),
}, nil)
reader, err := NewFFIPackedReader(manifest, partialSchema, neededColumns, 8192, storageConfig, nil)
require.NoError(t, err)
require.NotNil(t, reader)
totalRowsRead := int64(0)
for {
record, err := reader.ReadNext()
if err == io.EOF {
break
}
require.NoError(t, err)
// Verify only 2 columns are returned
assert.Equal(t, int64(2), record.NumCols())
// Verify pk column
pkCol := record.Column(0).(*array.Int64)
for i := 0; i < pkCol.Len(); i++ {
expectedIdx := int(totalRowsRead) + i
assert.Equal(t, expectedPks[expectedIdx], pkCol.Value(i))
}
// Verify score column
scoreCol := record.Column(1).(*array.Float64)
for i := 0; i < scoreCol.Len(); i++ {
expectedIdx := int(totalRowsRead) + i
assert.Equal(t, expectedScores[expectedIdx], scoreCol.Value(i))
}
totalRowsRead += record.NumRows()
}
assert.Equal(t, int64(numRows), totalRowsRead)
t.Logf("Successfully read %d rows with partial columns", totalRowsRead)
err = reader.Close()
require.NoError(t, err)
}
// TestPackedFFIReaderMultipleBatches performs numWrites sequential
// writer sessions against the same basePath, feeding the manifest
// version from each Close back into the next writer, then reads via
// the final manifest and checks the total row count across all batches.
func TestPackedFFIReaderMultipleBatches(t *testing.T) {
// Local storage rooted at a per-test temp dir; restored on cleanup.
paramtable.Init()
pt := paramtable.Get()
pt.Save(pt.CommonCfg.StorageType.Key, "local")
dir := t.TempDir()
pt.Save(pt.LocalStorageCfg.Path.Key, dir)
t.Cleanup(func() {
pt.Reset(pt.CommonCfg.StorageType.Key)
pt.Reset(pt.LocalStorageCfg.Path.Key)
})
const (
numRows = 500
dim = 64
numWrites = 3
)
schema := arrow.NewSchema([]arrow.Field{
{
Name: "pk",
Type: arrow.PrimitiveTypes.Int64,
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}),
},
{
Name: "vector",
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4},
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}),
},
}, nil)
basePath := "files/packed_reader_multi_batch_test/1"
version := int64(0)
columnGroups := []storagecommon.ColumnGroup{
{Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID},
}
var manifest string
totalWrittenRows := 0
// Write multiple batches; pks continue across batches so the data is
// one contiguous sequence.
for batch := 0; batch < numWrites; batch++ {
b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
pkBuilder := b.Field(0).(*array.Int64Builder)
vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder)
for i := 0; i < numRows; i++ {
pkBuilder.Append(int64(totalWrittenRows + i))
vectorBytes := make([]byte, dim*4)
for j := 0; j < dim; j++ {
floatVal := rand.Float32()
bits := math.Float32bits(floatVal)
common.Endian.PutUint32(vectorBytes[j*4:], bits)
}
vectorBuilder.Append(vectorBytes)
}
rec := b.NewRecord()
pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil)
require.NoError(t, err)
err = pw.WriteRecordBatch(rec)
require.NoError(t, err)
manifest, err = pw.Close()
require.NoError(t, err)
// Extract the new version from the manifest path for the next write.
// NOTE(review): "UnmarshalManfestPath" carries a typo ("Manfest") in
// the declared API name — kept as-is to match the declaration.
_, version, err = UnmarshalManfestPath(manifest)
require.NoError(t, err)
totalWrittenRows += numRows
b.Release()
rec.Release()
}
// Read all data
storageConfig := &indexpb.StorageConfig{
RootPath: dir,
StorageType: "local",
}
neededColumns := []string{"pk", "vector"}
reader, err := NewFFIPackedReader(manifest, schema, neededColumns, 8192, storageConfig, nil)
require.NoError(t, err)
totalRowsRead := int64(0)
for {
record, err := reader.ReadNext()
if err == io.EOF {
break
}
require.NoError(t, err)
totalRowsRead += record.NumRows()
}
assert.Equal(t, int64(totalWrittenRows), totalRowsRead)
t.Logf("Successfully read %d rows from %d batches", totalRowsRead, numWrites)
err = reader.Close()
require.NoError(t, err)
}

View File

@ -55,7 +55,7 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64,
cMultiPartUploadSize := C.int64_t(multiPartUploadSize)
cColumnGroups := C.NewCColumnGroups()
cColumnSplits := C.NewCColumnSplits()
for _, group := range columnGroups {
cGroup := C.malloc(C.size_t(len(group.Columns)) * C.size_t(unsafe.Sizeof(C.int(0))))
if cGroup == nil {
@ -65,7 +65,7 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64,
for i, val := range group.Columns {
cGroupSlice[i] = C.int(val)
}
C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group.Columns)))
C.AddCColumnSplit(cColumnSplits, (*C.int)(cGroup), C.int(len(group.Columns)))
C.free(cGroup)
}
@ -117,9 +117,9 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64,
defer C.free(unsafe.Pointer(cStorageConfig.sslCACert))
defer C.free(unsafe.Pointer(cStorageConfig.region))
defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json))
status = C.NewPackedWriterWithStorageConfig(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, cStorageConfig, &cPackedWriter, pluginContextPtr)
status = C.NewPackedWriterWithStorageConfig(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnSplits, cStorageConfig, &cPackedWriter, pluginContextPtr)
} else {
status = C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, &cPackedWriter, pluginContextPtr)
status = C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnSplits, &cPackedWriter, pluginContextPtr)
}
if err := ConsumeCStatusIntoError(&status); err != nil {
return nil, err

View File

@ -0,0 +1,116 @@
package packed
import (
"math"
"testing"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// TestPackedFFIWriter exercises the FFI packed writer alone: it writes
// `batch` successive record batches of numRows rows each (int64 pk +
// dim-sized float vectors) to the same basePath and asserts that every
// Close returns a manifest whose path matches basePath and whose
// version increments by one per write session.
func TestPackedFFIWriter(t *testing.T) {
// Local storage rooted at a per-test temp dir; restored on cleanup.
paramtable.Init()
pt := paramtable.Get()
pt.Save(pt.CommonCfg.StorageType.Key, "local")
dir := t.TempDir()
t.Log("Case temp dir: ", dir)
pt.Save(pt.LocalStorageCfg.Path.Key, dir)
t.Cleanup(func() {
pt.Reset(pt.CommonCfg.StorageType.Key)
pt.Reset(pt.LocalStorageCfg.Path.Key)
})
const (
numRows = 5000
dim = 768
batch = 10
)
// Create schema: int64 primary key + 768-dim float vector
schema := arrow.NewSchema([]arrow.Field{
{
Name: "pk",
Type: arrow.PrimitiveTypes.Int64,
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}),
},
{
Name: "vector",
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, // float32 = 4 bytes
Nullable: false,
Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}),
},
}, nil)
basePath := "files/packed_writer_test/1"
version := int64(0)
// NOTE(review): deferring Release inside the loop delays all frees to
// function exit — acceptable in a test, but worth confirming intent.
for i := 0; i < batch; i++ {
// Build record batch with 5000 rows
b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
defer b.Release()
pkBuilder := b.Field(0).(*array.Int64Builder)
vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder)
for i := 0; i < numRows; i++ {
// Append primary key
pkBuilder.Append(int64(i))
// Generate random float vector and convert to bytes
vectorBytes := make([]byte, dim*4)
for j := 0; j < dim; j++ {
floatVal := rand.Float32()
bits := math.Float32bits(floatVal)
common.Endian.PutUint32(vectorBytes[j*4:], bits)
}
vectorBuilder.Append(vectorBytes)
}
rec := b.NewRecord()
defer rec.Release()
require.Equal(t, int64(numRows), rec.NumRows())
// // Setup storage config for local filesystem
// storageConfig := &indexpb.StorageConfig{
// RootPath: dir,
// StorageType: "local",
// }
// Define column groups: pk and vector in the same group
columnGroups := []storagecommon.ColumnGroup{
{Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID},
}
// Create FFI packed writer
pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil)
require.NoError(t, err)
// Write record batch
err = pw.WriteRecordBatch(rec)
require.NoError(t, err)
// Close writer and get manifest
manifest, err := pw.Close()
require.NoError(t, err)
require.NotEmpty(t, manifest)
// Manifest path must round-trip to basePath with version bumped by 1.
// NOTE(review): "UnmarshalManfestPath" carries a typo ("Manfest") in
// the declared API name — kept as-is to match the declaration.
p, pv, err := UnmarshalManfestPath(manifest)
require.NoError(t, err)
assert.Equal(t, p, basePath)
assert.Equal(t, pv, version+1)
version = pv
t.Logf("Successfully wrote %d rows with %d-dim float vectors, manifest: %s", numRows, dim, manifest)
}
}