mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Remove cgo PayloadWriter (#24892)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
This commit is contained in:
parent
35cb0b5ba6
commit
1ef8f0fceb
@ -16,29 +16,13 @@
|
||||
|
||||
package storage
|
||||
|
||||
/*
|
||||
#cgo pkg-config: milvus_storage
|
||||
|
||||
#include <stdlib.h>
|
||||
#include "storage/parquet_c.h"
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
// PayloadWriterInterface abstracts PayloadWriter
|
||||
type PayloadWriterInterface interface {
|
||||
AddDataToPayload(msgs interface{}, dim ...int) error
|
||||
AddDataToPayload(msgs any, dim ...int) error
|
||||
AddBoolToPayload(msgs []bool) error
|
||||
AddByteToPayload(msgs []byte) error
|
||||
AddInt8ToPayload(msgs []int8) error
|
||||
@ -61,7 +45,7 @@ type PayloadWriterInterface interface {
|
||||
|
||||
// PayloadReaderInterface abstracts PayloadReader
|
||||
type PayloadReaderInterface interface {
|
||||
GetDataFromPayload() (interface{}, int, error)
|
||||
GetDataFromPayload() (any, int, error)
|
||||
GetBoolFromPayload() ([]bool, error)
|
||||
GetByteFromPayload() ([]byte, error)
|
||||
GetInt8FromPayload() ([]int8, error)
|
||||
@ -79,334 +63,3 @@ type PayloadReaderInterface interface {
|
||||
ReleasePayloadReader() error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// PayloadWriter writes data into payload
|
||||
type PayloadWriter struct {
|
||||
payloadWriterPtr C.CPayloadWriter
|
||||
colType schemapb.DataType
|
||||
}
|
||||
|
||||
// NewPayloadWriter is constructor of PayloadWriter
|
||||
func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) {
|
||||
return NewPurePayloadWriter(colType, dim...)
|
||||
}
|
||||
|
||||
// AddDataToPayload adds @msgs into payload, if @msgs is vector, dimension should be specified by @dim
|
||||
func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
|
||||
switch len(dim) {
|
||||
case 0:
|
||||
switch w.colType {
|
||||
case schemapb.DataType_Bool:
|
||||
val, ok := msgs.([]bool)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddBoolToPayload(val)
|
||||
case schemapb.DataType_Int8:
|
||||
val, ok := msgs.([]int8)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddInt8ToPayload(val)
|
||||
case schemapb.DataType_Int16:
|
||||
val, ok := msgs.([]int16)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddInt16ToPayload(val)
|
||||
case schemapb.DataType_Int32:
|
||||
val, ok := msgs.([]int32)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddInt32ToPayload(val)
|
||||
case schemapb.DataType_Int64:
|
||||
val, ok := msgs.([]int64)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddInt64ToPayload(val)
|
||||
case schemapb.DataType_Float:
|
||||
val, ok := msgs.([]float32)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddFloatToPayload(val)
|
||||
case schemapb.DataType_Double:
|
||||
val, ok := msgs.([]float64)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddDoubleToPayload(val)
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
val, ok := msgs.(string)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddOneStringToPayload(val)
|
||||
case schemapb.DataType_Array:
|
||||
val, ok := msgs.(*schemapb.ScalarField)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddOneArrayToPayload(val)
|
||||
case schemapb.DataType_JSON:
|
||||
val, ok := msgs.([]byte)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddOneJSONToPayload(val)
|
||||
default:
|
||||
return errors.New("incorrect datatype")
|
||||
}
|
||||
case 1:
|
||||
switch w.colType {
|
||||
case schemapb.DataType_BinaryVector:
|
||||
val, ok := msgs.([]byte)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddBinaryVectorToPayload(val, dim[0])
|
||||
case schemapb.DataType_FloatVector:
|
||||
val, ok := msgs.([]float32)
|
||||
if !ok {
|
||||
return errors.New("incorrect data type")
|
||||
}
|
||||
return w.AddFloatVectorToPayload(val, dim[0])
|
||||
default:
|
||||
return errors.New("incorrect datatype")
|
||||
}
|
||||
default:
|
||||
return errors.New("incorrect input numbers")
|
||||
}
|
||||
}
|
||||
|
||||
// AddBoolToPayload adds @msgs into payload
|
||||
func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddBoolToPayload failed")
|
||||
}
|
||||
|
||||
// AddByteToPayload adds @msgs into payload
|
||||
func (w *PayloadWriter) AddByteToPayload(msgs []byte) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddInt8ToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddInt8ToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddInt16ToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddInt32ToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddInt64ToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddFloatToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
|
||||
length := len(msgs)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty msgs into payload")
|
||||
}
|
||||
|
||||
cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
|
||||
cLength := C.int(length)
|
||||
|
||||
status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
|
||||
return HandleCStatus(&status, "AddDoubleToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
|
||||
length := len(msg)
|
||||
cmsg := C.CString(msg)
|
||||
clength := C.int(length)
|
||||
defer C.free(unsafe.Pointer(cmsg))
|
||||
|
||||
// the C.AddOneStringToPayload can handle empty string
|
||||
status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
|
||||
return HandleCStatus(&status, "AddOneStringToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddOneArrayToPayload(msg *schemapb.ScalarField) error {
|
||||
bytes, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return errors.New("Marshal ListValue failed")
|
||||
}
|
||||
|
||||
length := len(bytes)
|
||||
cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0]))
|
||||
clength := C.int(length)
|
||||
// defer C.free(unsafe.Pointer(cmsg))
|
||||
|
||||
status := C.AddOneArrayToPayload(w.payloadWriterPtr, cmsg, clength)
|
||||
return HandleCStatus(&status, "AddOneArrayToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) AddOneJSONToPayload(msg []byte) error {
|
||||
bytes := msg
|
||||
length := len(bytes)
|
||||
cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0]))
|
||||
clength := C.int(length)
|
||||
|
||||
status := C.AddOneJSONToPayload(w.payloadWriterPtr, cmsg, clength)
|
||||
return HandleCStatus(&status, "AddOneJSONToPayload failed")
|
||||
}
|
||||
|
||||
// AddBinaryVectorToPayload dimension > 0 && (%8 == 0)
|
||||
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
|
||||
length := len(binVec)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty binVec into payload")
|
||||
}
|
||||
if dim <= 0 {
|
||||
return errors.New("dimension should be greater than 0")
|
||||
}
|
||||
|
||||
cBinVec := (*C.uint8_t)(&binVec[0])
|
||||
cDim := C.int(dim)
|
||||
cLength := C.int(length / (dim / 8))
|
||||
|
||||
status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
|
||||
return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
|
||||
}
|
||||
|
||||
// AddFloatVectorToPayload dimension > 0 && (%8 == 0)
|
||||
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
|
||||
length := len(floatVec)
|
||||
if length <= 0 {
|
||||
return errors.New("can't add empty floatVec into payload")
|
||||
}
|
||||
if dim <= 0 {
|
||||
return errors.New("dimension should be greater than 0")
|
||||
}
|
||||
|
||||
cVec := (*C.float)(&floatVec[0])
|
||||
cDim := C.int(dim)
|
||||
cLength := C.int(length / dim)
|
||||
|
||||
status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
|
||||
return HandleCStatus(&status, "AddFloatVectorToPayload failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) FinishPayloadWriter() error {
|
||||
status := C.FinishPayloadWriter(w.payloadWriterPtr)
|
||||
return HandleCStatus(&status, "FinishPayloadWriter failed")
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
|
||||
cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
|
||||
pointer := uintptr(unsafe.Pointer(cb.data))
|
||||
length := int(cb.length)
|
||||
if length <= 0 {
|
||||
return nil, errors.New("empty buffer")
|
||||
}
|
||||
|
||||
var data []byte
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
|
||||
sh.Data = pointer
|
||||
sh.Len = length
|
||||
sh.Cap = length
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
|
||||
length := C.GetPayloadLengthFromWriter(w.payloadWriterPtr)
|
||||
return int(length), nil
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) ReleasePayloadWriter() {
|
||||
C.ReleasePayloadWriter(w.payloadWriterPtr)
|
||||
}
|
||||
|
||||
func (w *PayloadWriter) Close() {
|
||||
w.ReleasePayloadWriter()
|
||||
}
|
||||
|
||||
// HandleCStatus deal with the error returned from CGO
|
||||
func HandleCStatus(status *C.CStatus, extraInfo string) error {
|
||||
if status.error_code == 0 {
|
||||
return nil
|
||||
}
|
||||
errorCode := status.error_code
|
||||
errorName, ok := commonpb.ErrorCode_name[int32(errorCode)]
|
||||
if !ok {
|
||||
errorName = "UnknownError"
|
||||
}
|
||||
errorMsg := C.GoString(status.error_msg)
|
||||
defer C.free(unsafe.Pointer(status.error_msg))
|
||||
|
||||
finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg)
|
||||
logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg)
|
||||
log.Warn(logMsg)
|
||||
return errors.New(finalMsg)
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ type NativePayloadWriter struct {
|
||||
releaseOnce sync.Once
|
||||
}
|
||||
|
||||
func NewPurePayloadWriter(colType schemapb.DataType, dim ...int) (*NativePayloadWriter, error) {
|
||||
func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) {
|
||||
var arrowType arrow.DataType
|
||||
if typeutil.IsVectorType(colType) {
|
||||
if len(dim) != 1 {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user