Fix the segcore payload reader being unable to read JSON data (#24212)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
yah01 2023-05-20 22:23:25 +08:00 committed by GitHub
parent 4fbb3f2b34
commit 1eb804fcbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 28 deletions

View File

@ -15,6 +15,7 @@
// limitations under the License.
#include "storage/FieldData.h"
#include "common/Json.h"
namespace milvus::storage {
@ -95,6 +96,15 @@ FieldDataImpl<Type, is_scalar>::FillFieldData(const std::shared_ptr<arrow::Array
}
return FillFieldData(values.data(), element_count);
}
case DataType::JSON: {
AssertInfo(array->type()->id() == arrow::Type::type::BINARY, "inconsistent data type");
auto binary_array = std::dynamic_pointer_cast<arrow::BinaryArray>(array);
std::vector<std::string> values(element_count);
for (size_t index = 0; index < element_count; ++index) {
values[index] = binary_array->GetString(index);
}
return FillFieldData(values.data(), element_count);
}
case DataType::VECTOR_FLOAT:
case DataType::VECTOR_BINARY: {
auto array_info =

View File

@ -25,6 +25,7 @@
#include "arrow/api.h"
#include "common/FieldMeta.h"
#include "common/Json.h"
#include "common/Utils.h"
#include "common/VectorTrait.h"
#include "exceptions/EasyAssert.h"
@ -206,5 +207,4 @@ class FieldDataStringImpl : public FieldDataImpl<std::string, true> {
return field_data_[offset].size();
}
};
} // namespace milvus::storage

View File

@ -16,6 +16,7 @@
#include <mutex>
#include "common/Types.h"
#include "storage/parquet_c.h"
#include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h"
@ -219,6 +220,7 @@ NewPayloadReader(int columnType, uint8_t* buffer, int64_t buf_size, CPayloadRead
case milvus::DataType::DOUBLE:
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR:
case milvus::DataType::JSON:
case milvus::DataType::VECTOR_BINARY:
case milvus::DataType::VECTOR_FLOAT: {
break;

View File

@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include <fstream>
#include "common/Types.h"
#include "storage/parquet_c.h"
#include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h"
@ -221,6 +222,31 @@ TEST(storage, stringarray) {
ASSERT_EQ(st.error_code, ErrorCode::Success);
}
TEST(storage, jsonarray) {
    // Write two JSON documents into a JSON-typed payload.
    auto writer = NewPayloadWriter(int(milvus::DataType::JSON));
    auto status = AddOneJSONToPayload(writer, (uint8_t*)"{}", 2);
    ASSERT_EQ(status.error_code, ErrorCode::Success);
    status = AddOneJSONToPayload(writer, (uint8_t*)"{\"key\":123}", 11);
    ASSERT_EQ(status.error_code, ErrorCode::Success);
    status = FinishPayloadWriter(writer);
    ASSERT_EQ(status.error_code, ErrorCode::Success);

    // The serialized buffer must be non-empty and report both rows.
    auto buffer = GetPayloadBufferFromWriter(writer);
    ASSERT_NE(buffer.data, nullptr);
    ASSERT_GT(buffer.length, 0);
    ASSERT_EQ(GetPayloadLengthFromWriter(writer), 2);

    // Open a reader over the serialized bytes and verify the row
    // count round-trips (this is the path fixed by this commit).
    CPayloadReader reader;
    status = NewPayloadReader(int(milvus::DataType::JSON), (uint8_t*)buffer.data, buffer.length, &reader);
    ASSERT_EQ(status.error_code, ErrorCode::Success);
    ASSERT_EQ(GetPayloadLengthFromReader(reader), 2);

    ReleasePayloadWriter(writer);
    ReleasePayloadReader(reader);
}
TEST(storage, binary_vector) {
int DIM = 16;
auto payload = NewVectorPayloadWriter(int(milvus::DataType::VECTOR_BINARY), DIM);

View File

@ -298,10 +298,8 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0,
case DataType::JSON: {
vector<std::string> data(N);
for (int i = 0; i < N / repeat_count; i++) {
auto str = R"({"int":)" + std::to_string(er()) +
R"(,"double":)" +
std::to_string(static_cast<double>(er())) +
R"(,"string":")" + std::to_string(er()) +
auto str = R"({"int":)" + std::to_string(er()) + R"(,"double":)" +
std::to_string(static_cast<double>(er())) + R"(,"string":")" + std::to_string(er()) +
R"(","bool": true)" + "}";
data[i] = str;
}

View File

