diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp index 1d05b8ddcf..95da2558c4 100644 --- a/internal/core/src/common/ChunkWriter.cpp +++ b/internal/core/src/common/ChunkWriter.cpp @@ -385,7 +385,8 @@ create_chunk(const FieldMeta& field_meta, break; } case milvus::DataType::VARCHAR: - case milvus::DataType::STRING: { + case milvus::DataType::STRING: + case milvus::DataType::TEXT: { w = std::make_shared(nullable); break; } @@ -486,7 +487,8 @@ create_chunk(const FieldMeta& field_meta, break; } case milvus::DataType::VARCHAR: - case milvus::DataType::STRING: { + case milvus::DataType::STRING: + case milvus::DataType::TEXT: { w = std::make_shared( file, file_offset, nullable); break; diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index 470f9e9420..cd5086a430 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -179,7 +179,8 @@ FieldDataImpl::FillFieldData( return FillFieldData(array_info.first, array_info.second); } case DataType::STRING: - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { AssertInfo(array->type()->id() == arrow::Type::type::STRING, "inconsistent data type"); auto string_array = @@ -311,6 +312,7 @@ InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows) { type, nullable, cap_rows); case DataType::STRING: case DataType::VARCHAR: + case DataType::TEXT: return std::make_shared>( type, nullable, cap_rows); case DataType::JSON: diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 52b7a33258..e0f1757035 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -76,6 +76,8 @@ enum class DataType { VARCHAR = 21, ARRAY = 22, JSON = 23, + // GEOMETRY = 24 // reserved in proto + TEXT = 25, // Some special Data type, start from after 50 // just for internal use now, may sync proto in future @@ -182,6 +184,8 @@ GetDataTypeName(DataType data_type) { return "array"; case DataType::JSON: return "json"; + case DataType::TEXT: + return "text"; case DataType::VECTOR_FLOAT: return "vector_float"; case DataType::VECTOR_BINARY: @@ -255,6 +259,7 @@ IsStringDataType(DataType data_type) { switch (data_type) { case DataType::VARCHAR: case DataType::STRING: + case DataType::TEXT: return true; default: return false; @@ -538,6 +543,12 @@ struct TypeTraits : public TypeTraits { static constexpr const char* Name = "STRING"; }; +template <> +struct TypeTraits : public TypeTraits { + static constexpr DataType TypeKind = DataType::TEXT; + static constexpr const char* Name = "TEXT"; +}; + template <> struct TypeTraits { using NativeType = void; @@ -620,6 +631,9 @@ struct fmt::formatter : formatter { case milvus::DataType::VARCHAR: name = "VARCHAR"; break; + case milvus::DataType::TEXT: + name = "TEXT"; + break; case milvus::DataType::ARRAY: name = "ARRAY"; break; diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index b11e105352..39f40375e9 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -51,6 +51,7 @@ PaddingSize(const DataType& type) { return simdjson::SIMDJSON_PADDING; case DataType::VARCHAR: case DataType::STRING: + case DataType::TEXT: return FILE_STRING_PADDING; break; case DataType::ARRAY: @@ -92,7 +93,8 @@ WriteFieldData(File& file, BufferedWriter bw = BufferedWriter(file, 1048576); switch (data_type) { case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { // write as: |size|data|size|data...... for (auto i = 0; i < data->get_num_rows(); ++i) { indices.push_back(total_written); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 39efe6c9d0..71a9bc0249 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -378,7 +378,8 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { int64_t field_data_size = 0; switch (data_type) { case milvus::DataType::STRING: - case milvus::DataType::VARCHAR: { + case milvus::DataType::VARCHAR: + case milvus::DataType::TEXT: { auto var_column = std::make_shared>( field_meta); @@ -580,7 +581,8 @@ ChunkedSegmentSealedImpl::MapFieldData(const FieldId field_id, if (IsVariableDataType(data_type)) { switch (data_type) { case milvus::DataType::STRING: - case milvus::DataType::VARCHAR: { + case milvus::DataType::VARCHAR: + case milvus::DataType::TEXT: { // auto var_column = std::make_shared>( // file, // total_written, @@ -1593,7 +1595,8 @@ ChunkedSegmentSealedImpl::get_raw_data(FieldId field_id, } switch (field_meta.get_data_type()) { case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { bulk_subscript_ptr_impl( column.get(), seg_offsets, diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index ba6154f065..dd8318d2c1 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -85,7 +85,8 @@ VectorBase::set_data_raw(ssize_t element_offset, return set_data_raw( element_offset, FIELD_DATA(data, double).data(), element_count); } - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { auto& field_data = FIELD_DATA(data, string); std::vector data_raw(field_data.begin(), field_data.end()); diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index cdea9a60e0..72c9a0b058 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -452,7 +452,8 @@ struct InsertRecord { this->append_data(field_id, size_per_chunk); break; } - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { this->append_data(field_id, size_per_chunk); break; } diff --git a/internal/core/src/segcore/SegmentChunkReader.cpp b/internal/core/src/segcore/SegmentChunkReader.cpp index 207248acf8..52bbe2bb1c 100644 --- a/internal/core/src/segcore/SegmentChunkReader.cpp +++ b/internal/core/src/segcore/SegmentChunkReader.cpp @@ -288,7 +288,8 @@ SegmentChunkReader::GetChunkDataAccessor(DataType data_type, case DataType::DOUBLE: return GetChunkDataAccessor( field_id, chunk_id, data_barrier); - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { return GetChunkDataAccessor( field_id, chunk_id, data_barrier); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 5b41531d9b..bd6b27d4f2 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -579,7 +579,8 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, ->mutable_data()); break; } - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { bulk_subscript_ptr_impl(vec_ptr, seg_offsets, count, diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 5874c85df8..d2552d089d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -404,7 +404,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { int64_t field_data_size = 0; switch (data_type) { case milvus::DataType::STRING: - case milvus::DataType::VARCHAR: { + case milvus::DataType::VARCHAR: + case milvus::DataType::TEXT: { auto var_column = std::make_shared< SingleChunkVariableColumn>( num_rows, field_meta, get_block_size()); @@ -580,7 +581,8 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { if (IsVariableDataType(data_type)) { switch (data_type) { case milvus::DataType::STRING: - case milvus::DataType::VARCHAR: { + case milvus::DataType::VARCHAR: + case milvus::DataType::TEXT: { auto var_column = std::make_shared>( file, @@ -1412,7 +1414,8 @@ SegmentSealedImpl::get_raw_data(FieldId field_id, } switch (field_meta.get_data_type()) { case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { bulk_subscript_ptr_impl( column.get(), seg_offsets, diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index ba55ca8706..88d2b0fc81 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -132,7 +132,8 @@ GetRawDataSizeOfDataArray(const DataArray* data, } else { switch (data_type) { case DataType::STRING: - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { auto& string_data = FIELD_DATA(data, string); for (auto& str : string_data) { result += str.size(); @@ -187,7 +188,8 @@ GetRawDataSizeOfDataArray(const DataArray* data, break; } case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { for (auto& array_bytes : array_data) { auto element_num = array_bytes.string_data().data_size(); @@ -276,7 +278,8 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { break; } case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { auto obj = scalar_array->mutable_string_data(); obj->mutable_data()->Reserve(count); for (auto i = 0; i < count; i++) { @@ -430,7 +433,8 @@ CreateScalarDataArrayFrom(const void* data_raw, obj->mutable_data()->Add(data, data + count); break; } - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_string_data(); for (auto i = 0; i < count; i++) { @@ -660,7 +664,8 @@ MergeDataArray(std::vector& merge_bases, *(obj->mutable_data()->Add()) = data[src_offset]; break; } - case DataType::VARCHAR: { + case DataType::VARCHAR: + case DataType::TEXT: { auto& data = FIELD_DATA(src_field_data, string); auto obj = scalar_array->mutable_string_data(); *(obj->mutable_data()->Add()) = data[src_offset]; diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index b35c92c22c..d1ee08ee3e 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -242,7 +242,8 @@ BaseEventData::Serialize() { } switch (data_type) { case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { for (size_t offset = 0; offset < field_data->get_num_rows(); ++offset) { auto str = static_cast( diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 5c3419f3b2..4d1953342e 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -272,7 +272,8 @@ CreateArrowBuilder(DataType data_type) { return std::make_shared(); } case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { return std::make_shared(); } case DataType::ARRAY: @@ -357,7 +358,8 @@ CreateArrowSchema(DataType data_type, bool nullable) { {arrow::field("val", arrow::float64(), nullable)}); } case DataType::VARCHAR: - case DataType::STRING: { + case DataType::STRING: + case DataType::TEXT: { return arrow::schema( {arrow::field("val", arrow::utf8(), nullable)}); } @@ -812,6 +814,7 @@ CreateFieldData(const DataType& type, type, nullable, total_num_rows); case DataType::STRING: case DataType::VARCHAR: + case DataType::TEXT: return std::make_shared>( type, nullable, total_num_rows); case DataType::JSON: diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node.go b/internal/flushcommon/pipeline/flow_graph_embedding_node.go index f264b8b39a..87e9701e53 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node.go @@ -86,7 +86,7 @@ func (eNode *embeddingNode) bm25Embedding(runner function.FunctionRunner, inputF embeddingData, ok := data.Data[inputFieldId].GetDataRows().([]string) if !ok { - return fmt.Errorf("BM25 embedding failed: input field data not varchar") + return fmt.Errorf("BM25 embedding failed: input field data not varchar/text") } output, err := runner.BatchRun(embeddingData) diff --git a/internal/parser/planparserv2/parser_visitor.go b/internal/parser/planparserv2/parser_visitor.go index 24ac599c47..ad5a94d5e6 100644 --- a/internal/parser/planparserv2/parser_visitor.go +++ b/internal/parser/planparserv2/parser_visitor.go @@ -39,6 +39,10 @@ func (v *ParserVisitor) translateIdentifier(identifier string) (*ExprWithType, e nestedPath = append(nestedPath, identifier) } + if field.DataType == schemapb.DataType_Text { + return nil, fmt.Errorf("filter on text field (%s) is not supported yet", field.Name) + } + return &ExprWithType{ expr: &planpb.Expr{ Expr: &planpb.Expr_ColumnExpr{ @@ -494,6 +498,9 @@ func (v *ParserVisitor) VisitTextMatch(ctx *parser.TextMatchContext) interface{} if !typeutil.IsStringType(column.dataType) { return fmt.Errorf("text match operation on non-string is unsupported") } + if column.dataType == schemapb.DataType_Text { + return fmt.Errorf("text match operation on text field is not supported yet") + } queryText, err := convertEscapeSingle(ctx.StringLiteral().GetText()) if err != nil { diff --git a/internal/parser/planparserv2/plan_parser_v2_test.go b/internal/parser/planparserv2/plan_parser_v2_test.go index 3e633b0e82..36d0df46ad 100644 --- a/internal/parser/planparserv2/plan_parser_v2_test.go +++ b/internal/parser/planparserv2/plan_parser_v2_test.go @@ -294,6 +294,21 @@ func TestExpr_PhraseMatch(t *testing.T) { } } +func TestExpr_TextField(t *testing.T) { + schema := newTestSchema(true) + helper, err := typeutil.CreateSchemaHelper(schema) + assert.NoError(t, err) + + invalidExprs := []string{ + `TextField == "query"`, + `text_match(TextField, "query")`, + } + + for _, exprStr := range invalidExprs { + assertInvalidExpr(t, helper, exprStr) + } +} + func TestExpr_IsNull(t *testing.T) { schema := newTestSchema(false) schema.EnableDynamicField = false diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 4b643f234d..a6bb600ada 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -397,6 +397,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error { // valid max length per row parameters // if max_length not specified, return error if field.DataType == schemapb.DataType_VarChar || + field.DataType == schemapb.DataType_Text || (field.GetDataType() == schemapb.DataType_Array && field.GetElementType() == schemapb.DataType_VarChar) { err = validateMaxLengthPerRow(t.schema.Name, field) if err != nil { diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 8aa847d616..cf400b7d06 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -216,10 +216,10 @@ func (it *insertTask) PreExecute(ctx context.Context) error { return err } - // check varchar with analyzer was utf-8 format - err = checkVarcharFormat(it.schema, it.insertMsg) + // check varchar/text with analyzer was utf-8 format + err = checkInputUtf8Compatiable(it.schema, it.insertMsg) if err != nil { - log.Warn("check varchar format failed", zap.Error(err)) + log.Warn("check varchar/text format failed", zap.Error(err)) return err } diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index d7f4568ae6..a23a35a1a3 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -210,10 +210,10 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid) } - // check varchar with analyzer was utf-8 format - err = checkVarcharFormat(it.schema.CollectionSchema, it.upsertMsg.InsertMsg) + // check varchar/text with analyzer was utf-8 format + err = checkInputUtf8Compatiable(it.schema.CollectionSchema, it.upsertMsg.InsertMsg) if err != nil { - log.Warn("check varchar format failed", zap.Error(err)) + log.Warn("check varchar/text format failed", zap.Error(err)) return err } diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 12e0b4e871..5f0a6e6c90 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -365,15 +365,21 @@ func validateMaxLengthPerRow(collectionName string, field *schemapb.FieldSchema) return err } - defaultMaxVarCharLength := Params.ProxyCfg.MaxVarCharLength.GetAsInt64() - if maxLengthPerRow > defaultMaxVarCharLength || maxLengthPerRow <= 0 { - return merr.WrapErrParameterInvalidMsg("the maximum length specified for a VarChar field(%s) should be in (0, %d], but got %d instead", field.GetName(), defaultMaxVarCharLength, maxLengthPerRow) + var defaultMaxLength int64 + if field.DataType == schemapb.DataType_Text { + defaultMaxLength = Params.ProxyCfg.MaxTextLength.GetAsInt64() + } else { + defaultMaxLength = Params.ProxyCfg.MaxVarCharLength.GetAsInt64() + } + + if maxLengthPerRow > defaultMaxLength || maxLengthPerRow <= 0 { + return merr.WrapErrParameterInvalidMsg("the maximum length specified for the field(%s) should be in (0, %d], but got %d instead", field.GetName(), defaultMaxLength, maxLengthPerRow) } exist = true } // if not exist type params max_length, return error if !exist { - return fmt.Errorf("type param(max_length) should be specified for varChar field(%s) of collection %s", field.GetName(), collectionName) + return fmt.Errorf("type param(max_length) should be specified for the field(%s) of collection %s", field.GetName(), collectionName) } return nil @@ -746,8 +752,8 @@ func wasBm25FunctionInputField(coll *schemapb.CollectionSchema, field *schemapb. func checkFunctionInputField(function *schemapb.FunctionSchema, fields []*schemapb.FieldSchema) error { switch function.GetType() { case schemapb.FunctionType_BM25: - if len(fields) != 1 || fields[0].DataType != schemapb.DataType_VarChar { - return fmt.Errorf("BM25 function input field must be a VARCHAR field, got %d field with type %s", + if len(fields) != 1 || (fields[0].DataType != schemapb.DataType_VarChar && fields[0].DataType != schemapb.DataType_Text) { + return fmt.Errorf("BM25 function input field must be a VARCHAR/TEXT field, got %d field with type %s", len(fields), fields[0].DataType.String()) } h := typeutil.CreateFieldSchemaHelper(fields[0]) @@ -755,8 +761,8 @@ func checkFunctionInputField(function *schemapb.FunctionSchema, fields []*schema return fmt.Errorf("BM25 function input field must set enable_analyzer to true") } case schemapb.FunctionType_TextEmbedding: - if len(fields) != 1 || fields[0].DataType != schemapb.DataType_VarChar { - return fmt.Errorf("TextEmbedding function input field must be a VARCHAR field") + if len(fields) != 1 || (fields[0].DataType != schemapb.DataType_VarChar && fields[0].DataType != schemapb.DataType_Text) { + return fmt.Errorf("TextEmbedding function input field must be a VARCHAR/TEXT field") } default: return fmt.Errorf("check input field with unknown function type") @@ -1613,9 +1619,9 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre // for some varchar with analzyer // we need check char format before insert it to message queue // now only support utf-8 -func checkVarcharFormat(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { +func checkInputUtf8Compatiable(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { checkeFields := lo.FilterMap(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { - if field.DataType != schemapb.DataType_VarChar { + if field.DataType != schemapb.DataType_VarChar && field.DataType != schemapb.DataType_Text { return 0, false } @@ -1639,7 +1645,7 @@ func checkVarcharFormat(schema *schemapb.CollectionSchema, insertMsg *msgstream. for row, data := range fieldData.GetScalars().GetStringData().GetData() { ok := utf8.ValidString(data) if !ok { - return merr.WrapErrAsInputError(fmt.Errorf("varchar with analyzer should be utf-8 format, but row: %d not utf-8 varchar. data: %s", row, data)) + return merr.WrapErrAsInputError(fmt.Errorf("input with analyzer should be utf-8 format, but row: %d not utf-8 format. data: %s", row, data)) } } } diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 0662953db7..edb7f2d586 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -3425,7 +3425,7 @@ func TestCheckVarcharFormat(t *testing.T) { }, } - err := checkVarcharFormat(schema, data) + err := checkInputUtf8Compatiable(schema, data) assert.NoError(t, err) // invalid data @@ -3447,7 +3447,7 @@ func TestCheckVarcharFormat(t *testing.T) { }}, }, } - err = checkVarcharFormat(schema, data) + err = checkInputUtf8Compatiable(schema, data) assert.Error(t, err) } @@ -3490,6 +3490,6 @@ func BenchmarkCheckVarcharFormat(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - checkVarcharFormat(schema, data) + checkInputUtf8Compatiable(schema, data) } } diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index b555d7c82c..45c2763c21 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -96,6 +96,10 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, helper *typeutil.Sch if err := v.checkVarCharFieldData(field, fieldSchema); err != nil { return err } + case schemapb.DataType_Text: + if err := v.checkTextFieldData(field, fieldSchema); err != nil { + return err + } case schemapb.DataType_JSON: if err := v.checkJSONFieldData(field, fieldSchema); err != nil { return err @@ -672,6 +676,29 @@ func (v *validateUtil) checkVarCharFieldData(field *schemapb.FieldData, fieldSch return nil } +func (v *validateUtil) checkTextFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { + strArr := field.GetScalars().GetStringData().GetData() + if strArr == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() { + msg := fmt.Sprintf("text field '%v' is illegal", field.GetFieldName()) + return merr.WrapErrParameterInvalid("need text array", msg) + } + + if v.checkMaxLen { + maxLength, err := parameterutil.GetMaxLength(fieldSchema) + if err != nil { + return err + } + + if i, ok := verifyLengthPerRow(strArr, maxLength); !ok { + return merr.WrapErrParameterInvalidMsg("length of text field %s exceeds max length, row number: %d, length: %d, max length: %d", + fieldSchema.GetName(), i, len(strArr[i]), maxLength) + } + return nil + } + + return nil +} + func (v *validateUtil) checkJSONFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { jsonArray := field.GetScalars().GetJsonData().GetData() if jsonArray == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() { diff --git a/internal/proxy/validate_util_test.go b/internal/proxy/validate_util_test.go index 25e21b3bc8..e22de6cf16 100644 --- a/internal/proxy/validate_util_test.go +++ b/internal/proxy/validate_util_test.go @@ -191,6 +191,124 @@ func Test_validateUtil_checkVarCharFieldData(t *testing.T) { }) } +func Test_validateUtil_checkTextFieldData(t *testing.T) { + t.Run("type mismatch", func(t *testing.T) { + f := &schemapb.FieldData{} + v := newValidateUtil() + assert.Error(t, v.checkTextFieldData(f, nil)) + }) + + t.Run("max length not found", func(t *testing.T) { + f := &schemapb.FieldData{ + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"111", "222"}, + }, + }, + }, + }, + } + + fs := &schemapb.FieldSchema{ + DataType: schemapb.DataType_Text, + } + + v := newValidateUtil(withMaxLenCheck()) + + err := v.checkTextFieldData(f, fs) + assert.Error(t, err) + }) + + t.Run("length exceeds", func(t *testing.T) { + f := &schemapb.FieldData{ + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"111", "222"}, + }, + }, + }, + }, + } + + fs := &schemapb.FieldSchema{ + DataType: schemapb.DataType_Text, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "2", + }, + }, + } + + v := newValidateUtil(withMaxLenCheck()) + + err := v.checkTextFieldData(f, fs) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + f := &schemapb.FieldData{ + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"111", "222"}, + }, + }, + }, + }, + } + + fs := &schemapb.FieldSchema{ + DataType: schemapb.DataType_Text, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "4", + }, + }, + } + + v := newValidateUtil(withMaxLenCheck()) + + err := v.checkTextFieldData(f, fs) + assert.NoError(t, err) + }) + + t.Run("no check", func(t *testing.T) { + f := &schemapb.FieldData{ + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"111", "222"}, + }, + }, + }, + }, + } + + fs := &schemapb.FieldSchema{ + DataType: schemapb.DataType_Text, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "2", + }, + }, + } + + v := newValidateUtil() + + err := v.checkTextFieldData(f, fs) + assert.NoError(t, err) + }) +} + func Test_validateUtil_checkBinaryVectorFieldData(t *testing.T) { v := newValidateUtil() assert.Error(t, v.checkBinaryVectorFieldData(&schemapb.FieldData{Field: &schemapb.FieldData_Scalars{}}, nil)) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 855f407c15..3ff0ca1949 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -1011,12 +1011,12 @@ func (sd *shardDelegator) buildBM25IDF(req *internalpb.SearchRequest) (float64, proto.Unmarshal(req.GetPlaceholderGroup(), pb) if len(pb.Placeholders) != 1 || len(pb.Placeholders[0].Values) == 0 { - return 0, merr.WrapErrParameterInvalidMsg("please provide varchar for BM25 Function based search") + return 0, merr.WrapErrParameterInvalidMsg("please provide varchar/text for BM25 Function based search") } holder := pb.Placeholders[0] if holder.Type != commonpb.PlaceholderType_VarChar { - return 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("please provide varchar for BM25 Function based search, got %s", holder.Type.String())) + return 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("please provide varchar/text for BM25 Function based search, got %s", holder.Type.String())) } str := funcutil.GetVarCharFromPlaceholder(holder) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 0ae5006581..a5ac4d79d1 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -338,7 +338,7 @@ func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.Dat if err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data, singleData.(*DoubleFieldData).ValidData); err != nil { return err } - case schemapb.DataType_String, schemapb.DataType_VarChar: + case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: for i, singleString := range singleData.(*StringFieldData).Data { isValid := true if len(singleData.(*StringFieldData).ValidData) != 0 { @@ -569,7 +569,7 @@ func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *Ins insertData.Data[fieldID] = doubleFieldData return len(singleData), nil - case schemapb.DataType_String, schemapb.DataType_VarChar: + case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: singleData := data.([]string) if fieldData == nil { fieldData = &StringFieldData{Data: make([]string, 0, rowNum)} diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index 161b8fb183..9538962dcc 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -779,7 +779,7 @@ func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy return &arrow.Float32Type{} case schemapb.DataType_Double: return &arrow.Float64Type{} - case schemapb.DataType_VarChar, schemapb.DataType_String: + case schemapb.DataType_VarChar, schemapb.DataType_String, schemapb.DataType_Text: return &arrow.StringType{} case schemapb.DataType_Array: return &arrow.BinaryType{} diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 13d49ab199..8241610e7d 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -334,6 +334,7 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { m[schemapb.DataType_VarChar] = stringEntry m[schemapb.DataType_String] = stringEntry + m[schemapb.DataType_Text] = stringEntry // We're not using the deserialized data in go, so we can skip the heavy pb serde. // If there is need in the future, just assign it to m[schemapb.DataType_Array] diff --git a/internal/storage/utils.go b/internal/storage/utils.go index f518dba0b4..0747d4c684 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -683,7 +683,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche ValidData: validData, } - case schemapb.DataType_String, schemapb.DataType_VarChar: + case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: srcData := srcField.GetScalars().GetStringData().GetData() validData := srcField.GetValidData() diff --git a/internal/util/function/text_embedding_function.go b/internal/util/function/text_embedding_function.go index 59a545361d..1f8db28bac 100644 --- a/internal/util/function/text_embedding_function.go +++ b/internal/util/function/text_embedding_function.go @@ -66,6 +66,10 @@ type TextEmbeddingFunction struct { embProvider textEmbeddingProvider } +func isValidInputDataType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_VarChar || dataType == schemapb.DataType_Text +} + func NewTextEmbeddingFunction(coll *schemapb.CollectionSchema, functionSchema *schemapb.FunctionSchema) (*TextEmbeddingFunction, error) { if len(functionSchema.GetOutputFieldNames()) != 1 { return nil, fmt.Errorf("Text function should only have one output field, but now is %d", len(functionSchema.GetOutputFieldNames())) @@ -125,8 +129,8 @@ func (runner *TextEmbeddingFunction) ProcessInsert(inputs []*schemapb.FieldData) return nil, fmt.Errorf("Text embedding function only receives one input field, but got [%d]", len(inputs)) } - if inputs[0].Type != schemapb.DataType_VarChar { - return nil, fmt.Errorf("Text embedding only supports varchar field as input field, but got %s", schemapb.DataType_name[int32(inputs[0].Type)]) + if !isValidInputDataType(inputs[0].Type) { + return nil, fmt.Errorf("Text embedding only supports varchar or text field as input field, but got %s", schemapb.DataType_name[int32(inputs[0].Type)]) } texts := inputs[0].GetScalars().GetStringData().GetData() @@ -182,8 +186,8 @@ func (runner *TextEmbeddingFunction) ProcessBulkInsert(inputs []storage.FieldDat return nil, fmt.Errorf("TextEmbedding function only receives one input, bug got [%d]", len(inputs)) } - if inputs[0].GetDataType() != schemapb.DataType_VarChar { - return nil, fmt.Errorf(" only supports varchar field, the input is not varchar") + if !isValidInputDataType(inputs[0].GetDataType()) { + return nil, fmt.Errorf("TextEmbedding function only supports varchar or text field as input field, but got %s", schemapb.DataType_name[int32(inputs[0].GetDataType())]) } texts, ok := inputs[0].GetDataRows().([]string) diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index efe15b89e1..c2ff5d1b78 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -384,7 +384,7 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetFloatData().GetData()) case schemapb.DataType_Double: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetDoubleData().GetData()) - case schemapb.DataType_String, schemapb.DataType_VarChar: + case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetStringData().GetData()) case schemapb.DataType_Array: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetArrayData().GetData()) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 5819bb8af4..07b22449bd 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1357,6 +1357,7 @@ type proxyConfig struct { SkipAutoIDCheck ParamItem `refreshable:"true"` SkipPartitionKeyCheck ParamItem `refreshable:"true"` MaxVarCharLength ParamItem `refreshable:"false"` + MaxTextLength ParamItem `refreshable:"false"` AccessLog AccessLogConfig @@ -1769,6 +1770,14 @@ please adjust in embedded Milvus: false`, } p.MaxVarCharLength.Init(base.mgr) + p.MaxTextLength = ParamItem{ + Key: "proxy.maxTextLength", + Version: "2.6.0", + DefaultValue: strconv.Itoa(2 * 1024 * 1024), // 2M + Doc: "maximum number of characters for a row of the text field", + } + p.MaxTextLength.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "proxy.gracefulStopTimeout", Version: "2.3.7", diff --git a/pkg/util/typeutil/gen_empty_field_data.go b/pkg/util/typeutil/gen_empty_field_data.go index 047a2a331e..61a5b31858 100644 --- a/pkg/util/typeutil/gen_empty_field_data.go +++ b/pkg/util/typeutil/gen_empty_field_data.go @@ -261,7 +261,7 @@ func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) return genEmptyFloatFieldData(field), nil case schemapb.DataType_Double: return genEmptyDoubleFieldData(field), nil - case schemapb.DataType_VarChar: + case schemapb.DataType_VarChar, schemapb.DataType_Text: return genEmptyVarCharFieldData(field), nil case schemapb.DataType_Array: return genEmptyArrayFieldData(field), nil diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 2e68e0dc50..3585035bd4 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -57,7 +57,7 @@ func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFiel } switch fieldSchema.DataType { - case schemapb.DataType_VarChar: + case schemapb.DataType_VarChar, schemapb.DataType_Text: maxLengthPerRowValue, ok := paramsMap[common.MaxLengthKey] if !ok { return 0, fmt.Errorf("the max_length was not specified, field type is %s", fieldSchema.DataType.String()) @@ -114,7 +114,7 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe res += 4 case schemapb.DataType_Int64, schemapb.DataType_Double: res += 8 - case schemapb.DataType_VarChar, schemapb.DataType_Array, schemapb.DataType_JSON: + case schemapb.DataType_VarChar, schemapb.DataType_Text, schemapb.DataType_Array, schemapb.DataType_JSON: maxLengthPerRow, err := getVarFieldLength(fs, policy) if err != nil { return 0, err @@ -192,7 +192,7 @@ func CalcColumnSize(column *schemapb.FieldData) int { res += len(column.GetScalars().GetFloatData().GetData()) * 4 case schemapb.DataType_Double: res += len(column.GetScalars().GetDoubleData().GetData()) * 8 - case schemapb.DataType_VarChar: + case schemapb.DataType_VarChar, schemapb.DataType_Text: for _, str := range column.GetScalars().GetStringData().GetData() { res += len(str) } @@ -225,7 +225,7 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e res += 4 case schemapb.DataType_Int64, schemapb.DataType_Double: res += 8 - case schemapb.DataType_VarChar: + case schemapb.DataType_VarChar, schemapb.DataType_Text: if rowOffset >= len(fs.GetScalars().GetStringData().GetData()) { return 0, fmt.Errorf("offset out range of field datas") } @@ -580,7 +580,7 @@ func IsBoolType(dataType schemapb.DataType) bool { // IsStringType returns true if input is a varChar type, otherwise false func IsStringType(dataType schemapb.DataType) bool { switch dataType { - case schemapb.DataType_String, schemapb.DataType_VarChar: + case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: return true default: return false @@ -1478,7 +1478,7 @@ func GetData(field *schemapb.FieldData, idx int) interface{} { return field.GetScalars().GetFloatData().GetData()[idx] case schemapb.DataType_Double: return field.GetScalars().GetDoubleData().GetData()[idx] - case schemapb.DataType_VarChar: + case schemapb.DataType_VarChar, schemapb.DataType_Text: return field.GetScalars().GetStringData().GetData()[idx] case schemapb.DataType_FloatVector: dim := int(field.GetVectors().GetDim()) diff --git a/tests/go_client/testcases/collection_test.go b/tests/go_client/testcases/collection_test.go index e7476c0b1a..96810a540a 100644 --- a/tests/go_client/testcases/collection_test.go +++ b/tests/go_client/testcases/collection_test.go @@ -554,7 +554,7 @@ func TestCreateCollectionInvalidFields(t *testing.T) { {fields: []*entity.Field{pkField, pkField2, vecField}, errMsg: "there are more than one primary key"}, {fields: []*entity.Field{pkField, vecField, noneField}, errMsg: "data type None is not valid"}, {fields: []*entity.Field{pkField, vecField, stringField}, errMsg: "string data type not supported yet, please use VarChar type instead"}, - {fields: []*entity.Field{pkField, vecField, varcharField}, errMsg: "type param(max_length) should be specified for varChar field"}, + {fields: []*entity.Field{pkField, vecField, varcharField}, errMsg: "type param(max_length) should be specified for the field"}, } collName := common.GenRandomString(prefix, 6) @@ -903,12 +903,12 @@ func TestCreateVarcharArrayInvalidLength(t *testing.T) { // create collection err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema)) - common.CheckErr(t, err, false, "type param(max_length) should be specified for varChar field") + common.CheckErr(t, err, false, "type param(max_length) should be specified for the field") // invalid Capacity for _, invalidLength := range []int64{-1, 0, common.MaxLength + 1} { arrayVarcharField.WithMaxLength(invalidLength) err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema)) - common.CheckErr(t, err, false, "the maximum length specified for a VarChar field(array) should be in (0, 65535]") + common.CheckErr(t, err, false, "the maximum length specified for the field(array) should be in (0, 65535]") } } @@ -925,12 +925,12 @@ func TestCreateVarcharInvalidLength(t *testing.T) { schema := entity.NewSchema().WithName(collName).WithField(varcharField).WithField(vecField) // create collection err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema)) - common.CheckErr(t, err, false, "type param(max_length) should be specified for varChar field") + common.CheckErr(t, err, false, "type param(max_length) should be specified for the field") // invalid Capacity for _, invalidLength := range []int64{-1, 0, common.MaxLength + 1} { varcharField.WithMaxLength(invalidLength) err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema)) - common.CheckErr(t, err, false, "the maximum length specified for a VarChar field(varchar) should be in (0, 65535]") + common.CheckErr(t, err, false, "the maximum length specified for the field(varchar) should be in (0, 65535]") } } diff --git a/tests/python_client/milvus_client/test_milvus_client_collection.py b/tests/python_client/milvus_client/test_milvus_client_collection.py index 4daf70a7d1..f73c06d949 100644 --- a/tests/python_client/milvus_client/test_milvus_client_collection.py +++ b/tests/python_client/milvus_client/test_milvus_client_collection.py @@ -137,8 +137,8 @@ class TestMilvusClientCollectionInvalid(TestMilvusClientV2Base): client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. create collection - error = {ct.err_code: 65535, ct.err_msg: f"type param(max_length) should be specified for varChar " - f"field of collection {collection_name}"} + error = {ct.err_code: 65535, ct.err_msg: f"type param(max_length) should be specified for the " + f"field({default_primary_key_field_name}) of collection {collection_name}"} self.create_collection(client, collection_name, default_dim, id_type="string", auto_id=True, check_task=CheckTasks.err_res, check_items=error) diff --git a/tests/python_client/milvus_client/test_milvus_client_search.py b/tests/python_client/milvus_client/test_milvus_client_search.py index 32e65200cb..dea935eac6 100644 --- a/tests/python_client/milvus_client/test_milvus_client_search.py +++ b/tests/python_client/milvus_client/test_milvus_client_search.py @@ -77,8 +77,8 @@ class TestMilvusClientSearchInvalid(TestMilvusClientV2Base): client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. create collection - error = {ct.err_code: 65535, ct.err_msg: f"type param(max_length) should be specified for varChar " - f"field(id) of collection {collection_name}"} + error = {ct.err_code: 65535, ct.err_msg: f"type param(max_length) should be specified for the " + f"field({default_primary_key_field_name}) of collection {collection_name}"} self.create_collection(client, collection_name, default_dim, id_type="string", auto_id=True, check_task=CheckTasks.err_res, check_items=error) diff --git a/tests/python_client/testcases/test_collection.py b/tests/python_client/testcases/test_collection.py index 14a2e667d8..13d2354bdb 100644 --- a/tests/python_client/testcases/test_collection.py +++ b/tests/python_client/testcases/test_collection.py @@ -3880,7 +3880,7 @@ class TestCollectionString(TestcaseBase): max_length = 65535 + 1 string_field = cf.gen_string_field(max_length=max_length) schema = cf.gen_collection_schema([int_field, string_field, vec_field]) - error = {ct.err_code: 65535, ct.err_msg: f"the maximum length specified for a VarChar field({ct.default_string_field_name}) should be in (0, 65535]"} + error = {ct.err_code: 65535, ct.err_msg: f"the maximum length specified for the field({ct.default_string_field_name}) should be in (0, 65535]"} self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.err_res, check_items=error) @@ -4098,7 +4098,7 @@ class TestCollectionARRAY(TestcaseBase): array_schema = cf.gen_collection_schema([int_field, vec_field, array_field]) self.init_collection_wrap(schema=array_schema, check_task=CheckTasks.err_res, check_items={ct.err_code: 65535, - ct.err_msg: "type param(max_length) should be specified for varChar " + ct.err_msg: "type param(max_length) should be specified for the " "field(int_array)"}) @pytest.mark.tags(CaseLabel.L2) diff --git a/tests/python_client/testcases/test_full_text_search.py b/tests/python_client/testcases/test_full_text_search.py index 9160071890..427c5f655c 100644 --- a/tests/python_client/testcases/test_full_text_search.py +++ b/tests/python_client/testcases/test_full_text_search.py @@ -3278,7 +3278,7 @@ class TestSearchWithFullTextSearchNegative(TestcaseBase): search_data = cf.gen_vectors(nb=nq, dim=1000, vector_data_type="FLOAT_VECTOR") log.info(f"search data: {search_data}") error = {ct.err_code: 65535, - ct.err_msg: "please provide varchar for BM25 Function based search"} + ct.err_msg: "please provide varchar/text for BM25 Function based search"} collection_w.search( data=search_data, anns_field="text_sparse_emb",