diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 6938602f2a..6cf8e3e3e8 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -93,6 +93,7 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec return } + log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema)) collection := NewCollection(collectionID, schema, meta, loadMeta) collection.Ref(1) m.collections[collectionID] = collection diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index a74bfe820b..819b20b1b4 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -181,11 +181,11 @@ func (w *NativePayloadWriter) AddDataToPayload(data interface{}, dim ...int) err func (w *NativePayloadWriter) AddBoolToPayload(data []bool) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished bool payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into bool payload") } builder, ok := w.builder.(*array.BooleanBuilder) @@ -199,16 +199,16 @@ func (w *NativePayloadWriter) AddBoolToPayload(data []bool) error { func (w *NativePayloadWriter) AddByteToPayload(data []byte) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished byte payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into byte payload") } builder, ok := w.builder.(*array.Int8Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast ByteBuilder") } builder.Reserve(len(data)) @@ -221,16 +221,16 @@ func (w *NativePayloadWriter) AddByteToPayload(data []byte) error { func (w *NativePayloadWriter) AddInt8ToPayload(data []int8) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished int8 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into int8 payload") } builder, ok := w.builder.(*array.Int8Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast Int8Builder") } builder.AppendValues(data, nil) @@ -239,16 +239,16 @@ func (w *NativePayloadWriter) AddInt8ToPayload(data []int8) error { func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished int16 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into int64 payload") } builder, ok := w.builder.(*array.Int16Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast Int16Builder") } builder.AppendValues(data, nil) @@ -257,16 +257,16 @@ func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error { func (w *NativePayloadWriter) AddInt32ToPayload(data []int32) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished int32 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into int32 payload") } builder, ok := w.builder.(*array.Int32Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast Int32Builder") } builder.AppendValues(data, nil) @@ -275,16 +275,16 @@ func (w *NativePayloadWriter) AddInt32ToPayload(data []int32) error { func (w *NativePayloadWriter) AddInt64ToPayload(data []int64) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished int64 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into int64 payload") } builder, ok := w.builder.(*array.Int64Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast Int64Builder") } builder.AppendValues(data, nil) @@ -293,16 +293,16 @@ func (w *NativePayloadWriter) AddInt64ToPayload(data []int64) error { func (w *NativePayloadWriter) AddFloatToPayload(data []float32) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished float payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into float payload") } builder, ok := w.builder.(*array.Float32Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast FloatBuilder") } builder.AppendValues(data, nil) @@ -311,16 +311,16 @@ func (w *NativePayloadWriter) AddFloatToPayload(data []float32) error { func (w *NativePayloadWriter) AddDoubleToPayload(data []float64) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished double payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into double payload") } builder, ok := w.builder.(*array.Float64Builder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast DoubleBuilder") } builder.AppendValues(data, nil) @@ -329,12 +329,12 @@ func (w *NativePayloadWriter) AddDoubleToPayload(data []float64) error { func (w *NativePayloadWriter) AddOneStringToPayload(data string) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished string payload") } builder, ok := w.builder.(*array.StringBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast StringBuilder") } builder.Append(data) @@ -344,7 +344,7 @@ func (w *NativePayloadWriter) AddOneStringToPayload(data string) error { func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished array payload") } bytes, err := proto.Marshal(data) @@ -354,7 +354,7 @@ func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) e builder, ok := w.builder.(*array.BinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast BinaryBuilder") } builder.Append(bytes) @@ -364,12 +364,12 @@ func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) e func (w *NativePayloadWriter) AddOneJSONToPayload(data []byte) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished json payload") } builder, ok := w.builder.(*array.BinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast JsonBuilder") } builder.Append(data) @@ -379,16 +379,16 @@ func (w *NativePayloadWriter) AddOneJSONToPayload(data []byte) error { func (w *NativePayloadWriter) AddBinaryVectorToPayload(data []byte, dim int) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished binary vector payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into binary vector payload") } builder, ok := w.builder.(*array.FixedSizeBinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast BinaryVectorBuilder") } byteLength := dim / 8 @@ -403,16 +403,16 @@ func (w *NativePayloadWriter) AddBinaryVectorToPayload(data []byte, dim int) err func (w *NativePayloadWriter) AddFloatVectorToPayload(data []float32, dim int) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished float vector payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into float vector payload") } builder, ok := w.builder.(*array.FixedSizeBinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast FloatVectorBuilder") } byteLength := dim * 4 @@ -434,16 +434,16 @@ func (w *NativePayloadWriter) AddFloatVectorToPayload(data []float32, dim int) e func (w *NativePayloadWriter) AddFloat16VectorToPayload(data []byte, dim int) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished float16 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into float16 payload") } builder, ok := w.builder.(*array.FixedSizeBinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast Float16Builder") } byteLength := dim * 2 @@ -459,16 +459,16 @@ func (w *NativePayloadWriter) AddFloat16VectorToPayload(data []byte, dim int) er func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished BFloat16 payload") } if len(data) == 0 { - return errors.New("can't add empty msgs into payload") + return errors.New("can't add empty msgs into BFloat16 payload") } builder, ok := w.builder.(*array.FixedSizeBinaryBuilder) if !ok { - return errors.New("failed to cast ArrayBuilder") + return errors.New("failed to cast BFloat16Builder") } byteLength := dim * 2 @@ -484,11 +484,11 @@ func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) e func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error { if w.finished { - return errors.New("can't append data to finished writer") + return errors.New("can't append data to finished sparse float vector payload") } builder, ok := w.builder.(*array.BinaryBuilder) if !ok { - return errors.New("failed to cast BinaryBuilder") + return errors.New("failed to cast SparseFloatVectorBuilder") } length := len(data.SparseFloatArray.Contents) builder.Reserve(length) diff --git a/internal/storage/payload_writer_test.go b/internal/storage/payload_writer_test.go new file mode 100644 index 0000000000..e18c6b2637 --- /dev/null +++ b/internal/storage/payload_writer_test.go @@ -0,0 +1,295 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +func TestPayloadWriter_Failed(t *testing.T) { + t.Run("Test Bool", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddBoolToPayload([]bool{false}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false}) + require.Error(t, err) + }) + + t.Run("Test Byte", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int8) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddByteToPayload([]byte{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddByteToPayload([]byte{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddByteToPayload([]byte{0}) + require.Error(t, err) + }) + + t.Run("Test Int8", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int8) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt8ToPayload([]int8{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddInt8ToPayload([]int8{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt8ToPayload([]int8{0}) + require.Error(t, err) + }) + + t.Run("Test Int16", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int16) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt16ToPayload([]int16{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddInt16ToPayload([]int16{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt16ToPayload([]int16{0}) + require.Error(t, err) + }) + + t.Run("Test Int32", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int32) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt32ToPayload([]int32{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddInt32ToPayload([]int32{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt32ToPayload([]int32{0}) + require.Error(t, err) + }) + + t.Run("Test Int64", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt64ToPayload([]int64{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddInt64ToPayload([]int64{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt64ToPayload([]int64{0}) + require.Error(t, err) + }) + + t.Run("Test Float", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddFloatToPayload([]float32{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddFloatToPayload([]float32{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddFloatToPayload([]float32{0}) + require.Error(t, err) + }) + + t.Run("Test Double", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Double) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddDoubleToPayload([]float64{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddDoubleToPayload([]float64{0}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddDoubleToPayload([]float64{0}) + require.Error(t, err) + }) + + t.Run("Test String", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_String) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddOneStringToPayload("test") + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddOneStringToPayload("test") + require.Error(t, err) + }) + + t.Run("Test Array", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Array) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddOneArrayToPayload(&schemapb.ScalarField{}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddOneArrayToPayload(&schemapb.ScalarField{}) + require.Error(t, err) + }) + + t.Run("Test Json", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_JSON) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddOneJSONToPayload([]byte{0, 1}) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddOneJSONToPayload([]byte{0, 1}) + require.Error(t, err) + }) + + t.Run("Test BinaryVector", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_BinaryVector, 8) + require.Nil(t, err) + require.NotNil(t, w) + + data := make([]byte, 8) + for i := 0; i < 8; i++ { + data[i] = 1 + } + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddBinaryVectorToPayload(data, 8) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBinaryVectorToPayload(data, 8) + require.Error(t, err) + }) + + t.Run("Test FloatVector", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_FloatVector, 8) + require.Nil(t, err) + require.NotNil(t, w) + + data := make([]float32, 8) + for i := 0; i < 8; i++ { + data[i] = 1 + } + + err = w.AddFloatToPayload([]float32{}) + require.Error(t, err) + + err = w.FinishPayloadWriter() + require.NoError(t, err) + + err = w.AddFloatToPayload(data) + require.Error(t, err) + + w, err = NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddFloatToPayload(data) + require.Error(t, err) + }) +}