diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go index f620483982..2eb291a149 100644 --- a/internal/storage/binlog_iterator.go +++ b/internal/storage/binlog_iterator.go @@ -127,69 +127,6 @@ func (itr *InsertBinlogIterator) isDisposed() bool { return atomic.LoadInt32(&itr.dispose) == 1 } -/* -type DeltalogIterator struct { - dispose int32 - values []*Value - pos int -} - -func NewDeltalogIterator(blob *Blob) (*DeltalogIterator, error) { - deltaCodec := NewDeleteCodec() - _, _, serData, err := deltaCodec.Deserialize(blob) - if err != nil { - return nil, err - } - - values := make([]*Value, 0, len(serData.Data)) - for pkstr, ts := range serData.Data { - pk, err := strconv.ParseInt(pkstr, 10, 64) - if err != nil { - return nil, err - } - values = append(values, &Value{pk, ts, true, nil}) - } - - sort.Slice(values, func(i, j int) bool { return values[i].id < values[j].id }) - - return &DeltalogIterator{values: values}, nil -} - -// HasNext returns true if the iterator have unread record -func (itr *DeltalogIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *DeltalogIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, ErrDisposed - } - - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - tmp := itr.values[itr.pos] - itr.pos++ - return tmp, nil -} - -// Dispose disposes the iterator -func (itr *DeltalogIterator) Dispose() { - atomic.CompareAndSwapInt32(&itr.dispose, 0, 1) -} - -func (itr *DeltalogIterator) hasNext() bool { - return itr.pos < len(itr.values) -} - -func (itr *DeltalogIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.dispose) == 1 -} - -*/ - // MergeIterator merge iterators. type MergeIterator struct { disposed int32 @@ -278,156 +215,3 @@ func (itr *MergeIterator) hasNext() bool { itr.nextRecord = minRecord return true } - -/* -func NewInsertlogMergeIterator(blobs [][]*Blob) (*MergeIterator, error) { - iterators := make([]Iterator, 0, len(blobs)) - for _, fieldBlobs := range blobs { - itr, err := NewInsertBinlogIterator(fieldBlobs) - if err != nil { - return nil, err - } - iterators = append(iterators, itr) - } - - return NewMergeIterator(iterators), nil -} - -func NewDeltalogMergeIterator(blobs []*Blob) (*MergeIterator, error) { - iterators := make([]Iterator, 0, len(blobs)) - for _, blob := range blobs { - itr, err := NewDeltalogIterator(blob) - if err != nil { - return nil, err - } - iterators = append(iterators, itr) - } - return NewMergeIterator(iterators), nil -} - -type MergeSingleSegmentIterator struct { - disposed int32 - insertItr Iterator - deltaItr Iterator - timetravel int64 - nextRecord *Value - insertTmpRecord *Value - deltaTmpRecord *Value -} - -func NewMergeSingleSegmentIterator(insertBlobs [][]*Blob, deltaBlobs []*Blob, timetravel int64) (*MergeSingleSegmentIterator, error) { - insertMergeItr, err := NewInsertlogMergeIterator(insertBlobs) - if err != nil { - return nil, err - } - - deltaMergeItr, err := NewDeltalogMergeIterator(deltaBlobs) - if err != nil { - return nil, err - } - return &MergeSingleSegmentIterator{ - insertItr: insertMergeItr, - deltaItr: deltaMergeItr, - timetravel: timetravel, - }, nil -} - -// HasNext returns true if the iterator have unread record -func (itr *MergeSingleSegmentIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *MergeSingleSegmentIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, 
ErrDisposed - } - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - tmp := itr.nextRecord - itr.nextRecord = nil - return tmp, nil -} - -// Dispose disposes the iterator -func (itr *MergeSingleSegmentIterator) Dispose() { - if itr.isDisposed() { - return - } - - if itr.insertItr != nil { - itr.insertItr.Dispose() - } - if itr.deltaItr != nil { - itr.deltaItr.Dispose() - } - - atomic.CompareAndSwapInt32(&itr.disposed, 0, 1) -} - -func (itr *MergeSingleSegmentIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.disposed) == 1 -} - -func (itr *MergeSingleSegmentIterator) hasNext() bool { - if itr.nextRecord != nil { - return true - } - - for { - if itr.insertTmpRecord == nil && itr.insertItr.HasNext() { - r, _ := itr.insertItr.Next() - itr.insertTmpRecord = r.(*Value) - } - - if itr.deltaTmpRecord == nil && itr.deltaItr.HasNext() { - r, _ := itr.deltaItr.Next() - itr.deltaTmpRecord = r.(*Value) - } - - if itr.insertTmpRecord == nil && itr.deltaTmpRecord == nil { - return false - } else if itr.insertTmpRecord == nil { - itr.nextRecord = itr.deltaTmpRecord - itr.deltaTmpRecord = nil - return true - } else if itr.deltaTmpRecord == nil { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } else { - // merge records - if itr.insertTmpRecord.timestamp >= itr.timetravel { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } - if itr.deltaTmpRecord.timestamp >= itr.timetravel { - itr.nextRecord = itr.deltaTmpRecord - itr.deltaTmpRecord = nil - return true - } - - if itr.insertTmpRecord.id < itr.deltaTmpRecord.id { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } else if itr.insertTmpRecord.id > itr.deltaTmpRecord.id { - itr.deltaTmpRecord = nil - continue - } else if itr.insertTmpRecord.id == itr.deltaTmpRecord.id { - if itr.insertTmpRecord.timestamp <= itr.deltaTmpRecord.timestamp { - itr.insertTmpRecord = nil - continue - } else { - itr.deltaTmpRecord = nil - continue - } - } - } - - } -} -*/ diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 9f9d996c10..9ca53eff12 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -248,25 +248,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique var dim int64 if typeutil.IsVectorType(field.DataType) { switch field.DataType { - case schemapb.DataType_FloatVector: - dim, err = typeutil.GetDim(field) - if err != nil { - return nil, err - } - eventWriter, err = writer.NextInsertEventWriter(int(dim)) - case schemapb.DataType_BinaryVector: - dim, err = typeutil.GetDim(field) - if err != nil { - return nil, err - } - eventWriter, err = writer.NextInsertEventWriter(int(dim)) - case schemapb.DataType_Float16Vector: - dim, err = typeutil.GetDim(field) - if err != nil { - return nil, err - } - eventWriter, err = writer.NextInsertEventWriter(int(dim)) - case schemapb.DataType_BFloat16Vector: + case schemapb.DataType_FloatVector, + schemapb.DataType_BinaryVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector: dim, err = typeutil.GetDim(field) if err != nil { return nil, err @@ -293,136 +278,12 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique blockMemorySize := singleData.GetMemorySize() memorySize += int64(blockMemorySize) - - switch field.DataType { - case schemapb.DataType_Bool: - err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - 
return nil, err - } - - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Int8: - err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Int16: - err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Int32: - err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Int64: - err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Float: - err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Double: - err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_String, schemapb.DataType_VarChar: - for _, singleString := range singleData.(*StringFieldData).Data { - err = eventWriter.AddOneStringToPayload(singleString) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Array: - for _, singleArray := range singleData.(*ArrayFieldData).Data { - err = eventWriter.AddOneArrayToPayload(singleArray) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_JSON: - for _, singleJSON := range singleData.(*JSONFieldData).Data { - err = eventWriter.AddOneJSONToPayload(singleJSON) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_BinaryVector: - err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_FloatVector: - err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_Float16Vector: - err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case 
schemapb.DataType_BFloat16Vector: - err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim) - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - case schemapb.DataType_SparseFloatVector: - err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)) - if err != nil { - eventWriter.Close() - writer.Close() - return nil, err - } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) - default: - return nil, fmt.Errorf("undefined data type %d", field.DataType) - } - if err != nil { + if err = AddFieldDataToPayload(eventWriter, field.DataType, singleData); err != nil { + eventWriter.Close() + writer.Close() return nil, err } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) writer.SetEventTimeStamp(startTs, endTs) } @@ -453,6 +314,81 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique return blobs, nil } +func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.DataType, singleData FieldData) error { + var err error + switch dataType { + case schemapb.DataType_Bool: + if err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data); err != nil { + return err + } + case schemapb.DataType_Int8: + if err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data); err != nil { + return err + } + case schemapb.DataType_Int16: + if err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data); err != nil { + return err + } + case schemapb.DataType_Int32: + if err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data); err != nil { + return err + } + case schemapb.DataType_Int64: + if err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data); err != nil { + return err + } + case schemapb.DataType_Float: + if err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data); err != nil { + return err + } + case schemapb.DataType_Double: + if err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data); err != nil { + return err + } + case schemapb.DataType_String, schemapb.DataType_VarChar: + for _, singleString := range singleData.(*StringFieldData).Data { + if err = eventWriter.AddOneStringToPayload(singleString); err != nil { + return err + } + } + case schemapb.DataType_Array: + for _, singleArray := range singleData.(*ArrayFieldData).Data { + if err = eventWriter.AddOneArrayToPayload(singleArray); err != nil { + return err + } + } + case schemapb.DataType_JSON: + for _, singleJSON := range singleData.(*JSONFieldData).Data { + if err = eventWriter.AddOneJSONToPayload(singleJSON); err != nil { + return err + } + } + case schemapb.DataType_BinaryVector: + if err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim); err != nil { + return err + } + case schemapb.DataType_FloatVector: + if err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim); err != nil { + return err + } + case schemapb.DataType_Float16Vector: + if err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim); err != nil { + return err + } + case schemapb.DataType_BFloat16Vector: + if err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim); err != nil { + return err + } + case 
schemapb.DataType_SparseFloatVector: + if err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)); err != nil { + return err + } + default: + return fmt.Errorf("undefined data type %d", dataType) + } + return nil +} + func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) ( collectionID UniqueID, partitionID UniqueID, @@ -845,31 +781,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int return collectionID, partitionID, segmentID, nil } -// func deserializeEntity[T any, U any]( -// eventReader *EventReader, -// binlogReader *BinlogReader, -// insertData *InsertData, -// getPayloadFunc func() (U, error), -// fillDataFunc func() FieldData, -// ) error { -// fieldID := binlogReader.FieldID -// stringPayload, err := getPayloadFunc() -// if err != nil { -// eventReader.Close() -// binlogReader.Close() -// return err -// } -// -// if insertData.Data[fieldID] == nil { -// insertData.Data[fieldID] = fillDataFunc() -// } -// stringFieldData := insertData.Data[fieldID].(*T) -// -// stringFieldData.Data = append(stringFieldData.Data, stringPayload...) -// totalLength += len(stringPayload) -// insertData.Data[fieldID] = stringFieldData -// } - // Deserialize transfer blob back to insert data. // From schema, it get all fields. // For each field, it will create a binlog reader, and read all event to the buffer. diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 84820062d8..b074d045d3 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -918,3 +918,51 @@ func TestDeleteData(t *testing.T) { assert.EqualValues(t, dData.Size(), 72) }) } + +func TestAddFieldDataToPayload(t *testing.T) { + w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) + e, _ := w.NextInsertEventWriter() + var err error + err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Int8, &Int8FieldData{[]int8{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Int16, &Int16FieldData{[]int16{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Int32, &Int32FieldData{[]int32{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Int64, &Int64FieldData{[]int64{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Float, &FloatFieldData{[]float32{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Double, &DoubleFieldData{[]float64{}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_String, &StringFieldData{[]string{"test"}, schemapb.DataType_VarChar}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Array, &ArrayFieldData{ + ElementType: schemapb.DataType_VarChar, + Data: []*schemapb.ScalarField{{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, + }, + }}, + }) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_JSON, &JSONFieldData{[][]byte{[]byte(`"batch":2}`)}}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_BinaryVector, &BinaryVectorFieldData{[]byte{}, 8}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_FloatVector, &FloatVectorFieldData{[]float32{}, 4}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_Float16Vector, &Float16VectorFieldData{[]byte{}, 4}) + 
assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_BFloat16Vector, &BFloat16VectorFieldData{[]byte{}, 8}) + assert.Error(t, err) + err = AddFieldDataToPayload(e, schemapb.DataType_SparseFloatVector, &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 0, + Contents: [][]byte{}, + }, + }) + assert.Error(t, err) +} diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index b8b3c68f23..d9a8f1f11f 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -243,7 +243,7 @@ func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error { } if len(data) == 0 { - return errors.New("can't add empty msgs into int64 payload") + return errors.New("can't add empty msgs into int16 payload") } builder, ok := w.builder.(*array.Int16Builder) diff --git a/internal/storage/utils.go b/internal/storage/utils.go index f3a90a8872..06e8d1ca7c 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -982,44 +982,6 @@ func binaryWrite(endian binary.ByteOrder, data interface{}) ([]byte, error) { return buf.Bytes(), nil } -// FieldDataToBytes encode field data to byte slice. -// For some fixed-length data, such as int32, int64, float vector, use binary.Write directly. -// For binary vector, return it directly. -// For bool data, first transfer to schemapb.BoolArray and then marshal it. (TODO: handle bool like other scalar data.) -// For variable-length data, such as string, first transfer to schemapb.StringArray and then marshal it. -// TODO: find a proper way to store variable-length data. Or we should unify to use protobuf? -func FieldDataToBytes(endian binary.ByteOrder, fieldData FieldData) ([]byte, error) { - switch field := fieldData.(type) { - case *BoolFieldData: - // return binaryWrite(endian, field.Data) - return boolFieldDataToPbBytes(field) - case *StringFieldData: - return stringFieldDataToPbBytes(field) - case *ArrayFieldData: - return arrayFieldDataToPbBytes(field) - case *JSONFieldData: - return jsonFieldDataToPbBytes(field) - case *BinaryVectorFieldData: - return field.Data, nil - case *FloatVectorFieldData: - return binaryWrite(endian, field.Data) - case *Int8FieldData: - return binaryWrite(endian, field.Data) - case *Int16FieldData: - return binaryWrite(endian, field.Data) - case *Int32FieldData: - return binaryWrite(endian, field.Data) - case *Int64FieldData: - return binaryWrite(endian, field.Data) - case *FloatFieldData: - return binaryWrite(endian, field.Data) - case *DoubleFieldData: - return binaryWrite(endian, field.Data) - default: - return nil, fmt.Errorf("unsupported field data: %s", field) - } -} - func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error) { insertRecord := &segcorepb.InsertRecord{} for fieldID, rawData := range insertData.Data { diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index f9d1be2d13..25eb19cf06 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -1551,94 +1551,6 @@ func binaryRead(endian binary.ByteOrder, bs []byte, receiver interface{}) error return binary.Read(reader, endian, receiver) } -func TestFieldDataToBytes(t *testing.T) { - // TODO: test big endian. 
- endian := common.Endian - - var bs []byte - var err error - var receiver interface{} - - f1 := &BoolFieldData{Data: []bool{true, false}} - bs, err = FieldDataToBytes(endian, f1) - assert.NoError(t, err) - var barr schemapb.BoolArray - err = proto.Unmarshal(bs, &barr) - assert.NoError(t, err) - assert.ElementsMatch(t, f1.Data, barr.Data) - - f2 := &StringFieldData{Data: []string{"true", "false"}} - bs, err = FieldDataToBytes(endian, f2) - assert.NoError(t, err) - var sarr schemapb.StringArray - err = proto.Unmarshal(bs, &sarr) - assert.NoError(t, err) - assert.ElementsMatch(t, f2.Data, sarr.Data) - - f3 := &Int8FieldData{Data: []int8{0, 1}} - bs, err = FieldDataToBytes(endian, f3) - assert.NoError(t, err) - receiver = make([]int8, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f3.Data, receiver) - - f4 := &Int16FieldData{Data: []int16{0, 1}} - bs, err = FieldDataToBytes(endian, f4) - assert.NoError(t, err) - receiver = make([]int16, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f4.Data, receiver) - - f5 := &Int32FieldData{Data: []int32{0, 1}} - bs, err = FieldDataToBytes(endian, f5) - assert.NoError(t, err) - receiver = make([]int32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f5.Data, receiver) - - f6 := &Int64FieldData{Data: []int64{0, 1}} - bs, err = FieldDataToBytes(endian, f6) - assert.NoError(t, err) - receiver = make([]int64, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f6.Data, receiver) - - // in fact, hard to compare float point value. - - f7 := &FloatFieldData{Data: []float32{0, 1}} - bs, err = FieldDataToBytes(endian, f7) - assert.NoError(t, err) - receiver = make([]float32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f7.Data, receiver) - - f8 := &DoubleFieldData{Data: []float64{0, 1}} - bs, err = FieldDataToBytes(endian, f8) - assert.NoError(t, err) - receiver = make([]float64, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f8.Data, receiver) - - f9 := &BinaryVectorFieldData{Data: []byte{0, 1, 0}} - bs, err = FieldDataToBytes(endian, f9) - assert.NoError(t, err) - assert.ElementsMatch(t, f9.Data, bs) - - f10 := &FloatVectorFieldData{Data: []float32{0, 1}} - bs, err = FieldDataToBytes(endian, f10) - assert.NoError(t, err) - receiver = make([]float32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f10.Data, receiver) -} - func TestJson(t *testing.T) { extras := make(map[string]string) extras["IndexBuildID"] = "10"
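
Note on the extracted helper: `AddFieldDataToPayload` now owns the per-type payload dispatch that `InsertCodec.Serialize` previously inlined, so the caller handles cleanup in one place. Below is a minimal sketch of the intended call pattern, not code from this patch: it assumes it lives inside the `storage` package (since `insertEventWriter` is unexported), and the collection/partition/segment/field IDs (10, 20, 30, 40) and the payload values are placeholders.

	package storage

	import (
		"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	)

	// exampleAddFieldData sketches how a caller drives the extracted helper:
	// obtain an insert event writer, delegate the type-specific append to
	// AddFieldDataToPayload, and close the event writer on failure, mirroring
	// what InsertCodec.Serialize does after this refactor.
	func exampleAddFieldData() error {
		w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
		defer w.Close()

		e, err := w.NextInsertEventWriter()
		if err != nil {
			return err
		}

		data := &Int64FieldData{Data: []int64{1, 2, 3}}
		if err := AddFieldDataToPayload(e, schemapb.DataType_Int64, data); err != nil {
			e.Close()
			return err
		}
		return nil
	}

As in `Serialize`, the helper only reports the error; closing the event writer and binlog writer on failure remains the caller's responsibility.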