enhance: [StorageV2] Use compressed size as log file size (#44402)

Related to #39173

Fixes a backlog issue where memory size and log size shared the same value. This
patch adds a `GetFileSize` API to fetch the remote compressed binlog size and
record it as the meta log file size, making usage calculation more accurate.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-09-16 21:20:02 +08:00 committed by GitHub
parent 691a8df953
commit 6f7318a731
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 178 additions and 11 deletions

View File

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "common/common_type_c.h"
#include "parquet/encryption/encryption.h"
#include "parquet/properties.h"
#include "parquet/types.h"
@ -22,6 +23,7 @@
#include "milvus-storage/filesystem/fs.h"
#include "storage/PluginLoader.h"
#include "storage/KeyRetriever.h"
#include "storage/Util.h"
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
@ -262,3 +264,80 @@ CloseWriter(CPackedWriter c_packed_writer) {
return milvus::FailureCStatus(&e);
}
}
// GetFileSize writes the remote (compressed) size in bytes of `path` into
// `size`, using the process-wide ArrowFileSystemSingleton.
// Returns a failure CStatus when the filesystem is not initialized, the
// lookup fails, or the path does not resolve to an existing regular file.
CStatus
GetFileSize(const char* path, int64_t* size) {
    SCOPE_CGO_CALL_METRIC();
    try {
        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                          .GetArrowFileSystem();
        if (!trueFs) {
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Failed to get filesystem");
        }
        auto result = trueFs->GetFileInfo(path);
        if (!result.ok()) {
            // Propagate the underlying arrow status so callers can tell
            // network errors apart from configuration problems.
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Failed to get file info: " +
                    result.status().ToString());
        }
        auto info = result.ValueOrDie();
        // Arrow reports a missing path as FileType::NotFound with an OK
        // status and size() == -1; surface that as an error instead of
        // handing a negative size back to the caller.
        if (info.type() != arrow::fs::FileType::File) {
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Path is not an existing regular file: " +
                    std::string(path));
        }
        *size = info.size();
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
// GetFileSizeWithStorageConfig initializes the ArrowFileSystemSingleton from
// the provided storage config and then resolves the remote (compressed) size
// of `path` into `size`.
// NOTE(review): Init() mutates process-wide singleton state on every call;
// confirm callers tolerate re-initialization racing other filesystem users.
CStatus
GetFileSizeWithStorageConfig(const char* path,
                             int64_t* size,
                             CStorageConfig c_storage_config) {
    SCOPE_CGO_CALL_METRIC();
    try {
        // Translate the C storage config into the arrow filesystem config.
        milvus_storage::ArrowFileSystemConfig conf;
        conf.address = std::string(c_storage_config.address);
        conf.bucket_name = std::string(c_storage_config.bucket_name);
        conf.access_key_id = std::string(c_storage_config.access_key_id);
        conf.access_key_value = std::string(c_storage_config.access_key_value);
        conf.root_path = std::string(c_storage_config.root_path);
        conf.storage_type = std::string(c_storage_config.storage_type);
        conf.cloud_provider = std::string(c_storage_config.cloud_provider);
        conf.iam_endpoint = std::string(c_storage_config.iam_endpoint);
        conf.log_level = std::string(c_storage_config.log_level);
        conf.region = std::string(c_storage_config.region);
        conf.useSSL = c_storage_config.useSSL;
        conf.sslCACert = std::string(c_storage_config.sslCACert);
        conf.useIAM = c_storage_config.useIAM;
        conf.useVirtualHost = c_storage_config.useVirtualHost;
        conf.requestTimeoutMs = c_storage_config.requestTimeoutMs;
        conf.gcp_credential_json =
            std::string(c_storage_config.gcp_credential_json);
        conf.use_custom_part_upload = c_storage_config.use_custom_part_upload;
        milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                          .GetArrowFileSystem();
        if (!trueFs) {
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Failed to get filesystem");
        }
        auto result = trueFs->GetFileInfo(path);
        if (!result.ok()) {
            // Keep the underlying arrow status in the message for diagnosis.
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Failed to get file info: " +
                    result.status().ToString());
        }
        auto info = result.ValueOrDie();
        // A missing object comes back as FileType::NotFound with an OK
        // status and size() == -1; reject it rather than report -1 bytes.
        if (info.type() != arrow::fs::FileType::File) {
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileReadFailed,
                "[StorageV2] Path is not an existing regular file: " +
                    std::string(path));
        }
        *size = info.size();
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}

View File

@ -14,6 +14,7 @@
#pragma once
#include "common/common_type_c.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -54,6 +55,14 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
CStatus
CloseWriter(CPackedWriter c_packed_writer);
CStatus
GetFileSize(const char* path, int64_t* size);
CStatus
GetFileSizeWithStorageConfig(const char* path,
int64_t* size,
CStorageConfig c_storage_config);
#ifdef __cplusplus
}
#endif

