From d16e18fd341c45a5926ce372b587f873da38840f Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 23 May 2023 10:19:26 +0800 Subject: [PATCH] Add output fields for search/query results (#24302) Signed-off-by: cai.zhang --- go.mod | 2 +- go.sum | 2 + internal/core/src/pb/schema.pb.cc | 67 +++++++++--- internal/core/src/pb/schema.pb.h | 101 ++++++++++++++++++ .../parser/planparserv2/plan_parser_v2.go | 18 ++++ internal/proxy/task_query.go | 9 +- internal/proxy/task_search.go | 9 +- internal/proxy/task_test.go | 91 +++++++++++----- internal/proxy/util.go | 32 +++++- internal/rootcoord/create_collection_task.go | 1 + pkg/go.mod | 2 +- pkg/go.sum | 2 + 12 files changed, 285 insertions(+), 51 deletions(-) diff --git a/go.mod b/go.mod index 8b1b74eb0b..52d1268866 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/protobuf v1.5.3 github.com/klauspost/compress v1.14.4 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 + github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/minio/minio-go/v7 v7.0.17 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/go.sum b/go.sum index b3f6d1cc61..49a94643b5 100644 --- a/go.sum +++ b/go.sum @@ -587,6 +587,8 @@ github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63 h1:V github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 h1:Dp5AAbOSTq31QLatGXamBMk/o670MkbRi8NoW17ypew= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 h1:LdDHjEjus1NdC9ELbpQa6DfUHJotJUW2kD4S+8nvjw4= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/pb/schema.pb.cc b/internal/core/src/pb/schema.pb.cc index 22950333eb..30d6c2e5ad 100755 --- a/internal/core/src/pb/schema.pb.cc +++ b/internal/core/src/pb/schema.pb.cc @@ -266,6 +266,7 @@ PROTOBUF_CONSTEXPR SearchResultData::SearchResultData( , /*decltype(_impl_.scores_)*/{} , /*decltype(_impl_.topks_)*/{} , /*decltype(_impl_._topks_cached_byte_size_)*/{0} + , /*decltype(_impl_.output_fields_)*/{} , /*decltype(_impl_.ids_)*/nullptr , /*decltype(_impl_.num_queries_)*/int64_t{0} , /*decltype(_impl_.top_k_)*/int64_t{0} @@ -455,6 +456,7 @@ const uint32_t TableStruct_schema_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE(p PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::SearchResultData, _impl_.scores_), PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::SearchResultData, _impl_.ids_), PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::SearchResultData, _impl_.topks_), + PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::SearchResultData, _impl_.output_fields_), }; static const ::_pbi::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { { 0, -1, -1, sizeof(::milvus::proto::schema::FieldSchema)}, @@ -551,28 +553,28 @@ const char descriptor_table_protodef_schema_2eproto[] PROTOBUF_SECTION_VARIABLE( "B\007\n\005field\"w\n\003IDs\0220\n\006int_id\030\001 \001(\0132\036.milvu" "s.proto.schema.LongArrayH\000\0222\n\006str_id\030\002 \001" "(\0132 .milvus.proto.schema.StringArrayH\000B\n" - "\n\010id_field\"\261\001\n\020SearchResultData\022\023\n\013num_q" + "\n\010id_field\"\310\001\n\020SearchResultData\022\023\n\013num_q" "ueries\030\001 \001(\003\022\r\n\005top_k\030\002 \001(\003\0223\n\013fields_da" "ta\030\003 \003(\0132\036.milvus.proto.schema.FieldData" "\022\016\n\006scores\030\004 \003(\002\022%\n\003ids\030\005 \001(\0132\030.milvus.p" - "roto.schema.IDs\022\r\n\005topks\030\006 \003(\003*\261\001\n\010DataT" - "ype\022\010\n\004None\020\000\022\010\n\004Bool\020\001\022\010\n\004Int8\020\002\022\t\n\005Int" - "16\020\003\022\t\n\005Int32\020\004\022\t\n\005Int64\020\005\022\t\n\005Float\020\n\022\n\n" - "\006Double\020\013\022\n\n\006String\020\024\022\013\n\007VarChar\020\025\022\t\n\005Ar" - "ray\020\026\022\010\n\004JSON\020\027\022\020\n\014BinaryVector\020d\022\017\n\013Flo" - "atVector\020e*V\n\nFieldState\022\020\n\014FieldCreated" - "\020\000\022\021\n\rFieldCreating\020\001\022\021\n\rFieldDropping\020\002" - "\022\020\n\014FieldDropped\020\003Bf\n\016io.milvus.grpcB\013Sc" - "hemaProtoP\001Z1github.com/milvus-io/milvus" - "-proto/go-api/schemapb\240\001\001\252\002\016IO.Milvus.Gr" - "pcb\006proto3" + "roto.schema.IDs\022\r\n\005topks\030\006 \003(\003\022\025\n\routput" + "_fields\030\007 \003(\t*\261\001\n\010DataType\022\010\n\004None\020\000\022\010\n\004" + "Bool\020\001\022\010\n\004Int8\020\002\022\t\n\005Int16\020\003\022\t\n\005Int32\020\004\022\t" + "\n\005Int64\020\005\022\t\n\005Float\020\n\022\n\n\006Double\020\013\022\n\n\006Stri" + "ng\020\024\022\013\n\007VarChar\020\025\022\t\n\005Array\020\026\022\010\n\004JSON\020\027\022\020" + "\n\014BinaryVector\020d\022\017\n\013FloatVector\020e*V\n\nFie" + "ldState\022\020\n\014FieldCreated\020\000\022\021\n\rFieldCreati" + "ng\020\001\022\021\n\rFieldDropping\020\002\022\020\n\014FieldDropped\020" + "\003Bf\n\016io.milvus.grpcB\013SchemaProtoP\001Z1gith" + "ub.com/milvus-io/milvus-proto/go-api/sch" + "emapb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3" ; static const ::_pbi::DescriptorTable* const descriptor_table_schema_2eproto_deps[1] = { &::descriptor_table_common_2eproto, }; static ::_pbi::once_flag descriptor_table_schema_2eproto_once; const ::_pbi::DescriptorTable descriptor_table_schema_2eproto = { - false, false, 2730, descriptor_table_protodef_schema_2eproto, + false, false, 2753, descriptor_table_protodef_schema_2eproto, "schema.proto", &descriptor_table_schema_2eproto_once, descriptor_table_schema_2eproto_deps, 1, 17, schemas, file_default_instances, TableStruct_schema_2eproto::offsets, @@ -5482,6 +5484,7 @@ SearchResultData::SearchResultData(const SearchResultData& from) , decltype(_impl_.scores_){from._impl_.scores_} , decltype(_impl_.topks_){from._impl_.topks_} , /*decltype(_impl_._topks_cached_byte_size_)*/{0} + , decltype(_impl_.output_fields_){from._impl_.output_fields_} , decltype(_impl_.ids_){nullptr} , decltype(_impl_.num_queries_){} , decltype(_impl_.top_k_){} @@ -5506,6 +5509,7 @@ inline void SearchResultData::SharedCtor( , decltype(_impl_.scores_){arena} , decltype(_impl_.topks_){arena} , /*decltype(_impl_._topks_cached_byte_size_)*/{0} + , decltype(_impl_.output_fields_){arena} , decltype(_impl_.ids_){nullptr} , decltype(_impl_.num_queries_){int64_t{0}} , decltype(_impl_.top_k_){int64_t{0}} @@ -5527,6 +5531,7 @@ inline void SearchResultData::SharedDtor() { _impl_.fields_data_.~RepeatedPtrField(); _impl_.scores_.~RepeatedField(); _impl_.topks_.~RepeatedField(); + _impl_.output_fields_.~RepeatedPtrField(); if (this != internal_default_instance()) delete _impl_.ids_; } @@ -5543,6 +5548,7 @@ void SearchResultData::Clear() { _impl_.fields_data_.Clear(); _impl_.scores_.Clear(); _impl_.topks_.Clear(); + _impl_.output_fields_.Clear(); if (GetArenaForAllocation() == nullptr && _impl_.ids_ != nullptr) { delete _impl_.ids_; } @@ -5618,6 +5624,21 @@ const char* SearchResultData::_InternalParse(const char* ptr, ::_pbi::ParseConte } else goto handle_unusual; continue; + // repeated string output_fields = 7; + case 7: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 58)) { + ptr -= 1; + do { + ptr += 1; + auto str = _internal_add_output_fields(); + ptr = ::_pbi::InlineGreedyStringParser(str, ptr, ctx); + CHK_(ptr); + CHK_(::_pbi::VerifyUTF8(str, "milvus.proto.schema.SearchResultData.output_fields")); + if (!ctx->DataAvailable(ptr)) break; + } while (::PROTOBUF_NAMESPACE_ID::internal::ExpectTag<58>(ptr)); + } else + goto handle_unusual; + continue; default: goto handle_unusual; } // switch @@ -5688,6 +5709,16 @@ uint8_t* SearchResultData::_InternalSerialize( } } + // repeated string output_fields = 7; + for (int i = 0, n = this->_internal_output_fields_size(); i < n; i++) { + const auto& s = this->_internal_output_fields(i); + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( + s.data(), static_cast(s.length()), + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, + "milvus.proto.schema.SearchResultData.output_fields"); + target = stream->WriteString(7, s, target); + } + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); @@ -5736,6 +5767,14 @@ size_t SearchResultData::ByteSizeLong() const { total_size += data_size; } + // repeated string output_fields = 7; + total_size += 1 * + ::PROTOBUF_NAMESPACE_ID::internal::FromIntSize(_impl_.output_fields_.size()); + for (int i = 0, n = _impl_.output_fields_.size(); i < n; i++) { + total_size += ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + _impl_.output_fields_.Get(i)); + } + // .milvus.proto.schema.IDs ids = 5; if (this->_internal_has_ids()) { total_size += 1 + @@ -5774,6 +5813,7 @@ void SearchResultData::MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const _this->_impl_.fields_data_.MergeFrom(from._impl_.fields_data_); _this->_impl_.scores_.MergeFrom(from._impl_.scores_); _this->_impl_.topks_.MergeFrom(from._impl_.topks_); + _this->_impl_.output_fields_.MergeFrom(from._impl_.output_fields_); if (from._internal_has_ids()) { _this->_internal_mutable_ids()->::milvus::proto::schema::IDs::MergeFrom( from._internal_ids()); @@ -5804,6 +5844,7 @@ void SearchResultData::InternalSwap(SearchResultData* other) { _impl_.fields_data_.InternalSwap(&other->_impl_.fields_data_); _impl_.scores_.InternalSwap(&other->_impl_.scores_); _impl_.topks_.InternalSwap(&other->_impl_.topks_); + _impl_.output_fields_.InternalSwap(&other->_impl_.output_fields_); ::PROTOBUF_NAMESPACE_ID::internal::memswap< PROTOBUF_FIELD_OFFSET(SearchResultData, _impl_.top_k_) + sizeof(SearchResultData::_impl_.top_k_) diff --git a/internal/core/src/pb/schema.pb.h b/internal/core/src/pb/schema.pb.h index 8ddfeac019..4042096da4 100755 --- a/internal/core/src/pb/schema.pb.h +++ b/internal/core/src/pb/schema.pb.h @@ -3587,6 +3587,7 @@ class SearchResultData final : kFieldsDataFieldNumber = 3, kScoresFieldNumber = 4, kTopksFieldNumber = 6, + kOutputFieldsFieldNumber = 7, kIdsFieldNumber = 5, kNumQueriesFieldNumber = 1, kTopKFieldNumber = 2, @@ -3653,6 +3654,30 @@ class SearchResultData final : ::PROTOBUF_NAMESPACE_ID::RepeatedField< int64_t >* mutable_topks(); + // repeated string output_fields = 7; + int output_fields_size() const; + private: + int _internal_output_fields_size() const; + public: + void clear_output_fields(); + const std::string& output_fields(int index) const; + std::string* mutable_output_fields(int index); + void set_output_fields(int index, const std::string& value); + void set_output_fields(int index, std::string&& value); + void set_output_fields(int index, const char* value); + void set_output_fields(int index, const char* value, size_t size); + std::string* add_output_fields(); + void add_output_fields(const std::string& value); + void add_output_fields(std::string&& value); + void add_output_fields(const char* value); + void add_output_fields(const char* value, size_t size); + const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField& output_fields() const; + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField* mutable_output_fields(); + private: + const std::string& _internal_output_fields(int index) const; + std::string* _internal_add_output_fields(); + public: + // .milvus.proto.schema.IDs ids = 5; bool has_ids() const; private: @@ -3701,6 +3726,7 @@ class SearchResultData final : ::PROTOBUF_NAMESPACE_ID::RepeatedField< float > scores_; ::PROTOBUF_NAMESPACE_ID::RepeatedField< int64_t > topks_; mutable std::atomic _topks_cached_byte_size_; + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField output_fields_; ::milvus::proto::schema::IDs* ids_; int64_t num_queries_; int64_t top_k_; @@ -6804,6 +6830,81 @@ SearchResultData::mutable_topks() { return _internal_mutable_topks(); } +// repeated string output_fields = 7; +inline int SearchResultData::_internal_output_fields_size() const { + return _impl_.output_fields_.size(); +} +inline int SearchResultData::output_fields_size() const { + return _internal_output_fields_size(); +} +inline void SearchResultData::clear_output_fields() { + _impl_.output_fields_.Clear(); +} +inline std::string* SearchResultData::add_output_fields() { + std::string* _s = _internal_add_output_fields(); + // @@protoc_insertion_point(field_add_mutable:milvus.proto.schema.SearchResultData.output_fields) + return _s; +} +inline const std::string& SearchResultData::_internal_output_fields(int index) const { + return _impl_.output_fields_.Get(index); +} +inline const std::string& SearchResultData::output_fields(int index) const { + // @@protoc_insertion_point(field_get:milvus.proto.schema.SearchResultData.output_fields) + return _internal_output_fields(index); +} +inline std::string* SearchResultData::mutable_output_fields(int index) { + // @@protoc_insertion_point(field_mutable:milvus.proto.schema.SearchResultData.output_fields) + return _impl_.output_fields_.Mutable(index); +} +inline void SearchResultData::set_output_fields(int index, const std::string& value) { + _impl_.output_fields_.Mutable(index)->assign(value); + // @@protoc_insertion_point(field_set:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::set_output_fields(int index, std::string&& value) { + _impl_.output_fields_.Mutable(index)->assign(std::move(value)); + // @@protoc_insertion_point(field_set:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::set_output_fields(int index, const char* value) { + GOOGLE_DCHECK(value != nullptr); + _impl_.output_fields_.Mutable(index)->assign(value); + // @@protoc_insertion_point(field_set_char:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::set_output_fields(int index, const char* value, size_t size) { + _impl_.output_fields_.Mutable(index)->assign( + reinterpret_cast(value), size); + // @@protoc_insertion_point(field_set_pointer:milvus.proto.schema.SearchResultData.output_fields) +} +inline std::string* SearchResultData::_internal_add_output_fields() { + return _impl_.output_fields_.Add(); +} +inline void SearchResultData::add_output_fields(const std::string& value) { + _impl_.output_fields_.Add()->assign(value); + // @@protoc_insertion_point(field_add:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::add_output_fields(std::string&& value) { + _impl_.output_fields_.Add(std::move(value)); + // @@protoc_insertion_point(field_add:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::add_output_fields(const char* value) { + GOOGLE_DCHECK(value != nullptr); + _impl_.output_fields_.Add()->assign(value); + // @@protoc_insertion_point(field_add_char:milvus.proto.schema.SearchResultData.output_fields) +} +inline void SearchResultData::add_output_fields(const char* value, size_t size) { + _impl_.output_fields_.Add()->assign(reinterpret_cast(value), size); + // @@protoc_insertion_point(field_add_pointer:milvus.proto.schema.SearchResultData.output_fields) +} +inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField& +SearchResultData::output_fields() const { + // @@protoc_insertion_point(field_list:milvus.proto.schema.SearchResultData.output_fields) + return _impl_.output_fields_; +} +inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField* +SearchResultData::mutable_output_fields() { + // @@protoc_insertion_point(field_mutable_list:milvus.proto.schema.SearchResultData.output_fields) + return &_impl_.output_fields_; +} + #ifdef __GNUC__ #pragma GCC diagnostic pop #endif // __GNUC__ diff --git a/internal/parser/planparserv2/plan_parser_v2.go b/internal/parser/planparserv2/plan_parser_v2.go index d3b24020d2..2092e56d0a 100644 --- a/internal/parser/planparserv2/plan_parser_v2.go +++ b/internal/parser/planparserv2/plan_parser_v2.go @@ -62,6 +62,24 @@ func ParseExpr(schema *typeutil.SchemaHelper, exprStr string) (*planpb.Expr, err return predicate.expr, nil } +func ParseIdentifier(schema *typeutil.SchemaHelper, identifier string, checkFunc func(*planpb.Expr) error) error { + ret := handleExpr(schema, identifier) + + if err := getError(ret); err != nil { + return fmt.Errorf("cannot parse identifier: %s, error: %s", identifier, err) + } + + predicate := getExpr(ret) + if predicate == nil { + return fmt.Errorf("cannot parse identifier: %s", identifier) + } + if predicate.expr.GetColumnExpr() == nil { + return fmt.Errorf("cannot parse identifier: %s", identifier) + } + + return checkFunc(predicate.expr) +} + func CreateRetrievePlan(schemaPb *schemapb.CollectionSchema, exprStr string) (*planpb.PlanNode, error) { schema, err := typeutil.CreateSchemaHelper(schemaPb) if err != nil { diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index add895fc0f..fef9936163 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -56,8 +56,9 @@ type queryTask struct { queryParams *queryParams schema *schemapb.CollectionSchema - resultBuf chan *internalpb.RetrieveResults - toReduceResults []*internalpb.RetrieveResults + resultBuf chan *internalpb.RetrieveResults + toReduceResults []*internalpb.RetrieveResults + userOutputFields []string queryShardPolicy pickShardPolicy shardMgr *shardClientMgr @@ -70,7 +71,7 @@ type queryParams struct { offset int64 } -// translateOutputFields translates output fields name to output fields id. +// translateToOutputFieldIDs translates output fields name to output fields id. func translateToOutputFieldIDs(outputFields []string, schema *schemapb.CollectionSchema) ([]UniqueID, error) { outputFieldIDs := make([]UniqueID, 0, len(outputFields)+1) if len(outputFields) == 0 { @@ -220,7 +221,7 @@ func (t *queryTask) createPlan(ctx context.Context) error { return err } - t.request.OutputFields, err = translateOutputFields(t.request.OutputFields, schema, true) + t.request.OutputFields, t.userOutputFields, err = translateOutputFields(t.request.OutputFields, schema, true) if err != nil { return err } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 836b639ba8..f65fa6c856 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -59,9 +59,10 @@ type searchTask struct { schema *schemapb.CollectionSchema requery bool - offset int64 - resultBuf chan *internalpb.SearchResults - toReduceResults []*internalpb.SearchResults + offset int64 + resultBuf chan *internalpb.SearchResults + toReduceResults []*internalpb.SearchResults + userOutputFields []string searchShardPolicy pickShardPolicy shardMgr *shardClientMgr @@ -234,7 +235,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error { return err } - t.request.OutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false) + t.request.OutputFields, t.userOutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false) if err != nil { return err } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 2ac8d506ed..b3748ec314 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -358,6 +358,7 @@ func TestTranslateOutputFields(t *testing.T) { binaryVectorFieldName = "binary_vector" ) var outputFields []string + var userOutputFields []string var err error schema := &schemapb.CollectionSchema{ @@ -365,73 +366,91 @@ func TestTranslateOutputFields(t *testing.T) { Description: "TestTranslateOutputFields", AutoID: false, Fields: []*schemapb.FieldSchema{ - {Name: idFieldName, DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {Name: tsFieldName, DataType: schemapb.DataType_Int64}, - {Name: floatVectorFieldName, DataType: schemapb.DataType_FloatVector}, - {Name: binaryVectorFieldName, DataType: schemapb.DataType_BinaryVector}, + {Name: idFieldName, FieldID: 0, DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {Name: tsFieldName, FieldID: 1, DataType: schemapb.DataType_Int64}, + {Name: floatVectorFieldName, FieldID: 100, DataType: schemapb.DataType_FloatVector}, + {Name: binaryVectorFieldName, FieldID: 101, DataType: schemapb.DataType_BinaryVector}, }, } - outputFields, err = translateOutputFields([]string{}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{}, outputFields) + assert.ElementsMatch(t, []string{}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName, tsFieldName}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, tsFieldName}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName, tsFieldName, floatVectorFieldName}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, tsFieldName, floatVectorFieldName}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*"}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{"*"}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{" * "}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{" * "}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*", tsFieldName}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{"*", tsFieldName}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*", floatVectorFieldName}, schema, false) + outputFields, userOutputFields, err = translateOutputFields([]string{"*", floatVectorFieldName}, schema, false) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) //========================================================================= - outputFields, err = translateOutputFields([]string{}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName, tsFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, tsFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{idFieldName, tsFieldName, floatVectorFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, tsFieldName, floatVectorFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*"}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{"*"}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*", tsFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{"*", tsFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) - outputFields, err = translateOutputFields([]string{"*", floatVectorFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{"*", floatVectorFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields) + assert.ElementsMatch(t, []string{idFieldName, tsFieldName, floatVectorFieldName, binaryVectorFieldName}, userOutputFields) + + outputFields, userOutputFields, err = translateOutputFields([]string{"A"}, schema, true) + assert.Error(t, err) t.Run("enable dynamic schema", func(t *testing.T) { schema := &schemapb.CollectionSchema{ @@ -440,17 +459,39 @@ func TestTranslateOutputFields(t *testing.T) { AutoID: false, EnableDynamicField: true, Fields: []*schemapb.FieldSchema{ - {Name: idFieldName, DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {Name: tsFieldName, DataType: schemapb.DataType_Int64}, - {Name: floatVectorFieldName, DataType: schemapb.DataType_FloatVector}, - {Name: binaryVectorFieldName, DataType: schemapb.DataType_BinaryVector}, - {Name: common.MetaFieldName, DataType: schemapb.DataType_JSON, IsDynamic: true}, + {Name: idFieldName, FieldID: 1, DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {Name: tsFieldName, FieldID: 2, DataType: schemapb.DataType_Int64}, + {Name: floatVectorFieldName, FieldID: 100, DataType: schemapb.DataType_FloatVector}, + {Name: binaryVectorFieldName, FieldID: 101, DataType: schemapb.DataType_BinaryVector}, + {Name: common.MetaFieldName, FieldID: 102, DataType: schemapb.DataType_JSON, IsDynamic: true}, }, } - outputFields, err = translateOutputFields([]string{"A", idFieldName}, schema, true) + outputFields, userOutputFields, err = translateOutputFields([]string{"A", idFieldName}, schema, true) assert.Equal(t, nil, err) assert.ElementsMatch(t, []string{common.MetaFieldName, idFieldName}, outputFields) + assert.ElementsMatch(t, []string{"A", idFieldName}, userOutputFields) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "$meta[\"A\"]"}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "$meta[]"}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "$meta[\"\"]"}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "$meta["}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "[]"}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, "A > 1"}, schema, true) + assert.Error(t, err) + + outputFields, userOutputFields, err = translateOutputFields([]string{idFieldName, floatVectorFieldName, ""}, schema, true) + assert.Error(t, err) }) } diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 6671ff2a79..7d62523431 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -31,6 +31,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/parser/planparserv2" + "github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/common" @@ -798,11 +800,13 @@ func passwordVerify(ctx context.Context, username, rawPwd string, globalMetaCach // output_fields=["*"] ==> [A,B,C,D] // output_fields=["*",A] ==> [A,B,C,D] // output_fields=["*",C] ==> [A,B,C,D] -func translateOutputFields(outputFields []string, schema *schemapb.CollectionSchema, addPrimary bool) ([]string, error) { +func translateOutputFields(outputFields []string, schema *schemapb.CollectionSchema, addPrimary bool) ([]string, []string, error) { var primaryFieldName string allFieldNameMap := make(map[string]bool) resultFieldNameMap := make(map[string]bool) resultFieldNames := make([]string, 0) + userOutputFieldsMap := make(map[string]bool) + userOutputFields := make([]string, 0) for _, field := range schema.Fields { if field.IsPrimaryKey { @@ -816,15 +820,33 @@ func translateOutputFields(outputFields []string, schema *schemapb.CollectionSch if outputFieldName == "*" { for fieldName := range allFieldNameMap { resultFieldNameMap[fieldName] = true + userOutputFieldsMap[fieldName] = true } } else { if _, ok := allFieldNameMap[outputFieldName]; ok { resultFieldNameMap[outputFieldName] = true + userOutputFieldsMap[outputFieldName] = true } else { if schema.EnableDynamicField { + schemaH, err := typeutil.CreateSchemaHelper(schema) + if err != nil { + return nil, nil, err + } + err = planparserv2.ParseIdentifier(schemaH, outputFieldName, func(expr *planpb.Expr) error { + if len(expr.GetColumnExpr().GetInfo().GetNestedPath()) == 1 && + expr.GetColumnExpr().GetInfo().GetNestedPath()[0] == outputFieldName { + return nil + } + return fmt.Errorf("not suppot getting subkeys of json field yet") + }) + if err != nil { + log.Info("parse output field name failed", zap.String("field name", outputFieldName)) + return nil, nil, fmt.Errorf("parse output field name failed: %s", outputFieldName) + } resultFieldNameMap[common.MetaFieldName] = true + userOutputFieldsMap[outputFieldName] = true } else { - return nil, fmt.Errorf("field %s not exist", outputFieldName) + return nil, nil, fmt.Errorf("field %s not exist", outputFieldName) } } @@ -833,12 +855,16 @@ func translateOutputFields(outputFields []string, schema *schemapb.CollectionSch if addPrimary { resultFieldNameMap[primaryFieldName] = true + userOutputFieldsMap[primaryFieldName] = true } for fieldName := range resultFieldNameMap { resultFieldNames = append(resultFieldNames, fieldName) } - return resultFieldNames, nil + for fieldName := range userOutputFieldsMap { + userOutputFields = append(userOutputFields, fieldName) + } + return resultFieldNames, userOutputFields, nil } func validateIndexName(indexName string) error { diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 8c1361d6e7..a51fa18bdf 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -176,6 +176,7 @@ func (t *createCollectionTask) appendDynamicField(schema *schemapb.CollectionSch DataType: schemapb.DataType_JSON, IsDynamic: true, }) + log.Info("append dynamic field", zap.String("collection", schema.Name)) } } diff --git a/pkg/go.mod b/pkg/go.mod index 5b82ceeb24..f60a0b7332 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -12,7 +12,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.14.4 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 - github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 + github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 github.com/panjf2000/ants/v2 v2.4.8 github.com/prometheus/client_golang v1.11.1 github.com/samber/lo v1.27.0 diff --git a/pkg/go.sum b/pkg/go.sum index c333985622..9621129b8f 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -473,6 +473,8 @@ github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63 h1:V github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 h1:Dp5AAbOSTq31QLatGXamBMk/o670MkbRi8NoW17ypew= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 h1:LdDHjEjus1NdC9ELbpQa6DfUHJotJUW2kD4S+8nvjw4= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=