diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go
index 282366fff9..707bdade50 100644
--- a/internal/util/importutilv2/parquet/field_reader.go
+++ b/internal/util/importutilv2/parquet/field_reader.go
@@ -291,7 +291,7 @@ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) {
 	for _, str := range data.([]string) {
 		rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str))
 		if err != nil {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s'", str))
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s', err = %v", str, err))
 		}
 		byteArr = append(byteArr, rowVec)
 		elemCount := len(rowVec) / 8
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index 8277ccbe43..272447981a 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -1550,12 +1550,41 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
 		return nil, fmt.Errorf("empty JSON input")
 	}
 
-	// try format1
-	indices, ok1 := input["indices"].([]uint32)
-	values, ok2 := input["values"].([]float32)
+	jsonIndices, ok1 := input["indices"].([]interface{})
+	jsonValues, ok2 := input["values"].([]interface{})
 
-	// try format2
-	if !ok1 && !ok2 {
+	if ok1 && ok2 {
+		// try format1
+		// Indices must be non-negative integers representable as uint32.
+		// Converting a negative or oversized number to uint32 would silently
+		// wrap (int) or be implementation-dependent (float64), so reject it.
+		const maxIndex = 1<<32 - 1
+		for _, v1 := range jsonIndices {
+			switch num := v1.(type) {
+			case int:
+				if num < 0 || int64(num) > maxIndex {
+					return nil, fmt.Errorf("invalid index value: %v", v1)
+				}
+				indices = append(indices, uint32(num))
+			case float64:
+				// encoding/json decodes every JSON number into float64
+				if num < 0 || num > maxIndex || num != float64(uint32(num)) {
+					return nil, fmt.Errorf("invalid index value: %v", v1)
+				}
+				indices = append(indices, uint32(num))
+			default:
+				return nil, fmt.Errorf("invalid index type: %v(%s)", v1, reflect.TypeOf(v1))
+			}
+		}
+		for _, v2 := range jsonValues {
+			if num, ok := v2.(float64); ok {
+				values = append(values, float32(num))
+			} else {
+				return nil, fmt.Errorf("invalid value type: %s", reflect.TypeOf(v2))
+			}
+		}
+	} else if !ok1 && !ok2 {
+		// try format2
 		for k, v := range input {
 			idx, err := strconv.ParseUint(k, 0, 32)
 			if err != nil {
@@ -1578,7 +1607,7 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
 			indices = append(indices, uint32(idx))
 			values = append(values, float32(val))
 		}
-	} else if ok1 != ok2 {
+	} else {
 		return nil, fmt.Errorf("invalid JSON input")
 	}
 
diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go
index 67601a719d..b1e5ec4b83 100644
--- a/pkg/util/typeutil/schema_test.go
+++ b/pkg/util/typeutil/schema_test.go
@@ -2120,39 +2120,51 @@ func TestValidateSparseFloatRows(t *testing.T) {
 
 func TestParseJsonSparseFloatRow(t *testing.T) {
 	t.Run("valid row 1", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{1, 3, 5}, "values": []float32{1.0, 2.0, 3.0}}
+		row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
 		res, err := CreateSparseFloatRowFromMap(row)
 		assert.NoError(t, err)
 		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
 	})
 
 	t.Run("valid row 2", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{3, 1, 5}, "values": []float32{1.0, 2.0, 3.0}}
+		row := map[string]interface{}{"indices": []interface{}{3, 1, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
 		res, err := CreateSparseFloatRowFromMap(row)
 		assert.NoError(t, err)
 		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
 	})
 
 	t.Run("invalid row 1", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{1, 3, 5}, "values": []float32{1.0, 2.0}}
+		row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}}
 		_, err := CreateSparseFloatRowFromMap(row)
 		assert.Error(t, err)
 	})
 
 	t.Run("invalid row 2", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{1}, "values": []float32{1.0, 2.0}}
+		row := map[string]interface{}{"indices": []interface{}{1}, "values": []interface{}{1.0, 2.0}}
 		_, err := CreateSparseFloatRowFromMap(row)
 		assert.Error(t, err)
 	})
 
 	t.Run("invalid row 3", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{}, "values": []float32{}}
+		row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
 		_, err := CreateSparseFloatRowFromMap(row)
 		assert.Error(t, err)
 	})
 
 	t.Run("invalid row 4", func(t *testing.T) {
-		row := map[string]interface{}{"indices": []uint32{3}, "values": []float32{-0.2}}
+		row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 5", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 6", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{-3}, "values": []interface{}{0.2}}
 		_, err := CreateSparseFloatRowFromMap(row)
 		assert.Error(t, err)
 	})
@@ -2206,4 +2218,138 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
 		_, err := CreateSparseFloatRowFromMap(row)
 		assert.Error(t, err)
 	})
+
+	t.Run("invalid dict row 7", func(t *testing.T) {
+		row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+}
+
+// TestParseJsonSparseFloatRowBytes covers CreateSparseFloatRowFromJSON with raw JSON payloads.
+func TestParseJsonSparseFloatRowBytes(t *testing.T) {
+	t.Run("valid row 1", func(t *testing.T) {
+		row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0]}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
+	})
+
+	t.Run("valid row 2", func(t *testing.T) {
+		row := []byte(`{"indices":[3,1,5],"values":[1.0,2.0,3.0]}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
+	})
+
+	t.Run("invalid row 1", func(t *testing.T) {
+		// deliberately truncated payload: must be rejected as malformed JSON
+		row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 2", func(t *testing.T) {
+		row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 3", func(t *testing.T) {
+		row := []byte(`{"indices":[1],"values":[1.0,2.0]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 4", func(t *testing.T) {
+		row := []byte(`{"indices":[],"values":[]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 5", func(t *testing.T) {
+		row := []byte(`{"indices":[-3],"values":[0.2]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 6", func(t *testing.T) {
+		row := []byte(`{"indices":[3],"values":[-0.2]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 7", func(t *testing.T) {
+		row := []byte(`{"indices":[3.1],"values":[0.2]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 8", func(t *testing.T) {
+		row := []byte(`{"indices":[4294967296],"values":[0.2]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("valid dict row 1", func(t *testing.T) {
+		row := []byte(`{"1": 1.0, "3": 2.0, "5": 3.0}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
+	})
+
+	t.Run("valid dict row 2", func(t *testing.T) {
+		row := []byte(`{"3": 1.0, "1": 2.0, "5": 3.0}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
+	})
+
+	t.Run("invalid dict row 1", func(t *testing.T) {
+		row := []byte(`{"a": 1.0, "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 2", func(t *testing.T) {
+		row := []byte(`{"1": "a", "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 3", func(t *testing.T) {
+		row := []byte(`{"1": "1.0", "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 4", func(t *testing.T) {
+		row := []byte(`{"1": 1.0, "3": 2.0, "5": }`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 5", func(t *testing.T) {
+		row := []byte(`{"-1": 1.0, "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 6", func(t *testing.T) {
+		row := []byte(`{"1": -1.0, "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 7", func(t *testing.T) {
+		row := []byte(`{}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 8", func(t *testing.T) {
+		row := []byte(`{"1.1": 1.0, "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
 }