View File

@ -181,6 +181,10 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
if err = w.Write(rec); err != nil {
return nil, err
}
// close first to get compressed size
if err = w.Close(); err != nil {
return nil, err
}
for _, columnGroup := range columnGroups {
columnGroupID := columnGroup.GroupID
logs[columnGroupID] = &datapb.FieldBinlog{
@ -188,7 +192,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
ChildFields: columnGroup.Fields,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)),
LogSize: int64(w.GetColumnGroupWrittenCompressed(columnGroup.GroupID)),
MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)),
LogPath: w.GetWrittenPaths(columnGroupID),
EntriesNum: w.GetWrittenRowNum(),
@ -198,9 +202,6 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
},
}
}
if err = w.Close(); err != nil {
return nil, err
}
return logs, nil
}

View File

@ -156,6 +156,7 @@ type packedRecordWriter struct {
rowNum int64
writtenUncompressed uint64
columnGroupUncompressed map[typeutil.UniqueID]uint64
columnGroupCompressed map[typeutil.UniqueID]uint64
storageConfig *indexpb.StorageConfig
}
@ -199,6 +200,13 @@ func (pw *packedRecordWriter) GetColumnGroupWrittenUncompressed(columnGroup type
return 0
}
// GetColumnGroupWrittenCompressed returns the compressed (on-disk) byte
// count recorded for the given column group, or 0 for an unknown group.
func (pw *packedRecordWriter) GetColumnGroupWrittenCompressed(columnGroup typeutil.UniqueID) uint64 {
	// A missing key yields uint64's zero value, which matches the explicit
	// `return 0` fallback of a comma-ok lookup.
	return pw.columnGroupCompressed[columnGroup]
}
func (pw *packedRecordWriter) GetWrittenPaths(columnGroup typeutil.UniqueID) string {
if path, ok := pw.pathsMap[columnGroup]; ok {
return path
@ -212,7 +220,18 @@ func (pw *packedRecordWriter) GetWrittenRowNum() int64 {
func (pw *packedRecordWriter) Close() error {
if pw.writer != nil {
return pw.writer.Close()
err := pw.writer.Close()
if err != nil {
return err
}
for id, fpath := range pw.pathsMap {
truePath := path.Join(pw.bucketName, fpath)
size, err := packed.GetFileSize(truePath, pw.storageConfig)
if err != nil {
return err
}
pw.columnGroupCompressed[id] = uint64(size)
}
}
return nil
}
@ -242,6 +261,7 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
columnGroupUncompressed := make(map[typeutil.UniqueID]uint64)
columnGroupCompressed := make(map[typeutil.UniqueID]uint64)
pathsMap := make(map[typeutil.UniqueID]string)
if len(paths) != len(columnGroups) {
return nil, merr.WrapErrParameterInvalid(len(paths), len(columnGroups),
@ -249,6 +269,7 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
}
for i, columnGroup := range columnGroups {
columnGroupUncompressed[columnGroup.GroupID] = 0
columnGroupCompressed[columnGroup.GroupID] = 0
pathsMap[columnGroup.GroupID] = paths[i]
}
return &packedRecordWriter{
@ -260,6 +281,7 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
pathsMap: pathsMap,
columnGroups: columnGroups,
columnGroupUncompressed: columnGroupUncompressed,
columnGroupCompressed: columnGroupCompressed,
storageConfig: storageConfig,
}, nil
}
@ -402,6 +424,11 @@ func (pw *PackedBinlogRecordWriter) GetWrittenUncompressed() uint64 {
}
func (pw *PackedBinlogRecordWriter) Close() error {
if pw.writer != nil {
if err := pw.writer.Close(); err != nil {
return err
}
}
pw.finalizeBinlogs()
if err := pw.writeStats(); err != nil {
return err
@ -409,11 +436,6 @@ func (pw *PackedBinlogRecordWriter) Close() error {
if err := pw.writeBm25Stats(); err != nil {
return err
}
if pw.writer != nil {
if err := pw.writer.Close(); err != nil {
return err
}
}
return nil
}
@ -434,7 +456,7 @@ func (pw *PackedBinlogRecordWriter) finalizeBinlogs() {
}
}
pw.fieldBinlogs[columnGroupID].Binlogs = append(pw.fieldBinlogs[columnGroupID].Binlogs, &datapb.Binlog{
LogSize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)),
LogSize: int64(pw.writer.GetColumnGroupWrittenCompressed(columnGroupID)),
MemorySize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)),
LogPath: pw.writer.GetWrittenPaths(columnGroupID),
EntriesNum: pw.writer.GetWrittenRowNum(),

View File

@ -22,6 +22,7 @@ package packed
#include "common/type_c.h"
#include "common/protobuf_utils_c.h"
#include "segcore/segment_c.h"
#include "segcore/packed_writer_c.h"
#include "storage/storage_c.h"
*/
import "C"
@ -29,6 +30,7 @@ import "C"
import (
"unsafe"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
@ -41,3 +43,57 @@ func ConsumeCStatusIntoError(status *C.CStatus) error {
C.free(unsafe.Pointer(status.error_msg))
return merr.SegcoreError(int32(errorCode), errorMsg)
}
// GetFileSize returns the size in bytes of the remote file at path by
// delegating to the segcore C API. When storageConfig is nil the
// process-wide filesystem singleton is used; otherwise a filesystem is
// initialized from the given config for the lookup.
func GetFileSize(path string, storageConfig *indexpb.StorageConfig) (int64, error) {
	cPath := C.CString(path)
	defer C.free(unsafe.Pointer(cPath))

	// Let C write straight into a C-typed out parameter, then widen once.
	var cSize C.int64_t
	if storageConfig == nil {
		status := C.GetFileSize(cPath, &cSize)
		return int64(cSize), ConsumeCStatusIntoError(&status)
	}
	cConf := GetCStorageConfig(storageConfig)
	defer DeleteCStorageConfig(cConf)
	status := C.GetFileSizeWithStorageConfig(cPath, &cSize, cConf)
	return int64(cSize), ConsumeCStatusIntoError(&status)
}
// GetCStorageConfig converts a Go StorageConfig proto into the CStorageConfig
// struct consumed by the segcore storage layer. Every string field is
// allocated with C.CString on the C heap; the caller must release them by
// passing the returned value to DeleteCStorageConfig exactly once.
func GetCStorageConfig(storageConfig *indexpb.StorageConfig) C.CStorageConfig {
	cStorageConfig := C.CStorageConfig{
		address:          C.CString(storageConfig.GetAddress()),
		bucket_name:      C.CString(storageConfig.GetBucketName()),
		access_key_id:    C.CString(storageConfig.GetAccessKeyID()),
		access_key_value: C.CString(storageConfig.GetSecretAccessKey()),
		root_path:        C.CString(storageConfig.GetRootPath()),
		storage_type:     C.CString(storageConfig.GetStorageType()),
		cloud_provider:   C.CString(storageConfig.GetCloudProvider()),
		iam_endpoint:     C.CString(storageConfig.GetIAMEndpoint()),
		// log_level is fixed to "warn" rather than taken from the proto —
		// NOTE(review): confirm this is intentional for this code path.
		log_level:           C.CString("warn"),
		useSSL:              C.bool(storageConfig.GetUseSSL()),
		sslCACert:           C.CString(storageConfig.GetSslCACert()),
		useIAM:              C.bool(storageConfig.GetUseIAM()),
		region:              C.CString(storageConfig.GetRegion()),
		useVirtualHost:      C.bool(storageConfig.GetUseVirtualHost()),
		requestTimeoutMs:    C.int64_t(storageConfig.GetRequestTimeoutMs()),
		gcp_credential_json: C.CString(storageConfig.GetGcpCredentialJSON()),
		// use_custom_part_upload is hardcoded on; presumably required for
		// the packed/StorageV2 path — TODO confirm against other callers.
		use_custom_part_upload: true,
	}
	return cStorageConfig
}
// DeleteCStorageConfig releases every C string allocated by
// GetCStorageConfig. It must be called exactly once per config produced by
// that function; double-freeing or reusing the config afterwards is invalid.
func DeleteCStorageConfig(cStorageConfig C.CStorageConfig) {
	// Free all twelve CString-allocated fields in one pass.
	for _, p := range []unsafe.Pointer{
		unsafe.Pointer(cStorageConfig.address),
		unsafe.Pointer(cStorageConfig.bucket_name),
		unsafe.Pointer(cStorageConfig.access_key_id),
		unsafe.Pointer(cStorageConfig.access_key_value),
		unsafe.Pointer(cStorageConfig.root_path),
		unsafe.Pointer(cStorageConfig.storage_type),
		unsafe.Pointer(cStorageConfig.cloud_provider),
		unsafe.Pointer(cStorageConfig.iam_endpoint),
		unsafe.Pointer(cStorageConfig.log_level),
		unsafe.Pointer(cStorageConfig.sslCACert),
		unsafe.Pointer(cStorageConfig.region),
		unsafe.Pointer(cStorageConfig.gcp_credential_json),
	} {
		C.free(p)
	}
}