@ -814,6 +814,20 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
}
rst = data
case schemapb.DataType_JSON:
var data = &storage.JSONFieldData{
Data: make([][]byte, 0, len(content)),
}
for _, c := range content {
r, ok := c.([]byte)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_FloatVector:
var data = &storage.FloatVectorFieldData{
Data: []float32{},

View File

@ -102,6 +102,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
{true, schemapb.DataType_Float, []interface{}{float32(1), float32(2)}, "valid float32"},
{true, schemapb.DataType_Double, []interface{}{float64(1), float64(2)}, "valid float64"},
{true, schemapb.DataType_VarChar, []interface{}{"test1", "test2"}, "valid varChar"},
{true, schemapb.DataType_JSON, []interface{}{[]byte("{\"key\":\"value\"}"), []byte("{\"hello\":\"world\"}")}, "valid json"},
{true, schemapb.DataType_FloatVector, []interface{}{[]float32{1.0, 2.0}}, "valid floatvector"},
{true, schemapb.DataType_BinaryVector, []interface{}{[]byte{255}}, "valid binaryvector"},
{false, schemapb.DataType_Bool, []interface{}{1, 2}, "invalid bool"},
@ -112,6 +113,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
{false, schemapb.DataType_Float, []interface{}{nil, nil}, "invalid float32"},
{false, schemapb.DataType_Double, []interface{}{nil, nil}, "invalid float64"},
{false, schemapb.DataType_VarChar, []interface{}{nil, nil}, "invalid varChar"},
{false, schemapb.DataType_JSON, []interface{}{nil, nil}, "invalid json"},
{false, schemapb.DataType_FloatVector, []interface{}{nil, nil}, "invalid floatvector"},
{false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"},
{false, schemapb.DataType_None, nil, "invalid data type"},

View File

@ -537,6 +537,8 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
dstScalar.GetDoubleData().Data = dstScalar.GetDoubleData().Data[:len(dstScalar.GetDoubleData().Data)-1]
case *schemapb.ScalarField_StringData:
dstScalar.GetStringData().Data = dstScalar.GetStringData().Data[:len(dstScalar.GetStringData().Data)-1]
case *schemapb.ScalarField_JsonData:
dstScalar.GetJsonData().Data = dstScalar.GetJsonData().Data[:len(dstScalar.GetJsonData().Data)-1]
default:
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
}
@ -643,6 +645,16 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) {
} else {
dstScalar.GetStringData().Data = append(dstScalar.GetStringData().Data, srcScalar.StringData.Data...)
}
case *schemapb.ScalarField_JsonData:
if dstScalar.GetJsonData() == nil {
dstScalar.Data = &schemapb.ScalarField_JsonData{
JsonData: &schemapb.JSONArray{
Data: srcScalar.JsonData.Data,
},
}
} else {
dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data...)
}
default:
log.Error("Not supported field type", zap.String("field type", srcFieldData.Type.String()))
}

View File

