mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: delete some no lint code (#32182)
#31728 Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
ccce1e928a
commit
4fb8044a27
@ -34,7 +34,6 @@ endif()
|
||||
|
||||
set(STORAGE_FILES
|
||||
${STORAGE_FILES}
|
||||
parquet_c.cpp
|
||||
PayloadStream.cpp
|
||||
DataCodec.cpp
|
||||
Util.cpp
|
||||
|
||||
@ -1,432 +0,0 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <mutex>
|
||||
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/FieldData.h"
|
||||
#include "storage/parquet_c.h"
|
||||
#include "storage/PayloadReader.h"
|
||||
#include "storage/PayloadWriter.h"
|
||||
#include "storage/Util.h"
|
||||
|
||||
using Payload = milvus::storage::Payload;
|
||||
using PayloadWriter = milvus::storage::PayloadWriter;
|
||||
using PayloadReader = milvus::storage::PayloadReader;
|
||||
|
||||
extern "C" CPayloadWriter
|
||||
NewPayloadWriter(int columnType) {
|
||||
auto data_type = static_cast<milvus::DataType>(columnType);
|
||||
auto p = std::make_unique<PayloadWriter>(data_type);
|
||||
|
||||
return reinterpret_cast<CPayloadWriter>(p.release());
|
||||
}
|
||||
|
||||
CPayloadWriter
|
||||
NewVectorPayloadWriter(int columnType, int dim) {
|
||||
auto data_type = static_cast<milvus::DataType>(columnType);
|
||||
auto p = std::make_unique<PayloadWriter>(data_type, dim);
|
||||
|
||||
return reinterpret_cast<CPayloadWriter>(p.release());
|
||||
}
|
||||
|
||||
CStatus
|
||||
AddValuesToPayload(CPayloadWriter payloadWriter, const Payload& info) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
p->add_payload(info);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddBooleanToPayload(CPayloadWriter payloadWriter, bool* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::BOOL,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::INT8,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::INT16,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::INT32,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::INT64,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddFloatToPayload(CPayloadWriter payloadWriter, float* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::FLOAT,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddDoubleToPayload(CPayloadWriter payloadWriter, double* values, int length) {
|
||||
auto raw_data_info = Payload{milvus::DataType::DOUBLE,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length};
|
||||
return AddValuesToPayload(payloadWriter, raw_data_info);
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddOneStringToPayload(CPayloadWriter payloadWriter, char* cstr, int str_size) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
p->add_one_string_payload(cstr, str_size);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddOneArrayToPayload(CPayloadWriter payloadWriter, uint8_t* data, int length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
p->add_one_binary_payload(data, length);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddOneJSONToPayload(CPayloadWriter payloadWriter, uint8_t* data, int length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
p->add_one_binary_payload(data, length);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddBinaryVectorToPayload(CPayloadWriter payloadWriter,
|
||||
uint8_t* values,
|
||||
int dimension,
|
||||
int length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
auto raw_data_info =
|
||||
Payload{milvus::DataType::VECTOR_BINARY, values, length, dimension};
|
||||
p->add_payload(raw_data_info);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
AddFloatVectorToPayload(CPayloadWriter payloadWriter,
|
||||
float* values,
|
||||
int dimension,
|
||||
int length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
auto raw_data_info = Payload{milvus::DataType::VECTOR_FLOAT,
|
||||
reinterpret_cast<const uint8_t*>(values),
|
||||
length,
|
||||
dimension};
|
||||
p->add_payload(raw_data_info);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
FinishPayloadWriter(CPayloadWriter payloadWriter) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
p->finish();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
CBuffer
|
||||
GetPayloadBufferFromWriter(CPayloadWriter payloadWriter) {
|
||||
CBuffer buf;
|
||||
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
if (!p->has_finished()) {
|
||||
buf.data = nullptr;
|
||||
buf.length = 0;
|
||||
return buf;
|
||||
}
|
||||
auto& output = p->get_payload_buffer();
|
||||
buf.length = static_cast<int>(output.size());
|
||||
buf.data = (char*)(output.data());
|
||||
return buf;
|
||||
}
|
||||
|
||||
int
|
||||
GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(payloadWriter);
|
||||
return p->get_payload_length();
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
ReleasePayloadWriter(CPayloadWriter handler) {
|
||||
auto p = reinterpret_cast<PayloadWriter*>(handler);
|
||||
if (p != nullptr) {
|
||||
delete p;
|
||||
milvus::storage::ReleaseArrowUnused();
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
NewPayloadReader(int columnType,
|
||||
uint8_t* buffer,
|
||||
int64_t buf_size,
|
||||
CPayloadReader* c_reader) {
|
||||
auto column_type = static_cast<milvus::DataType>(columnType);
|
||||
switch (column_type) {
|
||||
case milvus::DataType::BOOL:
|
||||
case milvus::DataType::INT8:
|
||||
case milvus::DataType::INT16:
|
||||
case milvus::DataType::INT32:
|
||||
case milvus::DataType::INT64:
|
||||
case milvus::DataType::FLOAT:
|
||||
case milvus::DataType::DOUBLE:
|
||||
case milvus::DataType::STRING:
|
||||
case milvus::DataType::VARCHAR:
|
||||
case milvus::DataType::VECTOR_BINARY:
|
||||
case milvus::DataType::VECTOR_FLOAT: {
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
return milvus::FailureCStatus(milvus::DataTypeInvalid,
|
||||
"unsupported data type");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
auto p = std::make_unique<PayloadReader>(buffer, buf_size, column_type);
|
||||
*c_reader = (CPayloadReader)(p.release());
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetBoolFromPayload(CPayloadReader payloadReader, int idx, bool* value) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*value = *reinterpret_cast<const bool*>(field_data->RawValue(idx));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetInt8FromPayload(CPayloadReader payloadReader, int8_t** values, int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<int8_t*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetInt16FromPayload(CPayloadReader payloadReader,
|
||||
int16_t** values,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<int16_t*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetInt32FromPayload(CPayloadReader payloadReader,
|
||||
int32_t** values,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<int32_t*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetInt64FromPayload(CPayloadReader payloadReader,
|
||||
int64_t** values,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<int64_t*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetFloatFromPayload(CPayloadReader payloadReader, float** values, int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<float*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetDoubleFromPayload(CPayloadReader payloadReader,
|
||||
double** values,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*length = field_data->get_num_rows();
|
||||
*values =
|
||||
reinterpret_cast<double*>(const_cast<void*>(field_data->Data()));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetOneStringFromPayload(CPayloadReader payloadReader,
|
||||
int idx,
|
||||
char** cstr,
|
||||
int* str_size) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
auto str = const_cast<void*>(field_data->RawValue(idx));
|
||||
*cstr = (char*)(*static_cast<std::string*>(str)).c_str();
|
||||
*str_size = field_data->Size(idx);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetBinaryVectorFromPayload(CPayloadReader payloadReader,
|
||||
uint8_t** values,
|
||||
int* dimension,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*values = (uint8_t*)field_data->Data();
|
||||
*dimension = field_data->get_dim();
|
||||
*length = field_data->get_num_rows();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
GetFloatVectorFromPayload(CPayloadReader payloadReader,
|
||||
float** values,
|
||||
int* dimension,
|
||||
int* length) {
|
||||
try {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
*values = (float*)field_data->Data();
|
||||
*dimension = field_data->get_dim();
|
||||
*length = field_data->get_num_rows();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" int
|
||||
GetPayloadLengthFromReader(CPayloadReader payloadReader) {
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
auto field_data = p->get_field_data();
|
||||
return field_data->get_num_rows();
|
||||
}
|
||||
|
||||
extern "C" CStatus
|
||||
ReleasePayloadReader(CPayloadReader payloadReader) {
|
||||
try {
|
||||
AssertInfo(payloadReader != nullptr,
|
||||
"released payloadReader should not be null pointer");
|
||||
auto p = reinterpret_cast<PayloadReader*>(payloadReader);
|
||||
delete (p);
|
||||
|
||||
milvus::storage::ReleaseArrowUnused();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
//Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
@ -16,119 +16,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
#include "common/type_c.h"
|
||||
|
||||
typedef struct CBuffer {
|
||||
char* data;
|
||||
int length;
|
||||
} CBuffer;
|
||||
|
||||
//============= payload writer ======================
|
||||
// TODO(SPARSE): CPayloadWriter is no longer used as we switch to the payload
|
||||
// writer in golang. Thus not implementing sparse float vector support here.
|
||||
typedef void* CPayloadWriter;
|
||||
CPayloadWriter
|
||||
NewPayloadWriter(int columnType);
|
||||
CPayloadWriter
|
||||
NewVectorPayloadWriter(int columnType, int dim);
|
||||
CStatus
|
||||
AddBooleanToPayload(CPayloadWriter payloadWriter, bool* values, int length);
|
||||
CStatus
|
||||
AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t* values, int length);
|
||||
CStatus
|
||||
AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t* values, int length);
|
||||
CStatus
|
||||
AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t* values, int length);
|
||||
CStatus
|
||||
AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t* values, int length);
|
||||
CStatus
|
||||
AddFloatToPayload(CPayloadWriter payloadWriter, float* values, int length);
|
||||
CStatus
|
||||
AddDoubleToPayload(CPayloadWriter payloadWriter, double* values, int length);
|
||||
CStatus
|
||||
AddOneStringToPayload(CPayloadWriter payloadWriter, char* cstr, int str_size);
|
||||
CStatus
|
||||
AddOneArrayToPayload(CPayloadWriter payloadWriter, uint8_t* cdata, int length);
|
||||
CStatus
|
||||
AddOneJSONToPayload(CPayloadWriter payloadWriter, uint8_t* cdata, int length);
|
||||
CStatus
|
||||
AddBinaryVectorToPayload(CPayloadWriter payloadWriter,
|
||||
uint8_t* values,
|
||||
int dimension,
|
||||
int length);
|
||||
CStatus
|
||||
AddFloatVectorToPayload(CPayloadWriter payloadWriter,
|
||||
float* values,
|
||||
int dimension,
|
||||
int length);
|
||||
|
||||
CStatus
|
||||
FinishPayloadWriter(CPayloadWriter payloadWriter);
|
||||
CBuffer
|
||||
GetPayloadBufferFromWriter(CPayloadWriter payloadWriter);
|
||||
int
|
||||
GetPayloadLengthFromWriter(CPayloadWriter payloadWriter);
|
||||
void
|
||||
ReleasePayloadWriter(CPayloadWriter handler);
|
||||
|
||||
//============= payload reader ======================
|
||||
typedef void* CPayloadReader;
|
||||
CStatus
|
||||
NewPayloadReader(int columnType,
|
||||
uint8_t* buffer,
|
||||
int64_t buf_size,
|
||||
CPayloadReader* c_reader);
|
||||
CStatus
|
||||
GetBoolFromPayload(CPayloadReader payloadReader, int idx, bool* value);
|
||||
CStatus
|
||||
GetInt8FromPayload(CPayloadReader payloadReader, int8_t** values, int* length);
|
||||
CStatus
|
||||
GetInt16FromPayload(CPayloadReader payloadReader,
|
||||
int16_t** values,
|
||||
int* length);
|
||||
CStatus
|
||||
GetInt32FromPayload(CPayloadReader payloadReader,
|
||||
int32_t** values,
|
||||
int* length);
|
||||
CStatus
|
||||
GetInt64FromPayload(CPayloadReader payloadReader,
|
||||
int64_t** values,
|
||||
int* length);
|
||||
CStatus
|
||||
GetFloatFromPayload(CPayloadReader payloadReader, float** values, int* length);
|
||||
CStatus
|
||||
GetDoubleFromPayload(CPayloadReader payloadReader,
|
||||
double** values,
|
||||
int* length);
|
||||
CStatus
|
||||
GetOneStringFromPayload(CPayloadReader payloadReader,
|
||||
int idx,
|
||||
char** cstr,
|
||||
int* str_size);
|
||||
CStatus
|
||||
GetBinaryVectorFromPayload(CPayloadReader payloadReader,
|
||||
uint8_t** values,
|
||||
int* dimension,
|
||||
int* length);
|
||||
CStatus
|
||||
GetFloatVectorFromPayload(CPayloadReader payloadReader,
|
||||
float** values,
|
||||
int* dimension,
|
||||
int* length);
|
||||
|
||||
int
|
||||
GetPayloadLengthFromReader(CPayloadReader payloadReader);
|
||||
|
||||
CStatus
|
||||
ReleasePayloadReader(CPayloadReader payloadReader);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
using Payload = milvus::storage::Payload;
|
||||
using PayloadWriter = milvus::storage::PayloadWriter;
|
||||
using PayloadReader = milvus::storage::PayloadReader;
|
||||
@ -1,426 +0,0 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <fstream>
|
||||
|
||||
#include "common/EasyAssert.h"
|
||||
#include "storage/parquet_c.h"
|
||||
#include "storage/PayloadReader.h"
|
||||
#include "storage/PayloadWriter.h"
|
||||
|
||||
namespace wrapper = milvus::storage;
|
||||
using ErrorCode = milvus::ErrorCode;
|
||||
|
||||
static void
|
||||
WriteToFile(CBuffer cb) {
|
||||
auto data_file =
|
||||
std::ofstream("/tmp/wrapper_test_data.dat", std::ios::binary);
|
||||
data_file.write(cb.data, cb.length);
|
||||
data_file.close();
|
||||
}
|
||||
|
||||
static std::shared_ptr<arrow::Table>
|
||||
ReadFromFile() {
|
||||
std::shared_ptr<arrow::io::ReadableFile> infile;
|
||||
auto rst = arrow::io::ReadableFile::Open("/tmp/wrapper_test_data.dat");
|
||||
if (!rst.ok())
|
||||
return nullptr;
|
||||
infile = *rst;
|
||||
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
std::unique_ptr<parquet::arrow::FileReader> reader;
|
||||
auto st =
|
||||
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader);
|
||||
if (!st.ok())
|
||||
return nullptr;
|
||||
st = reader->ReadTable(&table);
|
||||
if (!st.ok())
|
||||
return nullptr;
|
||||
return table;
|
||||
}
|
||||
|
||||
TEST(storage, inoutstream) {
|
||||
arrow::Int64Builder i64builder;
|
||||
arrow::Status st;
|
||||
st = i64builder.AppendValues({1, 2, 3, 4, 5});
|
||||
ASSERT_TRUE(st.ok());
|
||||
std::shared_ptr<arrow::Array> i64array;
|
||||
st = i64builder.Finish(&i64array);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
auto schema = arrow::schema({arrow::field("val", arrow::int64())});
|
||||
ASSERT_NE(schema, nullptr);
|
||||
auto table = arrow::Table::Make(schema, {i64array});
|
||||
ASSERT_NE(table, nullptr);
|
||||
|
||||
auto os = std::make_shared<milvus::storage::PayloadOutputStream>();
|
||||
st = parquet::arrow::WriteTable(
|
||||
*table, arrow::default_memory_pool(), os, 1024);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
const uint8_t* buf = os->Buffer().data();
|
||||
int64_t buf_size = os->Buffer().size();
|
||||
auto is =
|
||||
std::make_shared<milvus::storage::PayloadInputStream>(buf, buf_size);
|
||||
|
||||
std::shared_ptr<arrow::Table> intable;
|
||||
std::unique_ptr<parquet::arrow::FileReader> reader;
|
||||
st = parquet::arrow::OpenFile(is, arrow::default_memory_pool(), &reader);
|
||||
ASSERT_TRUE(st.ok());
|
||||
st = reader->ReadTable(&intable);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
auto chunks = intable->column(0)->chunks();
|
||||
ASSERT_EQ(chunks.size(), 1);
|
||||
|
||||
auto inarray = std::dynamic_pointer_cast<arrow::Int64Array>(chunks[0]);
|
||||
ASSERT_NE(inarray, nullptr);
|
||||
ASSERT_EQ(inarray->Value(0), 1);
|
||||
ASSERT_EQ(inarray->Value(1), 2);
|
||||
ASSERT_EQ(inarray->Value(2), 3);
|
||||
ASSERT_EQ(inarray->Value(3), 4);
|
||||
ASSERT_EQ(inarray->Value(4), 5);
|
||||
}
|
||||
|
||||
TEST(storage, boolean) {
|
||||
auto payload = NewPayloadWriter(int(milvus::DataType::BOOL));
|
||||
bool data[] = {true, false, true, false};
|
||||
|
||||
auto st = AddBooleanToPayload(payload, data, 4);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
ASSERT_GT(cb.length, 0);
|
||||
ASSERT_NE(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 4);
|
||||
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(
|
||||
int(milvus::DataType::BOOL), (uint8_t*)cb.data, cb.length, &reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
bool* values;
|
||||
int length = GetPayloadLengthFromReader(reader);
|
||||
ASSERT_EQ(length, 4);
|
||||
for (int i = 0; i < length; i++) {
|
||||
bool value;
|
||||
st = GetBoolFromPayload(reader, i, &value);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(data[i], value);
|
||||
}
|
||||
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
#define NUMERIC_TEST( \
|
||||
TEST_NAME, COLUMN_TYPE, DATA_TYPE, ADD_FUNC, GET_FUNC, ARRAY_TYPE) \
|
||||
TEST(wrapper, TEST_NAME) { \
|
||||
auto payload = NewPayloadWriter(COLUMN_TYPE); \
|
||||
DATA_TYPE data[] = {-1, 1, -100, 100}; \
|
||||
\
|
||||
auto st = ADD_FUNC(payload, data, 4); \
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success); \
|
||||
st = FinishPayloadWriter(payload); \
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success); \
|
||||
auto cb = GetPayloadBufferFromWriter(payload); \
|
||||
ASSERT_GT(cb.length, 0); \
|
||||
ASSERT_NE(cb.data, nullptr); \
|
||||
auto nums = GetPayloadLengthFromWriter(payload); \
|
||||
ASSERT_EQ(nums, 4); \
|
||||
\
|
||||
CPayloadReader reader; \
|
||||
st = NewPayloadReader( \
|
||||
COLUMN_TYPE, (uint8_t*)cb.data, cb.length, &reader); \
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success); \
|
||||
DATA_TYPE* values; \
|
||||
int length; \
|
||||
st = GET_FUNC(reader, &values, &length); \
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success); \
|
||||
ASSERT_NE(values, nullptr); \
|
||||
ASSERT_EQ(length, 4); \
|
||||
length = GetPayloadLengthFromReader(reader); \
|
||||
ASSERT_EQ(length, 4); \
|
||||
\
|
||||
for (int i = 0; i < length; i++) { \
|
||||
ASSERT_EQ(data[i], values[i]); \
|
||||
} \
|
||||
\
|
||||
ReleasePayloadWriter(payload); \
|
||||
st = ReleasePayloadReader(reader); \
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success); \
|
||||
}
|
||||
|
||||
NUMERIC_TEST(int8,
|
||||
int(milvus::DataType::INT8),
|
||||
int8_t,
|
||||
AddInt8ToPayload,
|
||||
GetInt8FromPayload,
|
||||
arrow::Int8Array)
|
||||
NUMERIC_TEST(int16,
|
||||
int(milvus::DataType::INT16),
|
||||
int16_t,
|
||||
AddInt16ToPayload,
|
||||
GetInt16FromPayload,
|
||||
arrow::Int16Array)
|
||||
NUMERIC_TEST(int32,
|
||||
int(milvus::DataType::INT32),
|
||||
int32_t,
|
||||
AddInt32ToPayload,
|
||||
GetInt32FromPayload,
|
||||
arrow::Int32Array)
|
||||
NUMERIC_TEST(int64,
|
||||
int(milvus::DataType::INT64),
|
||||
int64_t,
|
||||
AddInt64ToPayload,
|
||||
GetInt64FromPayload,
|
||||
arrow::Int64Array)
|
||||
NUMERIC_TEST(float32,
|
||||
int(milvus::DataType::FLOAT),
|
||||
float,
|
||||
AddFloatToPayload,
|
||||
GetFloatFromPayload,
|
||||
arrow::FloatArray)
|
||||
NUMERIC_TEST(float64,
|
||||
int(milvus::DataType::DOUBLE),
|
||||
double,
|
||||
AddDoubleToPayload,
|
||||
GetDoubleFromPayload,
|
||||
arrow::DoubleArray)
|
||||
|
||||
TEST(storage, stringarray) {
|
||||
auto payload = NewPayloadWriter(int(milvus::DataType::VARCHAR));
|
||||
auto st = AddOneStringToPayload(payload, (char*)"1234", 4);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
st = AddOneStringToPayload(payload, (char*)"12345", 5);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
char v[3] = {0};
|
||||
v[1] = 'a';
|
||||
st = AddOneStringToPayload(payload, v, 3);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
|
||||
st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
ASSERT_GT(cb.length, 0);
|
||||
ASSERT_NE(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 3);
|
||||
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(
|
||||
int(milvus::DataType::VARCHAR), (uint8_t*)cb.data, cb.length, &reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
int length = GetPayloadLengthFromReader(reader);
|
||||
ASSERT_EQ(length, 3);
|
||||
char *v0, *v1, *v2;
|
||||
int s0, s1, s2;
|
||||
st = GetOneStringFromPayload(reader, 0, &v0, &s0);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(s0, 4);
|
||||
ASSERT_EQ(v0[0], '1');
|
||||
ASSERT_EQ(v0[1], '2');
|
||||
ASSERT_EQ(v0[2], '3');
|
||||
ASSERT_EQ(v0[3], '4');
|
||||
|
||||
st = GetOneStringFromPayload(reader, 1, &v1, &s1);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(s1, 5);
|
||||
ASSERT_EQ(v1[0], '1');
|
||||
ASSERT_EQ(v1[1], '2');
|
||||
ASSERT_EQ(v1[2], '3');
|
||||
ASSERT_EQ(v1[3], '4');
|
||||
ASSERT_EQ(v1[4], '5');
|
||||
|
||||
st = GetOneStringFromPayload(reader, 2, &v2, &s2);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(s2, 3);
|
||||
ASSERT_EQ(v2[0], 0);
|
||||
ASSERT_EQ(v2[1], 'a');
|
||||
ASSERT_EQ(v2[2], 0);
|
||||
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
TEST(storage, binary_vector) {
|
||||
int DIM = 16;
|
||||
auto payload =
|
||||
NewVectorPayloadWriter(int(milvus::DataType::VECTOR_BINARY), DIM);
|
||||
uint8_t data[] = {0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8};
|
||||
|
||||
auto st = AddBinaryVectorToPayload(payload, data, 16, 4);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
ASSERT_GT(cb.length, 0);
|
||||
ASSERT_NE(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 4);
|
||||
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(int(milvus::DataType::VECTOR_BINARY),
|
||||
(uint8_t*)cb.data,
|
||||
cb.length,
|
||||
&reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
uint8_t* values;
|
||||
int length;
|
||||
int dim;
|
||||
|
||||
st = GetBinaryVectorFromPayload(reader, &values, &dim, &length);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_NE(values, nullptr);
|
||||
ASSERT_EQ(dim, 16);
|
||||
ASSERT_EQ(length, 4);
|
||||
length = GetPayloadLengthFromReader(reader);
|
||||
ASSERT_EQ(length, 4);
|
||||
for (int i = 0; i < 8; i++) {
|
||||
ASSERT_EQ(values[i], data[i]);
|
||||
}
|
||||
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
TEST(storage, binary_vector_empty) {
|
||||
int DIM = 16;
|
||||
auto payload =
|
||||
NewVectorPayloadWriter(int(milvus::DataType::VECTOR_BINARY), DIM);
|
||||
auto st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
// ASSERT_EQ(cb.length, 0);
|
||||
// ASSERT_EQ(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 0);
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(int(milvus::DataType::VECTOR_BINARY),
|
||||
(uint8_t*)cb.data,
|
||||
cb.length,
|
||||
&reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(0, GetPayloadLengthFromReader(reader));
|
||||
// ASSERT_EQ(reader, nullptr);
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
TEST(storage, float_vector) {
|
||||
int DIM = 2;
|
||||
auto payload =
|
||||
NewVectorPayloadWriter(int(milvus::DataType::VECTOR_FLOAT), DIM);
|
||||
float data[] = {1, 2, 3, 4, 5, 6, 7, 8};
|
||||
|
||||
auto st = AddFloatVectorToPayload(payload, data, DIM, 4);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
ASSERT_GT(cb.length, 0);
|
||||
ASSERT_NE(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 4);
|
||||
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(int(milvus::DataType::VECTOR_FLOAT),
|
||||
(uint8_t*)cb.data,
|
||||
cb.length,
|
||||
&reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
float* values;
|
||||
int length;
|
||||
int dim;
|
||||
|
||||
st = GetFloatVectorFromPayload(reader, &values, &dim, &length);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_NE(values, nullptr);
|
||||
ASSERT_EQ(dim, 2);
|
||||
ASSERT_EQ(length, 4);
|
||||
length = GetPayloadLengthFromReader(reader);
|
||||
ASSERT_EQ(length, 4);
|
||||
for (int i = 0; i < 8; i++) {
|
||||
ASSERT_EQ(values[i], data[i]);
|
||||
}
|
||||
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
TEST(storage, float_vector_empty) {
|
||||
int DIM = 2;
|
||||
auto payload =
|
||||
NewVectorPayloadWriter(int(milvus::DataType::VECTOR_FLOAT), DIM);
|
||||
auto st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
// ASSERT_EQ(cb.length, 0);
|
||||
// ASSERT_EQ(cb.data, nullptr);
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 0);
|
||||
CPayloadReader reader;
|
||||
st = NewPayloadReader(int(milvus::DataType::VECTOR_FLOAT),
|
||||
(uint8_t*)cb.data,
|
||||
cb.length,
|
||||
&reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
ASSERT_EQ(0, GetPayloadLengthFromReader(reader));
|
||||
// ASSERT_EQ(reader, nullptr);
|
||||
ReleasePayloadWriter(payload);
|
||||
st = ReleasePayloadReader(reader);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
}
|
||||
|
||||
TEST(storage, int8_2) {
|
||||
auto payload = NewPayloadWriter(int(milvus::DataType::INT8));
|
||||
int8_t data[] = {-1, 1, -100, 100};
|
||||
|
||||
auto st = AddInt8ToPayload(payload, data, 4);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
st = FinishPayloadWriter(payload);
|
||||
ASSERT_EQ(st.error_code, ErrorCode::Success);
|
||||
auto cb = GetPayloadBufferFromWriter(payload);
|
||||
ASSERT_GT(cb.length, 0);
|
||||
ASSERT_NE(cb.data, nullptr);
|
||||
|
||||
WriteToFile(cb);
|
||||
|
||||
auto nums = GetPayloadLengthFromWriter(payload);
|
||||
ASSERT_EQ(nums, 4);
|
||||
ReleasePayloadWriter(payload);
|
||||
|
||||
auto table = ReadFromFile();
|
||||
ASSERT_NE(table, nullptr);
|
||||
|
||||
auto chunks = table->column(0)->chunks();
|
||||
ASSERT_EQ(chunks.size(), 1);
|
||||
|
||||
auto bool_array = std::dynamic_pointer_cast<arrow::Int8Array>(chunks[0]);
|
||||
ASSERT_NE(bool_array, nullptr);
|
||||
|
||||
ASSERT_EQ(bool_array->Value(0), -1);
|
||||
ASSERT_EQ(bool_array->Value(1), 1);
|
||||
ASSERT_EQ(bool_array->Value(2), -100);
|
||||
ASSERT_EQ(bool_array->Value(3), 100);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user