@ -631,32 +631,39 @@ func TestDeleteFieldData(t *testing.T) {
Int64FieldName = "Int64Field"
FloatFieldName = "FloatField"
DoubleFieldName = "DoubleField"
JSONFieldName = "JSONField"
BinaryVectorFieldName = "BinaryVectorField"
FloatVectorFieldName = "FloatVectorField"
BoolFieldID = common.StartOfUserFieldID + 1
Int32FieldID = common.StartOfUserFieldID + 2
Int64FieldID = common.StartOfUserFieldID + 3
FloatFieldID = common.StartOfUserFieldID + 4
DoubleFieldID = common.StartOfUserFieldID + 5
BinaryVectorFieldID = common.StartOfUserFieldID + 6
FloatVectorFieldID = common.StartOfUserFieldID + 7
)
const (
BoolFieldID = common.StartOfUserFieldID + iota
Int32FieldID
Int64FieldID
FloatFieldID
DoubleFieldID
JSONFieldID
BinaryVectorFieldID
FloatVectorFieldID
)
BoolArray := []bool{true, false}
Int32Array := []int32{1, 2}
Int64Array := []int64{11, 22}
FloatArray := []float32{1.0, 2.0}
DoubleArray := []float64{11.0, 22.0}
JSONArray := [][]byte{[]byte("{\"hello\":0}"), []byte("{\"key\":1}")}
BinaryVector := []byte{0x12, 0x34}
FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0, 55.0, 66.0, 77.0, 88.0}
result1 := make([]*schemapb.FieldData, 7)
result2 := make([]*schemapb.FieldData, 7)
result1 := make([]*schemapb.FieldData, 8)
result2 := make([]*schemapb.FieldData, 8)
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatFieldName, FloatFieldID, schemapb.DataType_Float, FloatArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(DoubleFieldName, DoubleFieldID, schemapb.DataType_Double, DoubleArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(JSONFieldName, JSONFieldID, schemapb.DataType_JSON, JSONArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(BinaryVectorFieldName, BinaryVectorFieldID, schemapb.DataType_BinaryVector, BinaryVector[0:Dim/8], Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:Dim], Dim))
@ -666,30 +673,33 @@ func TestDeleteFieldData(t *testing.T) {
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatFieldName, FloatFieldID, schemapb.DataType_Float, FloatArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(DoubleFieldName, DoubleFieldID, schemapb.DataType_Double, DoubleArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(JSONFieldName, JSONFieldID, schemapb.DataType_JSON, JSONArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(BinaryVectorFieldName, BinaryVectorFieldID, schemapb.DataType_BinaryVector, BinaryVector[Dim/8:2*Dim/8], Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[Dim:2*Dim], Dim))
AppendFieldData(result1, fieldDataArray1, 0)
AppendFieldData(result1, fieldDataArray2, 0)
DeleteFieldData(result1)
assert.Equal(t, BoolArray[0:1], result1[0].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[0:1], result1[1].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[0:1], result1[2].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[0:1], result1[3].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[0:1], result1[4].GetScalars().GetDoubleData().Data)
assert.Equal(t, BinaryVector[0:Dim/8], result1[5].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[0:Dim], result1[6].GetVectors().GetFloatVector().Data)
assert.Equal(t, BoolArray[0:1], result1[BoolFieldID-common.StartOfUserFieldID].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[0:1], result1[Int32FieldID-common.StartOfUserFieldID].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[0:1], result1[Int64FieldID-common.StartOfUserFieldID].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[0:1], result1[FloatFieldID-common.StartOfUserFieldID].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[0:1], result1[DoubleFieldID-common.StartOfUserFieldID].GetScalars().GetDoubleData().Data)
assert.Equal(t, JSONArray[0:1], result1[JSONFieldID-common.StartOfUserFieldID].GetScalars().GetJsonData().Data)
assert.Equal(t, BinaryVector[0:Dim/8], result1[BinaryVectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[0:Dim], result1[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data)
AppendFieldData(result2, fieldDataArray2, 0)
AppendFieldData(result2, fieldDataArray1, 0)
DeleteFieldData(result2)
assert.Equal(t, BoolArray[1:2], result2[0].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[1:2], result2[1].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[1:2], result2[2].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[1:2], result2[3].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[1:2], result2[4].GetScalars().GetDoubleData().Data)
assert.Equal(t, BinaryVector[Dim/8:2*Dim/8], result2[5].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[Dim:2*Dim], result2[6].GetVectors().GetFloatVector().Data)
assert.Equal(t, BoolArray[1:2], result2[BoolFieldID-common.StartOfUserFieldID].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[1:2], result2[Int32FieldID-common.StartOfUserFieldID].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[1:2], result2[Int64FieldID-common.StartOfUserFieldID].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[1:2], result2[FloatFieldID-common.StartOfUserFieldID].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[1:2], result2[DoubleFieldID-common.StartOfUserFieldID].GetScalars().GetDoubleData().Data)
assert.Equal(t, JSONArray[1:2], result2[JSONFieldID-common.StartOfUserFieldID].GetScalars().GetJsonData().Data)
assert.Equal(t, BinaryVector[Dim/8:2*Dim/8], result2[BinaryVectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[Dim:2*Dim], result2[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data)
}
func TestGetPrimaryFieldSchema(t *testing.T) {
@ -1011,3 +1021,35 @@ func TestCalcColumnSize(t *testing.T) {
assert.Equal(t, expected, size, field.GetName())
}
}
func TestMergeFieldData(t *testing.T) {
	// Destination columns that already hold rows for int64, float-vector
	// and JSON fields.
	dst := []*schemapb.FieldData{
		genFieldData("int64", 100, schemapb.DataType_Int64, []int64{1, 2, 3}, 1),
		genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{1, 2, 3}, 1),
		genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
	}
	// Source columns with the same field IDs; their rows should be
	// appended onto the destination columns.
	src := []*schemapb.FieldData{
		genFieldData("int64", 100, schemapb.DataType_Int64, []int64{4, 5, 6}, 1),
		genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{4, 5, 6}, 1),
		genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
	}

	MergeFieldData(dst, src)

	assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, dst[0].GetScalars().GetLongData().Data)
	assert.Equal(t, []float32{1, 2, 3, 4, 5, 6}, dst[1].GetVectors().GetFloatVector().Data)
	assert.Equal(t, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`), []byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)},
		dst[2].GetScalars().GetJsonData().Data)

	// A field of DataType_None is unsupported; merging it must not panic.
	unknown := &schemapb.FieldData{
		Type: schemapb.DataType_None,
		Field: &schemapb.FieldData_Scalars{
			Scalars: &schemapb.ScalarField{
				Data: nil,
			},
		},
	}
	MergeFieldData([]*schemapb.FieldData{unknown}, []*schemapb.FieldData{unknown})
}