From a992dcf6a86e7d33f6250ff23f757080048bd527 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 16 Jul 2021 17:19:55 +0800 Subject: [PATCH] Support query return vector output field (#6570) * improve code readibility Signed-off-by: yudong.cai * add offset in RetrieveResults Signed-off-by: yudong.cai * add VectorFieldInfo into Segment struct Signed-off-by: yudong.cai * add new interface for query vector Signed-off-by: yudong.cai * update load vector field logic Signed-off-by: yudong.cai * update load vector field logic Signed-off-by: yudong.cai * fill in field name in query result Signed-off-by: yudong.cai * add FieldId into FieldData Signed-off-by: yudong.cai * add fillVectorOutputFieldsIfNeeded Signed-off-by: yudong.cai * update data_codec_test.go Signed-off-by: yudong.cai * add DeserializeFieldData Signed-off-by: yudong.cai * realize query return vector output field Signed-off-by: yudong.cai * fix static-check Signed-off-by: yudong.cai * disable query vector case Signed-off-by: yudong.cai --- internal/core/src/pb/schema.pb.cc | 90 +++++-- internal/core/src/pb/schema.pb.h | 21 ++ internal/core/src/pb/segcore.pb.cc | 112 +++++++-- internal/core/src/pb/segcore.pb.h | 50 +++- .../core/src/segcore/SegmentInterface.cpp | 7 +- .../core/src/segcore/SegmentSealedImpl.cpp | 5 +- internal/proto/schema.proto | 1 + internal/proto/schemapb/schema.pb.go | 131 +++++----- internal/proto/segcore.proto | 15 +- internal/proto/segcorepb/segcore.pb.go | 57 +++-- internal/proxy/task.go | 11 +- internal/querynode/collection_replica.go | 5 - internal/querynode/query_collection.go | 69 ++++++ internal/querynode/segment.go | 130 +++++++++- internal/querynode/segment_loader.go | 125 ++++++---- internal/storage/data_codec.go | 129 ++++++++-- internal/storage/data_codec_test.go | 226 +++++++++--------- tests20/python_client/testcases/test_query.py | 1 + 18 files changed, 848 insertions(+), 337 deletions(-) diff --git a/internal/core/src/pb/schema.pb.cc b/internal/core/src/pb/schema.pb.cc index 29923d89ff..462f7f9806 100644 --- a/internal/core/src/pb/schema.pb.cc +++ b/internal/core/src/pb/schema.pb.cc @@ -415,6 +415,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_schema_2eproto::offsets[] PROT PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, field_name_), offsetof(::milvus::proto::schema::FieldDataDefaultTypeInternal, scalars_), offsetof(::milvus::proto::schema::FieldDataDefaultTypeInternal, vectors_), + PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, field_id_), PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, field_), ~0u, // no _has_bits_ PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::IDs, _internal_metadata_), @@ -449,8 +450,8 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB { 64, -1, sizeof(::milvus::proto::schema::ScalarField)}, { 77, -1, sizeof(::milvus::proto::schema::VectorField)}, { 86, -1, sizeof(::milvus::proto::schema::FieldData)}, - { 96, -1, sizeof(::milvus::proto::schema::IDs)}, - { 104, -1, sizeof(::milvus::proto::schema::SearchResultData)}, + { 97, -1, sizeof(::milvus::proto::schema::IDs)}, + { 105, -1, sizeof(::milvus::proto::schema::SearchResultData)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { @@ -499,25 +500,25 @@ const char descriptor_table_protodef_schema_2eproto[] PROTOBUF_SECTION_VARIABLE( "milvus.proto.schema.BytesArrayH\000B\006\n\004data" "\"t\n\013VectorField\022\013\n\003dim\030\001 \001(\003\0227\n\014float_ve" "ctor\030\002 \001(\0132\037.milvus.proto.schema.FloatAr" - "rayH\000\022\027\n\rbinary_vector\030\003 \001(\014H\000B\006\n\004data\"\277" + "rayH\000\022\027\n\rbinary_vector\030\003 \001(\014H\000B\006\n\004data\"\321" "\001\n\tFieldData\022+\n\004type\030\001 \001(\0162\035.milvus.prot" "o.schema.DataType\022\022\n\nfield_name\030\002 \001(\t\0223\n" "\007scalars\030\003 \001(\0132 .milvus.proto.schema.Sca" "larFieldH\000\0223\n\007vectors\030\004 \001(\0132 .milvus.pro" - "to.schema.VectorFieldH\000B\007\n\005field\"w\n\003IDs\022" - "0\n\006int_id\030\001 \001(\0132\036.milvus.proto.schema.Lo" - "ngArrayH\000\0222\n\006str_id\030\002 \001(\0132 .milvus.proto" - ".schema.StringArrayH\000B\n\n\010id_field\"\261\001\n\020Se" - "archResultData\022\023\n\013num_queries\030\001 \001(\003\022\r\n\005t" - "op_k\030\002 \001(\003\0223\n\013fields_data\030\003 \003(\0132\036.milvus" - ".proto.schema.FieldData\022\016\n\006scores\030\004 \003(\002\022" - "%\n\003ids\030\005 \001(\0132\030.milvus.proto.schema.IDs\022\r" - "\n\005topks\030\006 \003(\003*\217\001\n\010DataType\022\010\n\004None\020\000\022\010\n\004" - "Bool\020\001\022\010\n\004Int8\020\002\022\t\n\005Int16\020\003\022\t\n\005Int32\020\004\022\t" - "\n\005Int64\020\005\022\t\n\005Float\020\n\022\n\n\006Double\020\013\022\n\n\006Stri" - "ng\020\024\022\020\n\014BinaryVector\020d\022\017\n\013FloatVector\020eB" - "5Z3github.com/milvus-io/milvus/internal/" - "proto/schemapbb\006proto3" + "to.schema.VectorFieldH\000\022\020\n\010field_id\030\005 \001(" + "\003B\007\n\005field\"w\n\003IDs\0220\n\006int_id\030\001 \001(\0132\036.milv" + "us.proto.schema.LongArrayH\000\0222\n\006str_id\030\002 " + "\001(\0132 .milvus.proto.schema.StringArrayH\000B" + "\n\n\010id_field\"\261\001\n\020SearchResultData\022\023\n\013num_" + "queries\030\001 \001(\003\022\r\n\005top_k\030\002 \001(\003\0223\n\013fields_d" + "ata\030\003 \003(\0132\036.milvus.proto.schema.FieldDat" + "a\022\016\n\006scores\030\004 \003(\002\022%\n\003ids\030\005 \001(\0132\030.milvus." + "proto.schema.IDs\022\r\n\005topks\030\006 \003(\003*\217\001\n\010Data" + "Type\022\010\n\004None\020\000\022\010\n\004Bool\020\001\022\010\n\004Int8\020\002\022\t\n\005In" + "t16\020\003\022\t\n\005Int32\020\004\022\t\n\005Int64\020\005\022\t\n\005Float\020\n\022\n" + "\n\006Double\020\013\022\n\n\006String\020\024\022\020\n\014BinaryVector\020d" + "\022\017\n\013FloatVector\020eB5Z3github.com/milvus-i" + "o/milvus/internal/proto/schemapbb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_schema_2eproto_deps[1] = { &::descriptor_table_common_2eproto, @@ -541,7 +542,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sch static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_schema_2eproto_once; static bool descriptor_table_schema_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_schema_2eproto = { - &descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 1862, + &descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 1880, &descriptor_table_schema_2eproto_once, descriptor_table_schema_2eproto_sccs, descriptor_table_schema_2eproto_deps, 14, 1, schemas, file_default_instances, TableStruct_schema_2eproto::offsets, file_level_metadata_schema_2eproto, 14, file_level_enum_descriptors_schema_2eproto, file_level_service_descriptors_schema_2eproto, @@ -4715,7 +4716,9 @@ FieldData::FieldData(const FieldData& from) if (!from.field_name().empty()) { field_name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.field_name_); } - type_ = from.type_; + ::memcpy(&type_, &from.type_, + static_cast(reinterpret_cast(&field_id_) - + reinterpret_cast(&type_)) + sizeof(field_id_)); clear_has_field(); switch (from.field_case()) { case kScalars: { @@ -4736,7 +4739,9 @@ FieldData::FieldData(const FieldData& from) void FieldData::SharedCtor() { ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_FieldData_schema_2eproto.base); field_name_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - type_ = 0; + ::memset(&type_, 0, static_cast( + reinterpret_cast(&field_id_) - + reinterpret_cast(&type_)) + sizeof(field_id_)); clear_has_field(); } @@ -4787,7 +4792,9 @@ void FieldData::Clear() { (void) cached_has_bits; field_name_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - type_ = 0; + ::memset(&type_, 0, static_cast( + reinterpret_cast(&field_id_) - + reinterpret_cast(&type_)) + sizeof(field_id_)); clear_field(); _internal_metadata_.Clear(); } @@ -4829,6 +4836,13 @@ const char* FieldData::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID:: CHK_(ptr); } else goto handle_unusual; continue; + // int64 field_id = 5; + case 5: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 40)) { + field_id_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); + CHK_(ptr); + } else goto handle_unusual; + continue; default: { handle_unusual: if ((tag & 7) == 4 || tag == 0) { @@ -4910,6 +4924,19 @@ bool FieldData::MergePartialFromCodedStream( break; } + // int64 field_id = 5; + case 5: { + if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (40 & 0xFF)) { + + DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive< + ::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>( + input, &field_id_))); + } else { + goto handle_unusual; + } + break; + } + default: { handle_unusual: if (tag == 0) { @@ -4965,6 +4992,11 @@ void FieldData::SerializeWithCachedSizes( 4, _Internal::vectors(this), output); } + // int64 field_id = 5; + if (this->field_id() != 0) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(5, this->field_id(), output); + } + if (_internal_metadata_.have_unknown_fields()) { ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields( _internal_metadata_.unknown_fields(), output); @@ -5009,6 +5041,11 @@ void FieldData::SerializeWithCachedSizes( 4, _Internal::vectors(this), target); } + // int64 field_id = 5; + if (this->field_id() != 0) { + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(5, this->field_id(), target); + } + if (_internal_metadata_.have_unknown_fields()) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target); @@ -5043,6 +5080,13 @@ size_t FieldData::ByteSizeLong() const { ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::EnumSize(this->type()); } + // int64 field_id = 5; + if (this->field_id() != 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size( + this->field_id()); + } + switch (field_case()) { // .milvus.proto.schema.ScalarField scalars = 3; case kScalars: { @@ -5096,6 +5140,9 @@ void FieldData::MergeFrom(const FieldData& from) { if (from.type() != 0) { set_type(from.type()); } + if (from.field_id() != 0) { + set_field_id(from.field_id()); + } switch (from.field_case()) { case kScalars: { mutable_scalars()->::milvus::proto::schema::ScalarField::MergeFrom(from.scalars()); @@ -5135,6 +5182,7 @@ void FieldData::InternalSwap(FieldData* other) { field_name_.Swap(&other->field_name_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), GetArenaNoVirtual()); swap(type_, other->type_); + swap(field_id_, other->field_id_); swap(field_, other->field_); swap(_oneof_case_[0], other->_oneof_case_[0]); } diff --git a/internal/core/src/pb/schema.pb.h b/internal/core/src/pb/schema.pb.h index 4047708ea9..5b575e9c2e 100644 --- a/internal/core/src/pb/schema.pb.h +++ b/internal/core/src/pb/schema.pb.h @@ -2028,6 +2028,7 @@ class FieldData : enum : int { kFieldNameFieldNumber = 2, kTypeFieldNumber = 1, + kFieldIdFieldNumber = 5, kScalarsFieldNumber = 3, kVectorsFieldNumber = 4, }; @@ -2047,6 +2048,11 @@ class FieldData : ::milvus::proto::schema::DataType type() const; void set_type(::milvus::proto::schema::DataType value); + // int64 field_id = 5; + void clear_field_id(); + ::PROTOBUF_NAMESPACE_ID::int64 field_id() const; + void set_field_id(::PROTOBUF_NAMESPACE_ID::int64 value); + // .milvus.proto.schema.ScalarField scalars = 3; bool has_scalars() const; void clear_scalars(); @@ -2077,6 +2083,7 @@ class FieldData : ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr field_name_; int type_; + ::PROTOBUF_NAMESPACE_ID::int64 field_id_; union FieldUnion { FieldUnion() {} ::milvus::proto::schema::ScalarField* scalars_; @@ -3728,6 +3735,20 @@ inline ::milvus::proto::schema::VectorField* FieldData::mutable_vectors() { return field_.vectors_; } +// int64 field_id = 5; +inline void FieldData::clear_field_id() { + field_id_ = PROTOBUF_LONGLONG(0); +} +inline ::PROTOBUF_NAMESPACE_ID::int64 FieldData::field_id() const { + // @@protoc_insertion_point(field_get:milvus.proto.schema.FieldData.field_id) + return field_id_; +} +inline void FieldData::set_field_id(::PROTOBUF_NAMESPACE_ID::int64 value) { + + field_id_ = value; + // @@protoc_insertion_point(field_set:milvus.proto.schema.FieldData.field_id) +} + inline bool FieldData::has_field() const { return field_case() != FIELD_NOT_SET; } diff --git a/internal/core/src/pb/segcore.pb.cc b/internal/core/src/pb/segcore.pb.cc index 8b94397385..4c87d58694 100644 --- a/internal/core/src/pb/segcore.pb.cc +++ b/internal/core/src/pb/segcore.pb.cc @@ -118,6 +118,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_segcore_2eproto::offsets[] PRO ~0u, // no _oneof_case_ ~0u, // no _weak_field_map_ PROTOBUF_FIELD_OFFSET(::milvus::proto::segcore::RetrieveResults, ids_), + PROTOBUF_FIELD_OFFSET(::milvus::proto::segcore::RetrieveResults, offset_), PROTOBUF_FIELD_OFFSET(::milvus::proto::segcore::RetrieveResults, fields_data_), ~0u, // no _has_bits_ PROTOBUF_FIELD_OFFSET(::milvus::proto::segcore::LoadFieldMeta, _internal_metadata_), @@ -138,8 +139,8 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_segcore_2eproto::offsets[] PRO static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { { 0, -1, sizeof(::milvus::proto::segcore::RetrieveRequest)}, { 7, -1, sizeof(::milvus::proto::segcore::RetrieveResults)}, - { 14, -1, sizeof(::milvus::proto::segcore::LoadFieldMeta)}, - { 22, -1, sizeof(::milvus::proto::segcore::LoadSegmentMeta)}, + { 15, -1, sizeof(::milvus::proto::segcore::LoadFieldMeta)}, + { 23, -1, sizeof(::milvus::proto::segcore::LoadSegmentMeta)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { @@ -153,16 +154,16 @@ const char descriptor_table_protodef_segcore_2eproto[] PROTOBUF_SECTION_VARIABLE "\n\rsegcore.proto\022\024milvus.proto.segcore\032\014s" "chema.proto\"R\n\017RetrieveRequest\022%\n\003ids\030\001 " "\001(\0132\030.milvus.proto.schema.IDs\022\030\n\020output_" - "fields_id\030\002 \003(\003\"m\n\017RetrieveResults\022%\n\003id" - "s\030\001 \001(\0132\030.milvus.proto.schema.IDs\0223\n\013fie" - "lds_data\030\002 \003(\0132\036.milvus.proto.schema.Fie" - "ldData\"P\n\rLoadFieldMeta\022\025\n\rmin_timestamp" - "\030\001 \001(\003\022\025\n\rmax_timestamp\030\002 \001(\003\022\021\n\trow_cou" - "nt\030\003 \001(\003\"Y\n\017LoadSegmentMeta\0222\n\005metas\030\001 \003" - "(\0132#.milvus.proto.segcore.LoadFieldMeta\022" - "\022\n\ntotal_size\030\002 \001(\003B6Z4github.com/milvus" - "-io/milvus/internal/proto/segcorepbb\006pro" - "to3" + "fields_id\030\002 \003(\003\"}\n\017RetrieveResults\022%\n\003id" + "s\030\001 \001(\0132\030.milvus.proto.schema.IDs\022\016\n\006off" + "set\030\002 \003(\003\0223\n\013fields_data\030\003 \003(\0132\036.milvus." + "proto.schema.FieldData\"P\n\rLoadFieldMeta\022" + "\025\n\rmin_timestamp\030\001 \001(\003\022\025\n\rmax_timestamp\030" + "\002 \001(\003\022\021\n\trow_count\030\003 \001(\003\"Y\n\017LoadSegmentM" + "eta\0222\n\005metas\030\001 \003(\0132#.milvus.proto.segcor" + "e.LoadFieldMeta\022\022\n\ntotal_size\030\002 \001(\003B6Z4g" + "ithub.com/milvus-io/milvus/internal/prot" + "o/segcorepbb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_segcore_2eproto_deps[1] = { &::descriptor_table_schema_2eproto, @@ -176,7 +177,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_seg static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_segcore_2eproto_once; static bool descriptor_table_segcore_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_segcore_2eproto = { - &descriptor_table_segcore_2eproto_initialized, descriptor_table_protodef_segcore_2eproto, "segcore.proto", 483, + &descriptor_table_segcore_2eproto_initialized, descriptor_table_protodef_segcore_2eproto, "segcore.proto", 499, &descriptor_table_segcore_2eproto_once, descriptor_table_segcore_2eproto_sccs, descriptor_table_segcore_2eproto_deps, 4, 1, schemas, file_default_instances, TableStruct_segcore_2eproto::offsets, file_level_metadata_segcore_2eproto, 4, file_level_enum_descriptors_segcore_2eproto, file_level_service_descriptors_segcore_2eproto, @@ -561,6 +562,7 @@ RetrieveResults::RetrieveResults() RetrieveResults::RetrieveResults(const RetrieveResults& from) : ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr), + offset_(from.offset_), fields_data_(from.fields_data_) { _internal_metadata_.MergeFrom(from._internal_metadata_); if (from.has_ids()) { @@ -600,6 +602,7 @@ void RetrieveResults::Clear() { // Prevent compiler warnings about cached_has_bits being unused (void) cached_has_bits; + offset_.Clear(); fields_data_.Clear(); if (GetArenaNoVirtual() == nullptr && ids_ != nullptr) { delete ids_; @@ -623,16 +626,26 @@ const char* RetrieveResults::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPAC CHK_(ptr); } else goto handle_unusual; continue; - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; case 2: if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) { + ptr = ::PROTOBUF_NAMESPACE_ID::internal::PackedInt64Parser(mutable_offset(), ptr, ctx); + CHK_(ptr); + } else if (static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16) { + add_offset(::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr)); + CHK_(ptr); + } else goto handle_unusual; + continue; + // repeated .milvus.proto.schema.FieldData fields_data = 3; + case 3: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 26)) { ptr -= 1; do { ptr += 1; ptr = ctx->ParseMessage(add_fields_data(), ptr); CHK_(ptr); if (!ctx->DataAvailable(ptr)) break; - } while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 18); + } while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 26); } else goto handle_unusual; continue; default: { @@ -676,9 +689,25 @@ bool RetrieveResults::MergePartialFromCodedStream( break; } - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; case 2: { if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (18 & 0xFF)) { + DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPackedPrimitive< + ::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>( + input, this->mutable_offset()))); + } else if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (16 & 0xFF)) { + DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadRepeatedPrimitiveNoInline< + ::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>( + 1, 18u, input, this->mutable_offset()))); + } else { + goto handle_unusual; + } + break; + } + + // repeated .milvus.proto.schema.FieldData fields_data = 3; + case 3: { + if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (26 & 0xFF)) { DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage( input, add_fields_data())); } else { @@ -720,11 +749,22 @@ void RetrieveResults::SerializeWithCachedSizes( 1, _Internal::ids(this), output); } - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; + if (this->offset_size() > 0) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteTag(2, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED, output); + output->WriteVarint32(_offset_cached_byte_size_.load( + std::memory_order_relaxed)); + } + for (int i = 0, n = this->offset_size(); i < n; i++) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64NoTag( + this->offset(i), output); + } + + // repeated .milvus.proto.schema.FieldData fields_data = 3; for (unsigned int i = 0, n = static_cast(this->fields_data_size()); i < n; i++) { ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray( - 2, + 3, this->fields_data(static_cast(i)), output); } @@ -749,12 +789,25 @@ void RetrieveResults::SerializeWithCachedSizes( 1, _Internal::ids(this), target); } - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; + if (this->offset_size() > 0) { + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteTagToArray( + 2, + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED, + target); + target = ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream::WriteVarint32ToArray( + _offset_cached_byte_size_.load(std::memory_order_relaxed), + target); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: + WriteInt64NoTagToArray(this->offset_, target); + } + + // repeated .milvus.proto.schema.FieldData fields_data = 3; for (unsigned int i = 0, n = static_cast(this->fields_data_size()); i < n; i++) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: InternalWriteMessageToArray( - 2, this->fields_data(static_cast(i)), target); + 3, this->fields_data(static_cast(i)), target); } if (_internal_metadata_.have_unknown_fields()) { @@ -778,7 +831,22 @@ size_t RetrieveResults::ByteSizeLong() const { // Prevent compiler warnings about cached_has_bits being unused (void) cached_has_bits; - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; + { + size_t data_size = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: + Int64Size(this->offset_); + if (data_size > 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int32Size( + static_cast<::PROTOBUF_NAMESPACE_ID::int32>(data_size)); + } + int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(data_size); + _offset_cached_byte_size_.store(cached_size, + std::memory_order_relaxed); + total_size += data_size; + } + + // repeated .milvus.proto.schema.FieldData fields_data = 3; { unsigned int count = static_cast(this->fields_data_size()); total_size += 1UL * count; @@ -823,6 +891,7 @@ void RetrieveResults::MergeFrom(const RetrieveResults& from) { ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; (void) cached_has_bits; + offset_.MergeFrom(from.offset_); fields_data_.MergeFrom(from.fields_data_); if (from.has_ids()) { mutable_ids()->::milvus::proto::schema::IDs::MergeFrom(from.ids()); @@ -850,6 +919,7 @@ bool RetrieveResults::IsInitialized() const { void RetrieveResults::InternalSwap(RetrieveResults* other) { using std::swap; _internal_metadata_.Swap(&other->_internal_metadata_); + offset_.InternalSwap(&other->offset_); CastToBase(&fields_data_)->InternalSwap(CastToBase(&other->fields_data_)); swap(ids_, other->ids_); } diff --git a/internal/core/src/pb/segcore.pb.h b/internal/core/src/pb/segcore.pb.h index ef5d749bb6..a172b04686 100644 --- a/internal/core/src/pb/segcore.pb.h +++ b/internal/core/src/pb/segcore.pb.h @@ -346,10 +346,22 @@ class RetrieveResults : // accessors ------------------------------------------------------- enum : int { - kFieldsDataFieldNumber = 2, + kOffsetFieldNumber = 2, + kFieldsDataFieldNumber = 3, kIdsFieldNumber = 1, }; - // repeated .milvus.proto.schema.FieldData fields_data = 2; + // repeated int64 offset = 2; + int offset_size() const; + void clear_offset(); + ::PROTOBUF_NAMESPACE_ID::int64 offset(int index) const; + void set_offset(int index, ::PROTOBUF_NAMESPACE_ID::int64 value); + void add_offset(::PROTOBUF_NAMESPACE_ID::int64 value); + const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >& + offset() const; + ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >* + mutable_offset(); + + // repeated .milvus.proto.schema.FieldData fields_data = 3; int fields_data_size() const; void clear_fields_data(); ::milvus::proto::schema::FieldData* mutable_fields_data(int index); @@ -373,6 +385,8 @@ class RetrieveResults : class _Internal; ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; + ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 > offset_; + mutable std::atomic _offset_cached_byte_size_; ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData > fields_data_; ::milvus::proto::schema::IDs* ids_; mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; @@ -802,7 +816,37 @@ inline void RetrieveResults::set_allocated_ids(::milvus::proto::schema::IDs* ids // @@protoc_insertion_point(field_set_allocated:milvus.proto.segcore.RetrieveResults.ids) } -// repeated .milvus.proto.schema.FieldData fields_data = 2; +// repeated int64 offset = 2; +inline int RetrieveResults::offset_size() const { + return offset_.size(); +} +inline void RetrieveResults::clear_offset() { + offset_.Clear(); +} +inline ::PROTOBUF_NAMESPACE_ID::int64 RetrieveResults::offset(int index) const { + // @@protoc_insertion_point(field_get:milvus.proto.segcore.RetrieveResults.offset) + return offset_.Get(index); +} +inline void RetrieveResults::set_offset(int index, ::PROTOBUF_NAMESPACE_ID::int64 value) { + offset_.Set(index, value); + // @@protoc_insertion_point(field_set:milvus.proto.segcore.RetrieveResults.offset) +} +inline void RetrieveResults::add_offset(::PROTOBUF_NAMESPACE_ID::int64 value) { + offset_.Add(value); + // @@protoc_insertion_point(field_add:milvus.proto.segcore.RetrieveResults.offset) +} +inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >& +RetrieveResults::offset() const { + // @@protoc_insertion_point(field_list:milvus.proto.segcore.RetrieveResults.offset) + return offset_; +} +inline ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >* +RetrieveResults::mutable_offset() { + // @@protoc_insertion_point(field_mutable_list:milvus.proto.segcore.RetrieveResults.offset) + return &offset_; +} + +// repeated .milvus.proto.schema.FieldData fields_data = 3; inline int RetrieveResults::fields_data_size() const { return fields_data_.size(); } diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 9be3d69f54..2dbb654a70 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -141,6 +141,7 @@ static std::unique_ptr CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); + data_array->set_field_id(field_meta.get_id().get()); if (!datatype_is_vector(data_type)) { auto scalar_array = CreateScalarArrayFrom(data_raw, count, data_type); @@ -166,7 +167,7 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_ break; } default: { - PanicInfo("unsupportted datatype"); + PanicInfo("unsupported datatype"); } } } @@ -204,6 +205,10 @@ SegmentInternalInterface::GetEntityById(const std::vector& field_of results->set_allocated_ids(ids_.release()); + for (auto& seg_offset : seg_offsets) { + results->add_offset(seg_offset.get()); + } + auto fields_data = results->mutable_fields_data(); for (auto field_offset : field_offsets) { auto col = BulkSubScript(field_offset, seg_offsets.data(), seg_offsets.size()); diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index b72c7aa640..44b6921c61 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -374,7 +374,10 @@ SegmentSealedImpl::bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const { - Assert(get_bit(field_data_ready_bitset_, field_offset)); + // Assert(get_bit(field_data_ready_bitset_, field_offset)); + if (!get_bit(field_data_ready_bitset_, field_offset)) { + return; + } auto& field_meta = schema_->operator[](field_offset); auto src_vec = field_datas_[field_offset.get()].data(); switch (field_meta.get_data_type()) { diff --git a/internal/proto/schema.proto b/internal/proto/schema.proto index 205838c74c..c72d64d6a6 100644 --- a/internal/proto/schema.proto +++ b/internal/proto/schema.proto @@ -105,6 +105,7 @@ message FieldData { ScalarField scalars = 3; VectorField vectors = 4; } + int64 field_id = 5; } message IDs { diff --git a/internal/proto/schemapb/schema.pb.go b/internal/proto/schemapb/schema.pb.go index 03ac3c97a4..3d63e8c9df 100644 --- a/internal/proto/schemapb/schema.pb.go +++ b/internal/proto/schemapb/schema.pb.go @@ -761,6 +761,7 @@ type FieldData struct { // *FieldData_Scalars // *FieldData_Vectors Field isFieldData_Field `protobuf_oneof:"field"` + FieldId int64 `protobuf:"varint,5,opt,name=field_id,json=fieldId,proto3" json:"field_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -842,6 +843,13 @@ func (m *FieldData) GetVectors() *VectorField { return nil } +func (m *FieldData) GetFieldId() int64 { + if m != nil { + return m.FieldId + } + return 0 +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*FieldData) XXX_OneofWrappers() []interface{} { return []interface{}{ @@ -1030,65 +1038,66 @@ func init() { func init() { proto.RegisterFile("schema.proto", fileDescriptor_1c5fb4d8cc22d66a) } var fileDescriptor_1c5fb4d8cc22d66a = []byte{ - // 951 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x5d, 0x8f, 0xdb, 0x44, - 0x14, 0x8d, 0xe3, 0x7c, 0xd8, 0xd7, 0xa1, 0x58, 0xd3, 0x0a, 0x59, 0x48, 0xed, 0xa6, 0x11, 0x48, - 0x51, 0x25, 0x76, 0xd5, 0x5d, 0x28, 0xa5, 0xa2, 0x02, 0xd2, 0x68, 0xb5, 0xd1, 0xa2, 0x6a, 0xf1, - 0xa2, 0x3e, 0xf0, 0x12, 0x4d, 0xe2, 0xe9, 0xee, 0x68, 0xed, 0x99, 0xe0, 0x19, 0x57, 0xe4, 0x07, - 0xf0, 0xcc, 0x0b, 0x4f, 0xfc, 0x37, 0x1e, 0x80, 0x3f, 0x82, 0xe6, 0xce, 0x24, 0x31, 0xc4, 0x5d, - 0xed, 0xdb, 0x9d, 0xf1, 0x3d, 0x67, 0xe6, 0x9e, 0x7b, 0xe6, 0x1a, 0x06, 0x6a, 0x79, 0xcd, 0x0a, - 0x7a, 0xb8, 0x2a, 0xa5, 0x96, 0xe4, 0x7e, 0xc1, 0xf3, 0x77, 0x95, 0xb2, 0xab, 0x43, 0xfb, 0xe9, - 0xe3, 0xc1, 0x52, 0x16, 0x85, 0x14, 0x76, 0x73, 0xf4, 0x57, 0x1b, 0xa2, 0x53, 0xce, 0xf2, 0xec, - 0x12, 0xbf, 0x92, 0x04, 0xfa, 0x6f, 0xcd, 0x72, 0x36, 0x4d, 0xbc, 0xa1, 0x37, 0xf6, 0xd3, 0xcd, - 0x92, 0x10, 0xe8, 0x08, 0x5a, 0xb0, 0xa4, 0x3d, 0xf4, 0xc6, 0x61, 0x8a, 0x31, 0xf9, 0x04, 0xee, - 0x71, 0x35, 0x5f, 0x95, 0xbc, 0xa0, 0xe5, 0x7a, 0x7e, 0xc3, 0xd6, 0x89, 0x3f, 0xf4, 0xc6, 0x41, - 0x3a, 0xe0, 0xea, 0xc2, 0x6e, 0x9e, 0xb3, 0x35, 0x19, 0x42, 0x94, 0x31, 0xb5, 0x2c, 0xf9, 0x4a, - 0x73, 0x29, 0x92, 0x0e, 0x12, 0xd4, 0xb7, 0xc8, 0x0b, 0x08, 0x33, 0xaa, 0xe9, 0x5c, 0xaf, 0x57, - 0x2c, 0xe9, 0x0e, 0xbd, 0xf1, 0xbd, 0xe3, 0x87, 0x87, 0x0d, 0x97, 0x3f, 0x9c, 0x52, 0x4d, 0x7f, - 0x5c, 0xaf, 0x58, 0x1a, 0x64, 0x2e, 0x22, 0x13, 0x88, 0x0c, 0x6c, 0xbe, 0xa2, 0x25, 0x2d, 0x54, - 0xd2, 0x1b, 0xfa, 0xe3, 0xe8, 0xf8, 0xf1, 0x7f, 0xd1, 0xae, 0xe4, 0x73, 0xb6, 0x7e, 0x43, 0xf3, - 0x8a, 0x5d, 0x50, 0x5e, 0xa6, 0x60, 0x50, 0x17, 0x08, 0x22, 0x53, 0x18, 0x70, 0x91, 0xb1, 0x5f, - 0x36, 0x24, 0xfd, 0xbb, 0x92, 0x44, 0x08, 0x73, 0x2c, 0x1f, 0x41, 0x8f, 0x56, 0x5a, 0xce, 0xa6, - 0x49, 0x80, 0x2a, 0xb8, 0xd5, 0xe8, 0x0f, 0x0f, 0xe2, 0x57, 0x32, 0xcf, 0xd9, 0xd2, 0x14, 0xeb, - 0x84, 0xde, 0xc8, 0xe9, 0xd5, 0xe4, 0xfc, 0x9f, 0x50, 0xed, 0x7d, 0xa1, 0x76, 0x47, 0xf8, 0xf5, - 0x23, 0xc8, 0x73, 0xe8, 0x61, 0x9f, 0x54, 0xd2, 0xc1, 0xab, 0x0f, 0x1b, 0xd5, 0xab, 0x35, 0x3a, - 0x75, 0xf9, 0xa3, 0x03, 0x08, 0x27, 0x52, 0xe6, 0xdf, 0x95, 0x25, 0x5d, 0x9b, 0x4b, 0x19, 0x5d, - 0x13, 0x6f, 0xe8, 0x8f, 0x83, 0x14, 0xe3, 0xd1, 0x23, 0x08, 0x66, 0x42, 0xef, 0x7f, 0xef, 0xba, - 0xef, 0x07, 0x10, 0x7e, 0x2f, 0xc5, 0xd5, 0x7e, 0x82, 0xef, 0x12, 0x86, 0x00, 0xa7, 0xb9, 0xa4, - 0x0d, 0x14, 0x6d, 0x97, 0xf1, 0x18, 0xa2, 0xa9, 0xac, 0x16, 0x39, 0xdb, 0x4f, 0xf1, 0x76, 0x24, - 0x93, 0xb5, 0x66, 0x6a, 0x3f, 0x63, 0xb0, 0x23, 0xb9, 0xd4, 0x25, 0x6f, 0xba, 0x49, 0xe8, 0x52, - 0xfe, 0xf4, 0x21, 0xba, 0x5c, 0xd2, 0x9c, 0x96, 0xa8, 0x04, 0x79, 0x09, 0xe1, 0x42, 0xca, 0x7c, - 0xee, 0x12, 0xbd, 0x71, 0x74, 0xfc, 0xa8, 0x51, 0xb8, 0xad, 0x42, 0x67, 0xad, 0x34, 0x30, 0x10, - 0xe3, 0x43, 0xf2, 0x02, 0x02, 0x2e, 0xb4, 0x45, 0xb7, 0x11, 0xdd, 0x6c, 0xda, 0x8d, 0x7c, 0x67, - 0xad, 0xb4, 0xcf, 0x85, 0x46, 0xec, 0x4b, 0x08, 0x73, 0x29, 0xae, 0x2c, 0xd8, 0xbf, 0xe5, 0xe8, - 0xad, 0xb6, 0xe6, 0x68, 0x03, 0x41, 0xf8, 0xb7, 0x00, 0x6f, 0x8d, 0xa6, 0x16, 0xdf, 0x41, 0xfc, - 0x41, 0x73, 0xcf, 0xb7, 0xd2, 0x9f, 0xb5, 0xd2, 0x10, 0x41, 0xc8, 0xf0, 0x0a, 0xa2, 0x0c, 0x35, - 0xb7, 0x14, 0x5d, 0xa4, 0x68, 0xb6, 0x4d, 0xad, 0x37, 0x67, 0xad, 0x14, 0x2c, 0x6c, 0x43, 0xa2, - 0x50, 0x73, 0x4b, 0xd2, 0xbb, 0x85, 0xa4, 0xd6, 0x1b, 0x43, 0x62, 0x61, 0x9b, 0x5a, 0x16, 0xa6, - 0xb5, 0x96, 0xa3, 0x7f, 0x4b, 0x2d, 0x3b, 0x07, 0x98, 0x5a, 0x10, 0x64, 0x18, 0x26, 0x3d, 0xdb, - 0xeb, 0xd1, 0xef, 0x1e, 0x44, 0x6f, 0xd8, 0x52, 0x4b, 0xd7, 0xdf, 0x18, 0xfc, 0x8c, 0x17, 0x6e, - 0x90, 0x99, 0xd0, 0x3c, 0x74, 0xab, 0xdb, 0x3b, 0x4c, 0x73, 0x6d, 0xbb, 0x83, 0x72, 0x11, 0xc2, - 0x2c, 0x39, 0xf9, 0x14, 0x3e, 0x58, 0x70, 0x61, 0x46, 0x9e, 0xa3, 0x31, 0x0d, 0x1c, 0x9c, 0xb5, - 0xd2, 0x81, 0xdd, 0xb6, 0x69, 0xdb, 0x6b, 0xfd, 0xed, 0x41, 0x88, 0x17, 0xc2, 0x72, 0x9f, 0x42, - 0x07, 0xc7, 0x9c, 0x77, 0x97, 0x31, 0x87, 0xa9, 0xe4, 0x21, 0x00, 0xbe, 0xd6, 0x79, 0x6d, 0x00, - 0x87, 0xb8, 0xf3, 0xda, 0x8c, 0x8d, 0xaf, 0xa1, 0xaf, 0xd0, 0xd5, 0xca, 0x39, 0xe9, 0x3d, 0x1d, - 0xd8, 0x39, 0xdf, 0x38, 0xd1, 0x41, 0x0c, 0xda, 0x56, 0xa1, 0x9c, 0x8f, 0x9a, 0xd1, 0x35, 0x5d, - 0x0d, 0xda, 0x41, 0x26, 0x7d, 0xe8, 0xe2, 0x45, 0x46, 0xbf, 0x7a, 0xe0, 0xcf, 0xa6, 0x8a, 0x7c, - 0x09, 0x3d, 0xf3, 0x28, 0x78, 0x76, 0xeb, 0x83, 0xaa, 0xbb, 0xba, 0xcb, 0x85, 0x9e, 0x65, 0xe4, - 0x2b, 0xe8, 0x29, 0x5d, 0x1a, 0x60, 0xfb, 0xce, 0x36, 0xea, 0x2a, 0x5d, 0xce, 0xb2, 0x09, 0x40, - 0xc0, 0xb3, 0xb9, 0xbd, 0xc7, 0x3f, 0x1e, 0xc4, 0x97, 0x8c, 0x96, 0xcb, 0xeb, 0x94, 0xa9, 0x2a, - 0xb7, 0x66, 0x3f, 0x80, 0x48, 0x54, 0xc5, 0xfc, 0xe7, 0x8a, 0x95, 0x9c, 0x29, 0x67, 0x08, 0x10, - 0x55, 0xf1, 0x83, 0xdd, 0x21, 0xf7, 0xa1, 0xab, 0xe5, 0x6a, 0x7e, 0x83, 0x67, 0xfb, 0x69, 0x47, - 0xcb, 0xd5, 0x39, 0xf9, 0x06, 0x22, 0x3b, 0x24, 0x37, 0xaf, 0xd4, 0x7f, 0x6f, 0x3d, 0xdb, 0xf6, - 0xa6, 0xb6, 0x53, 0xe8, 0x4b, 0x33, 0xad, 0xd5, 0x52, 0x96, 0xcc, 0x4e, 0xe5, 0x76, 0xea, 0x56, - 0xe4, 0x09, 0xf8, 0x3c, 0x53, 0xee, 0xcd, 0x25, 0xcd, 0x33, 0x63, 0xaa, 0x52, 0x93, 0x44, 0x1e, - 0xe0, 0xcd, 0x6e, 0xec, 0x8f, 0xcd, 0x4f, 0xed, 0xe2, 0xc9, 0x6f, 0x1e, 0x04, 0x1b, 0x93, 0x90, - 0x00, 0x3a, 0xaf, 0xa5, 0x60, 0x71, 0xcb, 0x44, 0x66, 0x54, 0xc5, 0x9e, 0x89, 0x66, 0x42, 0x3f, - 0x8f, 0xdb, 0x24, 0x84, 0xee, 0x4c, 0xe8, 0xa7, 0xcf, 0x62, 0xdf, 0x85, 0x27, 0xc7, 0x71, 0xc7, - 0x85, 0xcf, 0x3e, 0x8f, 0xbb, 0x26, 0x44, 0xab, 0xc7, 0x40, 0x00, 0x7a, 0xf6, 0xb1, 0xc7, 0x91, - 0x89, 0xad, 0xd8, 0xf1, 0x03, 0x12, 0xc3, 0x60, 0x52, 0x73, 0x76, 0x9c, 0x91, 0x0f, 0x21, 0x3a, - 0xdd, 0xbd, 0x88, 0x98, 0x4d, 0xbe, 0xf8, 0xe9, 0xe4, 0x8a, 0xeb, 0xeb, 0x6a, 0x61, 0xfe, 0x93, - 0x47, 0xb6, 0xa4, 0xcf, 0xb8, 0x74, 0xd1, 0x11, 0x17, 0x9a, 0x95, 0x82, 0xe6, 0x47, 0x58, 0xe5, - 0x91, 0xad, 0x72, 0xb5, 0x58, 0xf4, 0x70, 0x7d, 0xf2, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xca, - 0x27, 0x8a, 0x41, 0xb9, 0x08, 0x00, 0x00, + // 963 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xdd, 0x8e, 0xdb, 0x44, + 0x14, 0x8e, 0xe3, 0xfc, 0xd8, 0xc7, 0xa1, 0x58, 0xd3, 0x0a, 0x19, 0xa4, 0x76, 0xd3, 0x08, 0xa4, + 0xa8, 0x12, 0xbb, 0xea, 0x2e, 0x94, 0x52, 0x51, 0x01, 0x69, 0xb4, 0x4a, 0xb4, 0xa8, 0x5a, 0xbc, + 0xa8, 0x17, 0xdc, 0x44, 0x93, 0x78, 0xba, 0x3b, 0x5a, 0x7b, 0x26, 0x78, 0x26, 0x15, 0x79, 0x00, + 0xae, 0xb9, 0xe1, 0x8a, 0x77, 0xe3, 0x02, 0xf1, 0x1c, 0x48, 0x68, 0xce, 0x4c, 0x12, 0xd3, 0x78, + 0x57, 0x7b, 0x77, 0x66, 0x7c, 0xbe, 0x6f, 0xce, 0xf9, 0xce, 0x8f, 0xa1, 0xa7, 0x16, 0x57, 0xac, + 0xa0, 0x87, 0xcb, 0x52, 0x6a, 0x49, 0xee, 0x17, 0x3c, 0x7f, 0xb7, 0x52, 0xf6, 0x74, 0x68, 0x3f, + 0x7d, 0xd2, 0x5b, 0xc8, 0xa2, 0x90, 0xc2, 0x5e, 0x0e, 0xfe, 0x6e, 0x42, 0x74, 0xca, 0x59, 0x9e, + 0x5d, 0xe0, 0x57, 0x92, 0x40, 0xf7, 0xad, 0x39, 0x4e, 0xc7, 0x89, 0xd7, 0xf7, 0x86, 0x7e, 0xba, + 0x39, 0x12, 0x02, 0x2d, 0x41, 0x0b, 0x96, 0x34, 0xfb, 0xde, 0x30, 0x4c, 0xd1, 0x26, 0x9f, 0xc2, + 0x3d, 0xae, 0x66, 0xcb, 0x92, 0x17, 0xb4, 0x5c, 0xcf, 0xae, 0xd9, 0x3a, 0xf1, 0xfb, 0xde, 0x30, + 0x48, 0x7b, 0x5c, 0x9d, 0xdb, 0xcb, 0x33, 0xb6, 0x26, 0x7d, 0x88, 0x32, 0xa6, 0x16, 0x25, 0x5f, + 0x6a, 0x2e, 0x45, 0xd2, 0x42, 0x82, 0xea, 0x15, 0x79, 0x01, 0x61, 0x46, 0x35, 0x9d, 0xe9, 0xf5, + 0x92, 0x25, 0xed, 0xbe, 0x37, 0xbc, 0x77, 0xfc, 0xf0, 0xb0, 0x26, 0xf8, 0xc3, 0x31, 0xd5, 0xf4, + 0xa7, 0xf5, 0x92, 0xa5, 0x41, 0xe6, 0x2c, 0x32, 0x82, 0xc8, 0xc0, 0x66, 0x4b, 0x5a, 0xd2, 0x42, + 0x25, 0x9d, 0xbe, 0x3f, 0x8c, 0x8e, 0x1f, 0xff, 0x1f, 0xed, 0x52, 0x3e, 0x63, 0xeb, 0x37, 0x34, + 0x5f, 0xb1, 0x73, 0xca, 0xcb, 0x14, 0x0c, 0xea, 0x1c, 0x41, 0x64, 0x0c, 0x3d, 0x2e, 0x32, 0xf6, + 0xeb, 0x86, 0xa4, 0x7b, 0x57, 0x92, 0x08, 0x61, 0x8e, 0xe5, 0x23, 0xe8, 0xd0, 0x95, 0x96, 0xd3, + 0x71, 0x12, 0xa0, 0x0a, 0xee, 0x34, 0xf8, 0xd3, 0x83, 0xf8, 0x95, 0xcc, 0x73, 0xb6, 0x30, 0xc9, + 0x3a, 0xa1, 0x37, 0x72, 0x7a, 0x15, 0x39, 0xdf, 0x13, 0xaa, 0xb9, 0x2f, 0xd4, 0xee, 0x09, 0xbf, + 0xfa, 0x04, 0x79, 0x0e, 0x1d, 0xac, 0x93, 0x4a, 0x5a, 0x18, 0x7a, 0xbf, 0x56, 0xbd, 0x4a, 0xa1, + 0x53, 0xe7, 0x3f, 0x38, 0x80, 0x70, 0x24, 0x65, 0xfe, 0x7d, 0x59, 0xd2, 0xb5, 0x09, 0xca, 0xe8, + 0x9a, 0x78, 0x7d, 0x7f, 0x18, 0xa4, 0x68, 0x0f, 0x1e, 0x41, 0x30, 0x15, 0x7a, 0xff, 0x7b, 0xdb, + 0x7d, 0x3f, 0x80, 0xf0, 0x07, 0x29, 0x2e, 0xf7, 0x1d, 0x7c, 0xe7, 0xd0, 0x07, 0x38, 0xcd, 0x25, + 0xad, 0xa1, 0x68, 0x3a, 0x8f, 0xc7, 0x10, 0x8d, 0xe5, 0x6a, 0x9e, 0xb3, 0x7d, 0x17, 0x6f, 0x47, + 0x32, 0x5a, 0x6b, 0xa6, 0xf6, 0x3d, 0x7a, 0x3b, 0x92, 0x0b, 0x5d, 0xf2, 0xba, 0x48, 0x42, 0xe7, + 0xf2, 0x97, 0x0f, 0xd1, 0xc5, 0x82, 0xe6, 0xb4, 0x44, 0x25, 0xc8, 0x4b, 0x08, 0xe7, 0x52, 0xe6, + 0x33, 0xe7, 0xe8, 0x0d, 0xa3, 0xe3, 0x47, 0xb5, 0xc2, 0x6d, 0x15, 0x9a, 0x34, 0xd2, 0xc0, 0x40, + 0x4c, 0x1f, 0x92, 0x17, 0x10, 0x70, 0xa1, 0x2d, 0xba, 0x89, 0xe8, 0xfa, 0xa6, 0xdd, 0xc8, 0x37, + 0x69, 0xa4, 0x5d, 0x2e, 0x34, 0x62, 0x5f, 0x42, 0x98, 0x4b, 0x71, 0x69, 0xc1, 0xfe, 0x2d, 0x4f, + 0x6f, 0xb5, 0x35, 0x4f, 0x1b, 0x08, 0xc2, 0xbf, 0x03, 0x78, 0x6b, 0x34, 0xb5, 0xf8, 0x16, 0xe2, + 0x0f, 0xea, 0x6b, 0xbe, 0x95, 0x7e, 0xd2, 0x48, 0x43, 0x04, 0x21, 0xc3, 0x2b, 0x88, 0x32, 0xd4, + 0xdc, 0x52, 0xb4, 0x91, 0xa2, 0xbe, 0x6d, 0x2a, 0xb5, 0x99, 0x34, 0x52, 0xb0, 0xb0, 0x0d, 0x89, + 0x42, 0xcd, 0x2d, 0x49, 0xe7, 0x16, 0x92, 0x4a, 0x6d, 0x0c, 0x89, 0x85, 0x6d, 0x72, 0x99, 0x9b, + 0xd2, 0x5a, 0x8e, 0xee, 0x2d, 0xb9, 0xec, 0x3a, 0xc0, 0xe4, 0x82, 0x20, 0xc3, 0x30, 0xea, 0xd8, + 0x5a, 0x0f, 0xfe, 0xf0, 0x20, 0x7a, 0xc3, 0x16, 0x5a, 0xba, 0xfa, 0xc6, 0xe0, 0x67, 0xbc, 0x70, + 0x8b, 0xcc, 0x98, 0x66, 0xd0, 0xad, 0x6e, 0xef, 0xd0, 0xcd, 0x95, 0xed, 0x0e, 0xca, 0x45, 0x08, + 0xb3, 0xe4, 0xe4, 0x33, 0xf8, 0x60, 0xce, 0x85, 0x59, 0x79, 0x8e, 0xc6, 0x14, 0xb0, 0x37, 0x69, + 0xa4, 0x3d, 0x7b, 0x6d, 0xdd, 0xb6, 0x61, 0xfd, 0xeb, 0x41, 0x88, 0x01, 0x61, 0xba, 0x4f, 0xa1, + 0x85, 0x6b, 0xce, 0xbb, 0xcb, 0x9a, 0x43, 0x57, 0xf2, 0x10, 0x00, 0xa7, 0x75, 0x56, 0x59, 0xc0, + 0x21, 0xde, 0xbc, 0x36, 0x6b, 0xe3, 0x1b, 0xe8, 0x2a, 0xec, 0x6a, 0xe5, 0x3a, 0xe9, 0x86, 0x0a, + 0xec, 0x3a, 0xdf, 0x74, 0xa2, 0x83, 0x18, 0xb4, 0xcd, 0x42, 0xb9, 0x3e, 0xaa, 0x47, 0x57, 0x74, + 0x35, 0x68, 0x07, 0x21, 0x1f, 0x43, 0x60, 0x43, 0xe3, 0x19, 0xf6, 0xd0, 0xf6, 0x87, 0x91, 0x8d, + 0xba, 0xd0, 0x46, 0x73, 0xf0, 0x9b, 0x07, 0xfe, 0x74, 0xac, 0xc8, 0x57, 0xd0, 0x31, 0xf3, 0xc2, + 0xb3, 0x5b, 0x67, 0xad, 0xda, 0xf0, 0x6d, 0x2e, 0xf4, 0x34, 0x23, 0x5f, 0x43, 0x47, 0xe9, 0xd2, + 0x00, 0x9b, 0x77, 0xee, 0xb0, 0xb6, 0xd2, 0xe5, 0x34, 0x1b, 0x01, 0x04, 0x3c, 0x9b, 0xd9, 0x38, + 0xfe, 0xf1, 0x20, 0xbe, 0x60, 0xb4, 0x5c, 0x5c, 0xa5, 0x4c, 0xad, 0x72, 0x3b, 0x07, 0x07, 0x10, + 0x89, 0x55, 0x31, 0xfb, 0x65, 0xc5, 0x4a, 0xce, 0x94, 0xeb, 0x15, 0x10, 0xab, 0xe2, 0x47, 0x7b, + 0x43, 0xee, 0x43, 0x5b, 0xcb, 0xe5, 0xec, 0x1a, 0xdf, 0xf6, 0xd3, 0x96, 0x96, 0xcb, 0x33, 0xf2, + 0x2d, 0x44, 0x76, 0x7f, 0x6e, 0x06, 0xd8, 0xbf, 0x31, 0x9f, 0x6d, 0xe5, 0x53, 0x5b, 0x44, 0x6c, + 0x59, 0xb3, 0xc8, 0xd5, 0x42, 0x96, 0xcc, 0x2e, 0xec, 0x66, 0xea, 0x4e, 0xe4, 0x09, 0xf8, 0x3c, + 0x53, 0x6e, 0x1c, 0x93, 0xfa, 0x75, 0x32, 0x56, 0xa9, 0x71, 0x22, 0x0f, 0x30, 0xb2, 0x6b, 0xfb, + 0xcf, 0xf3, 0x53, 0x7b, 0x78, 0xf2, 0xbb, 0x07, 0xc1, 0xa6, 0x7f, 0x48, 0x00, 0xad, 0xd7, 0x52, + 0xb0, 0xb8, 0x61, 0x2c, 0xb3, 0xc5, 0x62, 0xcf, 0x58, 0x53, 0xa1, 0x9f, 0xc7, 0x4d, 0x12, 0x42, + 0x7b, 0x2a, 0xf4, 0xd3, 0x67, 0xb1, 0xef, 0xcc, 0x93, 0xe3, 0xb8, 0xe5, 0xcc, 0x67, 0x5f, 0xc4, + 0x6d, 0x63, 0xe2, 0x14, 0xc4, 0x40, 0x00, 0x3a, 0x76, 0x0f, 0xc4, 0x91, 0xb1, 0xad, 0xd8, 0xf1, + 0x03, 0x12, 0x43, 0x6f, 0x54, 0x69, 0xfa, 0x38, 0x23, 0x1f, 0x42, 0x74, 0xba, 0x1b, 0x96, 0x98, + 0x8d, 0xbe, 0xfc, 0xf9, 0xe4, 0x92, 0xeb, 0xab, 0xd5, 0xdc, 0xfc, 0x42, 0x8f, 0x6c, 0x4a, 0x9f, + 0x73, 0xe9, 0xac, 0x23, 0x2e, 0x34, 0x2b, 0x05, 0xcd, 0x8f, 0x30, 0xcb, 0x23, 0x9b, 0xe5, 0x72, + 0x3e, 0xef, 0xe0, 0xf9, 0xe4, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x88, 0x3c, 0xbb, 0x94, 0xd4, + 0x08, 0x00, 0x00, } diff --git a/internal/proto/segcore.proto b/internal/proto/segcore.proto index fe91fe9a90..61fb911f32 100644 --- a/internal/proto/segcore.proto +++ b/internal/proto/segcore.proto @@ -11,17 +11,18 @@ message RetrieveRequest { message RetrieveResults { schema.IDs ids = 1; - repeated schema.FieldData fields_data = 2; + repeated int64 offset = 2; + repeated schema.FieldData fields_data = 3; } message LoadFieldMeta { - int64 min_timestamp = 1; - int64 max_timestamp = 2; - int64 row_count = 3; + int64 min_timestamp = 1; + int64 max_timestamp = 2; + int64 row_count = 3; } message LoadSegmentMeta { - // TODOs - repeated LoadFieldMeta metas = 1; - int64 total_size = 2; + // TODOs + repeated LoadFieldMeta metas = 1; + int64 total_size = 2; } diff --git a/internal/proto/segcorepb/segcore.pb.go b/internal/proto/segcorepb/segcore.pb.go index b7035f6f37..5ccbb14f81 100644 --- a/internal/proto/segcorepb/segcore.pb.go +++ b/internal/proto/segcorepb/segcore.pb.go @@ -70,7 +70,8 @@ func (m *RetrieveRequest) GetOutputFieldsId() []int64 { type RetrieveResults struct { Ids *schemapb.IDs `protobuf:"bytes,1,opt,name=ids,proto3" json:"ids,omitempty"` - FieldsData []*schemapb.FieldData `protobuf:"bytes,2,rep,name=fields_data,json=fieldsData,proto3" json:"fields_data,omitempty"` + Offset []int64 `protobuf:"varint,2,rep,packed,name=offset,proto3" json:"offset,omitempty"` + FieldsData []*schemapb.FieldData `protobuf:"bytes,3,rep,name=fields_data,json=fieldsData,proto3" json:"fields_data,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -108,6 +109,13 @@ func (m *RetrieveResults) GetIds() *schemapb.IDs { return nil } +func (m *RetrieveResults) GetOffset() []int64 { + if m != nil { + return m.Offset + } + return nil +} + func (m *RetrieveResults) GetFieldsData() []*schemapb.FieldData { if m != nil { return m.FieldsData @@ -228,27 +236,28 @@ func init() { func init() { proto.RegisterFile("segcore.proto", fileDescriptor_1d79fce784797357) } var fileDescriptor_1d79fce784797357 = []byte{ - // 338 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xc1, 0x4b, 0xc3, 0x30, - 0x14, 0xc6, 0xd9, 0x8a, 0xe2, 0xb2, 0xcd, 0x49, 0xf1, 0x50, 0x14, 0x65, 0x74, 0x97, 0x21, 0xd8, - 0xc2, 0x14, 0xc1, 0x93, 0xa0, 0x43, 0x18, 0xe8, 0x25, 0xf3, 0xe4, 0xa5, 0x64, 0xed, 0xb3, 0x0b, - 0x36, 0x4d, 0x6d, 0x5e, 0xba, 0xb1, 0x83, 0x7f, 0xbb, 0x34, 0xa9, 0xb8, 0xc1, 0x2e, 0xde, 0xf2, - 0xbe, 0xf7, 0x7d, 0xef, 0xf7, 0x1e, 0x21, 0x7d, 0x05, 0x69, 0x2c, 0x4b, 0x08, 0x8a, 0x52, 0xa2, - 0x74, 0x4f, 0x05, 0xcf, 0x2a, 0xad, 0x6c, 0x15, 0x34, 0xbd, 0xb3, 0x9e, 0x8a, 0x97, 0x20, 0x98, - 0x55, 0xfd, 0x94, 0x0c, 0x28, 0x60, 0xc9, 0xa1, 0x02, 0x0a, 0x5f, 0x1a, 0x14, 0xba, 0x57, 0xc4, - 0xe1, 0x89, 0xf2, 0x5a, 0xc3, 0xd6, 0xb8, 0x3b, 0xf1, 0x82, 0xdd, 0x21, 0x36, 0x3b, 0x9b, 0x2a, - 0x5a, 0x9b, 0xdc, 0x31, 0x39, 0x91, 0x1a, 0x0b, 0x8d, 0xd1, 0x07, 0x87, 0x2c, 0x51, 0x11, 0x4f, - 0xbc, 0xf6, 0xd0, 0x19, 0x3b, 0xf4, 0xd8, 0xea, 0xcf, 0x46, 0x9e, 0x25, 0xfe, 0xf7, 0x36, 0x48, - 0xe9, 0x0c, 0xd5, 0xbf, 0x40, 0x0f, 0xa4, 0xdb, 0x10, 0x12, 0x86, 0xcc, 0x30, 0xba, 0x93, 0xcb, - 0xbd, 0x19, 0x83, 0x9c, 0x32, 0x64, 0x94, 0xd8, 0x48, 0xfd, 0xf6, 0x2b, 0xd2, 0x7f, 0x91, 0x2c, - 0x31, 0xcd, 0x57, 0x40, 0xe6, 0x8e, 0x48, 0x5f, 0xf0, 0x3c, 0x42, 0x2e, 0x40, 0x21, 0x13, 0x85, - 0xd9, 0xc3, 0xa1, 0x3d, 0xc1, 0xf3, 0xb7, 0x5f, 0xcd, 0x98, 0xd8, 0x7a, 0xcb, 0xd4, 0x6e, 0x4c, - 0x6c, 0xfd, 0x67, 0x3a, 0x27, 0x9d, 0x52, 0xae, 0xa2, 0x58, 0xea, 0x1c, 0x3d, 0xc7, 0x18, 0x8e, - 0x4a, 0xb9, 0x7a, 0xaa, 0x6b, 0xff, 0x93, 0x0c, 0x6a, 0xee, 0x1c, 0x52, 0x01, 0x39, 0x1a, 0xf2, - 0x3d, 0x39, 0x10, 0x80, 0xac, 0xbe, 0xbc, 0xbe, 0x62, 0x14, 0xec, 0xfb, 0xa7, 0x60, 0x67, 0x5b, - 0x6a, 0x13, 0xee, 0x05, 0x21, 0x28, 0x91, 0x65, 0x91, 0xe2, 0x1b, 0x68, 0x96, 0xe9, 0x18, 0x65, - 0xce, 0x37, 0xf0, 0x78, 0xf7, 0x7e, 0x9b, 0x72, 0x5c, 0xea, 0x45, 0x10, 0x4b, 0x11, 0xda, 0xb1, - 0xd7, 0x5c, 0x36, 0xaf, 0x90, 0xe7, 0x08, 0x65, 0xce, 0xb2, 0xd0, 0x90, 0xc2, 0x86, 0x54, 0x2c, - 0x16, 0x87, 0x46, 0xb8, 0xf9, 0x09, 0x00, 0x00, 0xff, 0xff, 0xa7, 0x83, 0x2e, 0xf6, 0x41, 0x02, - 0x00, 0x00, + // 357 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0x41, 0x4b, 0xeb, 0x40, + 0x14, 0x85, 0x69, 0xc3, 0x2b, 0xaf, 0xb7, 0xed, 0xeb, 0x23, 0x3c, 0x1e, 0x41, 0x51, 0x4a, 0xba, + 0x09, 0x82, 0x09, 0x54, 0x11, 0x5c, 0x09, 0x5a, 0x84, 0x82, 0x6e, 0x52, 0x57, 0x6e, 0xc2, 0x34, + 0xb9, 0x4d, 0x07, 0x33, 0x99, 0x98, 0xb9, 0x69, 0x4b, 0x7f, 0x88, 0xbf, 0x57, 0x32, 0x19, 0xb1, + 0x85, 0x6e, 0xdc, 0xcd, 0x3d, 0x73, 0xee, 0xf9, 0x4e, 0x32, 0x30, 0x50, 0x98, 0xc6, 0xb2, 0x44, + 0xbf, 0x28, 0x25, 0x49, 0xfb, 0x9f, 0xe0, 0xd9, 0xba, 0x52, 0xcd, 0xe4, 0x9b, 0xbb, 0x93, 0xbe, + 0x8a, 0x57, 0x28, 0x58, 0xa3, 0xba, 0x29, 0x0c, 0x43, 0xa4, 0x92, 0xe3, 0x1a, 0x43, 0x7c, 0xaf, + 0x50, 0x91, 0x7d, 0x01, 0x16, 0x4f, 0x94, 0xd3, 0x1a, 0xb5, 0xbc, 0xde, 0xc4, 0xf1, 0x0f, 0x43, + 0x9a, 0xdd, 0xd9, 0x54, 0x85, 0xb5, 0xc9, 0xf6, 0xe0, 0xaf, 0xac, 0xa8, 0xa8, 0x28, 0x5a, 0x72, + 0xcc, 0x12, 0x15, 0xf1, 0xc4, 0x69, 0x8f, 0x2c, 0xcf, 0x0a, 0xff, 0x34, 0xfa, 0xa3, 0x96, 0x67, + 0x89, 0xfb, 0xd1, 0xda, 0x27, 0xa9, 0x2a, 0x23, 0xf5, 0x23, 0xd2, 0x7f, 0xe8, 0xc8, 0xe5, 0x52, + 0x21, 0x99, 0x7c, 0x33, 0xd9, 0x77, 0xd0, 0x33, 0xe8, 0x84, 0x11, 0x73, 0xac, 0x91, 0xe5, 0xf5, + 0x26, 0xe7, 0x47, 0xb3, 0x74, 0x97, 0x29, 0x23, 0x16, 0x42, 0xb3, 0x52, 0x9f, 0xdd, 0x35, 0x0c, + 0x9e, 0x24, 0x4b, 0xf4, 0xe5, 0x33, 0x12, 0xb3, 0xc7, 0x30, 0x10, 0x3c, 0x8f, 0x88, 0x0b, 0x54, + 0xc4, 0x44, 0xa1, 0xfb, 0x59, 0x61, 0x5f, 0xf0, 0xfc, 0xe5, 0x4b, 0xd3, 0x26, 0xb6, 0xdd, 0x33, + 0xb5, 0x8d, 0x89, 0x6d, 0xbf, 0x4d, 0xa7, 0xd0, 0x2d, 0xe5, 0x26, 0x8a, 0x65, 0x95, 0x93, 0x63, + 0x69, 0xc3, 0xef, 0x52, 0x6e, 0x1e, 0xea, 0xd9, 0x7d, 0x83, 0x61, 0xcd, 0x9d, 0x63, 0x2a, 0x30, + 0x27, 0x4d, 0xbe, 0x85, 0x5f, 0x02, 0x89, 0xd5, 0x7f, 0xa4, 0xfe, 0x8a, 0xb1, 0x7f, 0xec, 0x01, + 0xfd, 0x83, 0xb6, 0x61, 0xb3, 0x61, 0x9f, 0x01, 0x90, 0x24, 0x96, 0x45, 0x8a, 0xef, 0xd0, 0x94, + 0xe9, 0x6a, 0x65, 0xce, 0x77, 0x78, 0x7f, 0xf3, 0x7a, 0x9d, 0x72, 0x5a, 0x55, 0x0b, 0x3f, 0x96, + 0x22, 0x68, 0x62, 0x2f, 0xb9, 0x34, 0xa7, 0x80, 0xe7, 0x84, 0x65, 0xce, 0xb2, 0x40, 0x93, 0x02, + 0x43, 0x2a, 0x16, 0x8b, 0x8e, 0x16, 0xae, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x12, 0x58, 0xd4, + 0xce, 0x5a, 0x02, 0x00, 0x00, } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index d4c71fa05d..ff7145f92e 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -652,6 +652,7 @@ func (it *InsertTask) checkFieldAutoID() error { if autoIDLoc >= 0 { fieldData := schemapb.FieldData{ FieldName: primaryFieldName, + FieldId: -1, Type: schemapb.DataType_Int64, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ @@ -1693,6 +1694,7 @@ func reduceSearchResultDataParallel(searchResultData []*schemapb.SearchResultDat if ret.Results.FieldsData[k] == nil || ret.Results.FieldsData[k].GetScalars() == nil { ret.Results.FieldsData[k] = &schemapb.FieldData{ FieldName: fieldData.FieldName, + FieldId: fieldData.FieldId, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{}, }, @@ -1768,6 +1770,7 @@ func reduceSearchResultDataParallel(searchResultData []*schemapb.SearchResultDat if ret.Results.FieldsData[k] == nil || ret.Results.FieldsData[k].GetVectors() == nil { ret.Results.FieldsData[k] = &schemapb.FieldData{ FieldName: fieldData.FieldName, + FieldId: fieldData.FieldId, Field: &schemapb.FieldData_Vectors{ Vectors: &schemapb.VectorField{ Dim: dim, @@ -1934,7 +1937,8 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { for k, fieldName := range st.query.OutputFields { for _, field := range schema.Fields { if st.result.Results.FieldsData[k] != nil && field.Name == fieldName { - st.result.Results.FieldsData[k].FieldName = fieldName + st.result.Results.FieldsData[k].FieldName = field.Name + st.result.Results.FieldsData[k].FieldId = field.FieldID st.result.Results.FieldsData[k].Type = field.DataType } } @@ -2110,10 +2114,6 @@ func (rt *RetrieveTask) PreExecute(ctx context.Context) error { addPrimaryKey := false for _, field := range schema.Fields { if reqField == field.Name { - if field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector { - errMsg := "Query does not support vector field currently" - return errors.New(errMsg) - } if field.IsPrimaryKey { addPrimaryKey = true } @@ -2383,6 +2383,7 @@ func (rt *RetrieveTask) PostExecute(ctx context.Context) error { for _, field := range schema.Fields { if field.FieldID == rt.OutputFieldsId[i] { rt.result.FieldsData[i].FieldName = field.Name + rt.result.FieldsData[i].FieldId = field.FieldID rt.result.FieldsData[i].Type = field.DataType } } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 03da3f13ac..e20a3f9085 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -218,11 +218,6 @@ func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID U vecFields = append(vecFields, field.FieldID) } } - - if len(vecFields) <= 0 { - return nil, errors.New("no vector field in collection %d" + strconv.FormatInt(collectionID, 10)) - } - return vecFields, nil } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index e4f29a5e3b..0832410376 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -391,6 +392,38 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { sp.Finish() } +func (q *queryCollection) getVectorOutputFieldIDs(msg queryMsg) ([]int64, error) { + var collID UniqueID + var outputFieldsID []int64 + var resultFieldIDs []int64 + + msgType := msg.Type() + switch msgType { + case commonpb.MsgType_Retrieve: + retrieveMsg := msg.(*msgstream.RetrieveMsg) + collID = retrieveMsg.CollectionID + outputFieldsID = retrieveMsg.OutputFieldsId + case commonpb.MsgType_Search: + searchMsg := msg.(*msgstream.SearchMsg) + collID = searchMsg.CollectionID + outputFieldsID = searchMsg.OutputFieldsId + default: + return resultFieldIDs, fmt.Errorf("receive invalid msgType = %d", msgType) + } + + vecFields, err := q.historical.replica.getVecFieldIDsByCollectionID(collID) + if err != nil { + return resultFieldIDs, err + } + + for _, fieldID := range vecFields { + if funcutil.SliceContain(outputFieldsID, fieldID) { + resultFieldIDs = append(resultFieldIDs, fieldID) + } + } + return resultFieldIDs, nil +} + func (q *queryCollection) doUnsolvedQueryMsg() { log.Debug("starting doUnsolvedMsg...", zap.Any("collectionID", q.collectionID)) for { @@ -1057,6 +1090,38 @@ func (q *queryCollection) search(msg queryMsg) error { return nil } +func (q *queryCollection) fillVectorOutputFieldsIfNeeded(msg queryMsg, segment *Segment, result *segcorepb.RetrieveResults) error { + // result is not empty + if len(result.Offset) <= 0 { + return nil + } + + // get all vector output field ids + vecOutputFieldIDs, err := q.getVectorOutputFieldIDs(msg) + if err != nil { + return err + } + + // output_fields contain vector field + for _, vecOutputFieldID := range vecOutputFieldIDs { + log.Debug("CYD - ", zap.Int64("vecOutputFieldID", vecOutputFieldID)) + vecFieldInfo, err := segment.getVectorFieldInfo(vecOutputFieldID) + if err != nil { + return fmt.Errorf("cannot get vector field info, fileID %d", vecOutputFieldID) + } + // vector field raw data is not loaded into memory + if !vecFieldInfo.getRawDataInMemory() { + if err = q.historical.loader.loadSegmentVectorFieldsData(vecFieldInfo); err != nil { + return err + } + if err = segment.fillRetrieveResults(result, vecOutputFieldID, vecFieldInfo); err != nil { + return err + } + } + } + return nil +} + func (q *queryCollection) retrieve(msg queryMsg) error { // TODO(yukun) // step 1: get retrieve object and defer destruction @@ -1134,6 +1199,10 @@ func (q *queryCollection) retrieve(msg queryMsg) error { if err != nil { return err } + + if err = q.fillVectorOutputFieldsIfNeeded(msg, segment, result); err != nil { + return err + } mergeList = append(mergeList, result) sealedSegmentRetrieved = append(sealedSegmentRetrieved, segmentID) } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 296ae4529e..e74bc836c3 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -34,7 +34,10 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/storage" ) type segmentType int32 @@ -46,6 +49,49 @@ const ( segmentTypeIndexing ) +type VectorFieldInfo struct { + mu sync.RWMutex + fieldBinlog *datapb.FieldBinlog + rawDataInMemory bool + rawData map[string]storage.FieldData // map[binlogPath]FieldData +} + +func newVectorFieldInfo(fieldBinlog *datapb.FieldBinlog) *VectorFieldInfo { + return &VectorFieldInfo{ + fieldBinlog: fieldBinlog, + rawDataInMemory: false, + rawData: make(map[string]storage.FieldData), + } +} + +func (v *VectorFieldInfo) setRawData(binlogPath string, data storage.FieldData) { + v.mu.Lock() + defer v.mu.Unlock() + v.rawData[binlogPath] = data +} + +func (v *VectorFieldInfo) getRawData(binlogPath string) storage.FieldData { + v.mu.Lock() + defer v.mu.Unlock() + if data, ok := v.rawData[binlogPath]; ok { + return data + } + return nil +} + +func (v *VectorFieldInfo) setRawDataInMemory(flag bool) { + v.mu.Lock() + defer v.mu.Unlock() + v.rawDataInMemory = flag +} + +func (v *VectorFieldInfo) getRawDataInMemory() bool { + v.mu.Lock() + defer v.mu.Unlock() + return v.rawDataInMemory +} + +//-------------------------------------------------------------------------------------- type Segment struct { segmentPtr C.CSegmentInterface @@ -70,6 +116,9 @@ type Segment struct { paramMutex sync.RWMutex // guards index indexInfos map[int64]*indexInfo + + vectorFieldMutex sync.RWMutex // guards vectorFieldInfos + vectorFieldInfos map[UniqueID]*VectorFieldInfo } //-------------------------------------------------------------------------------------- common interfaces @@ -121,12 +170,26 @@ func (s *Segment) setOnService(onService bool) { s.onService = onService } +func (s *Segment) setVectorFieldInfo(fieldID UniqueID, info *VectorFieldInfo) { + s.vectorFieldMutex.Lock() + defer s.vectorFieldMutex.Unlock() + s.vectorFieldInfos[fieldID] = info +} + +func (s *Segment) getVectorFieldInfo(fieldID UniqueID) (*VectorFieldInfo, error) { + s.vectorFieldMutex.Lock() + defer s.vectorFieldMutex.Unlock() + if info, ok := s.vectorFieldInfos[fieldID]; ok { + return info, nil + } + return nil, errors.New("Invalid fieldID " + strconv.Itoa(int(fieldID))) +} + func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) *Segment { /* CSegmentInterface NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); */ - indexInfos := make(map[int64]*indexInfo) var segmentPtr C.CSegmentInterface switch segType { case segmentTypeInvalid: @@ -143,18 +206,19 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c log.Debug("create segment", zap.Int64("segmentID", segmentID)) - var newSegment = &Segment{ - segmentPtr: segmentPtr, - segmentType: segType, - segmentID: segmentID, - partitionID: partitionID, - collectionID: collectionID, - vChannelID: vChannelID, - onService: onService, - indexInfos: indexInfos, + var segment = &Segment{ + segmentPtr: segmentPtr, + segmentType: segType, + segmentID: segmentID, + partitionID: partitionID, + collectionID: collectionID, + vChannelID: vChannelID, + onService: onService, + indexInfos: make(map[int64]*indexInfo), + vectorFieldInfos: make(map[UniqueID]*VectorFieldInfo), } - return newSegment + return segment } func deleteSegment(segment *Segment) { @@ -258,6 +322,50 @@ func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults return result, nil } +func (s *Segment) fillRetrieveResults(result *segcorepb.RetrieveResults, fieldID int64, fieldInfo *VectorFieldInfo) error { + for _, resultFieldData := range result.FieldsData { + if resultFieldData.FieldId != fieldID { + continue + } + + for i, offset := range result.Offset { + var success bool + for _, path := range fieldInfo.fieldBinlog.Binlogs { + rawData := fieldInfo.getRawData(path) + + var numRows, dim int64 + switch fieldData := rawData.(type) { + case *storage.FloatVectorFieldData: + numRows = int64(fieldData.NumRows) + dim = int64(fieldData.Dim) + if offset < numRows { + copy(resultFieldData.GetVectors().GetFloatVector().Data[int64(i)*dim:int64(i+1)*dim], fieldData.Data[offset*dim:(offset+1)*dim]) + success = true + } else { + offset -= numRows + } + case *storage.BinaryVectorFieldData: + numRows = int64(fieldData.NumRows) + dim = int64(fieldData.Dim) + if offset < numRows { + x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector) + copy(x.BinaryVector[int64(i)*dim/8:int64(i+1)*dim/8], fieldData.Data[offset*dim/8:(offset+1)*dim/8]) + success = true + } else { + offset -= numRows + } + default: + return errors.New("unexpected field data type") + } + if success { + break + } + } + } + } + return nil +} + func (s *Segment) fillTargetEntry(plan *SearchPlan, result *SearchResult) error { if s.segmentPtr == nil { return errors.New("null seg core pointer") diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 268025ff6e..6ea1ce8c2c 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -24,9 +24,10 @@ import ( minioKV "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" - queryPb "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/funcutil" ) const ( @@ -46,23 +47,23 @@ type segmentLoader struct { indexLoader *indexLoader } -func (loader *segmentLoader) loadSegmentOfConditionHandOff(req *queryPb.LoadSegmentsRequest) error { +func (loader *segmentLoader) loadSegmentOfConditionHandOff(req *querypb.LoadSegmentsRequest) error { return errors.New("TODO: implement hand off") } -func (loader *segmentLoader) loadSegmentOfConditionLoadBalance(req *queryPb.LoadSegmentsRequest) error { +func (loader *segmentLoader) loadSegmentOfConditionLoadBalance(req *querypb.LoadSegmentsRequest) error { return loader.loadSegment(req, false) } -func (loader *segmentLoader) loadSegmentOfConditionGRPC(req *queryPb.LoadSegmentsRequest) error { +func (loader *segmentLoader) loadSegmentOfConditionGRPC(req *querypb.LoadSegmentsRequest) error { return loader.loadSegment(req, true) } -func (loader *segmentLoader) loadSegmentOfConditionNodeDown(req *queryPb.LoadSegmentsRequest) error { +func (loader *segmentLoader) loadSegmentOfConditionNodeDown(req *querypb.LoadSegmentsRequest) error { return loader.loadSegment(req, true) } -func (loader *segmentLoader) loadSegment(req *queryPb.LoadSegmentsRequest, onService bool) error { +func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onService bool) error { // no segment needs to load, return if len(req.Infos) == 0 { return nil @@ -96,7 +97,7 @@ func (loader *segmentLoader) loadSegment(req *queryPb.LoadSegmentsRequest, onSer continue } segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, onService) - err = loader.loadSegmentInternal(collectionID, segment, info.BinlogPaths) + err = loader.loadSegmentInternal(collectionID, segment, info) if err != nil { deleteSegment(segment) log.Error(err.Error()) @@ -116,14 +117,14 @@ func (loader *segmentLoader) loadSegment(req *queryPb.LoadSegmentsRequest, onSer log.Error("error when load segment info from etcd", zap.Any("error", err.Error())) continue } - segmentInfo := &queryPb.SegmentInfo{} + segmentInfo := &querypb.SegmentInfo{} err = proto.UnmarshalText(value, segmentInfo) if err != nil { deleteSegment(segment) log.Error("error when unmarshal segment info from etcd", zap.Any("error", err.Error())) continue } - segmentInfo.SegmentState = queryPb.SegmentState_sealed + segmentInfo.SegmentState = querypb.SegmentState_sealed newKey := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID) err = loader.etcdKV.Save(newKey, proto.MarshalTextString(segmentInfo)) if err != nil { @@ -137,33 +138,42 @@ func (loader *segmentLoader) loadSegment(req *queryPb.LoadSegmentsRequest, onSer return loader.indexLoader.sendQueryNodeStats() } -func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, - segment *Segment, - binlogPaths []*datapb.FieldBinlog) error { - +func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment *Segment, segmentLoadInfo *querypb.SegmentLoadInfo) error { vectorFieldIDs, err := loader.historicalReplica.getVecFieldIDsByCollectionID(collectionID) if err != nil { return err } + if len(vectorFieldIDs) <= 0 { + return fmt.Errorf("no vector field in collection %d", collectionID) + } - loadIndexFieldIDs := make([]int64, 0) + // add VectorFieldInfo for vector fields + for _, fieldBinlog := range segmentLoadInfo.BinlogPaths { + if funcutil.SliceContain(vectorFieldIDs, fieldBinlog.FieldID) { + vectorFieldInfo := newVectorFieldInfo(fieldBinlog) + segment.setVectorFieldInfo(fieldBinlog.FieldID, vectorFieldInfo) + } + } + + indexedFieldIDs := make([]int64, 0) for _, vecFieldID := range vectorFieldIDs { err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID) if err != nil { log.Warn(err.Error()) continue } - loadIndexFieldIDs = append(loadIndexFieldIDs, vecFieldID) + indexedFieldIDs = append(indexedFieldIDs, vecFieldID) } - // we don't need load to vector fields - binlogPaths = loader.filterOutVectorFields(binlogPaths, loadIndexFieldIDs) + + // we don't need to load raw data for indexed vector field + fieldBinlogs := loader.filterFieldBinlogs(segmentLoadInfo.BinlogPaths, indexedFieldIDs) log.Debug("loading insert...") - err = loader.loadSegmentFieldsData(segment, binlogPaths) + err = loader.loadSegmentFieldsData(segment, fieldBinlogs) if err != nil { return err } - for _, id := range loadIndexFieldIDs { + for _, id := range indexedFieldIDs { log.Debug("loading index...") err = loader.indexLoader.loadIndex(segment, id) if err != nil { @@ -194,27 +204,17 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, // return statesResponse, nil //} -func (loader *segmentLoader) filterOutVectorFields(binlogPaths []*datapb.FieldBinlog, - vectorFields []int64) []*datapb.FieldBinlog { - - containsFunc := func(s []int64, e int64) bool { - for _, a := range s { - if a == e { - return true - } - } - return false - } - targetFields := make([]*datapb.FieldBinlog, 0) - for _, path := range binlogPaths { - if !containsFunc(vectorFields, path.FieldID) { - targetFields = append(targetFields, path) +func (loader *segmentLoader) filterFieldBinlogs(fieldBinlogs []*datapb.FieldBinlog, skipFieldIDs []int64) []*datapb.FieldBinlog { + result := make([]*datapb.FieldBinlog, 0) + for _, fieldBinlog := range fieldBinlogs { + if !funcutil.SliceContain(skipFieldIDs, fieldBinlog.FieldID) { + result = append(result, fieldBinlog) } } - return targetFields + return result } -func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths []*datapb.FieldBinlog) error { +func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlogs []*datapb.FieldBinlog) error { iCodec := storage.InsertCodec{} defer func() { err := iCodec.Close() @@ -223,16 +223,13 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths } }() blobs := make([]*storage.Blob, 0) - for _, binlogPath := range binlogPaths { - fieldID := binlogPath.FieldID - - paths := binlogPath.Binlogs + for _, fb := range fieldBinlogs { log.Debug("load segment fields data", zap.Int64("segmentID", segment.segmentID), - zap.Any("fieldID", fieldID), - zap.String("paths", fmt.Sprintln(paths)), + zap.Any("fieldID", fb.FieldID), + zap.String("paths", fmt.Sprintln(fb.Binlogs)), ) - for _, path := range paths { + for _, path := range fb.Binlogs { p := path binLog, err := loader.minioKV.Load(path) if err != nil { @@ -245,14 +242,18 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths } blobs = append(blobs, blob) } + // mark the flag that vector raw data will be loaded into memory + if vecFieldInfo, err := segment.getVectorFieldInfo(fb.FieldID); err == nil { + vecFieldInfo.setRawDataInMemory(true) + } } _, _, insertData, err := iCodec.Deserialize(blobs) - if err != nil { log.Error(err.Error()) return err } + for fieldID, value := range insertData.Data { var numRows int var data interface{} @@ -300,6 +301,42 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths return nil } +func (loader *segmentLoader) loadSegmentVectorFieldsData(info *VectorFieldInfo) error { + iCodec := storage.InsertCodec{} + defer func() { + err := iCodec.Close() + if err != nil { + log.Error(err.Error()) + } + }() + for _, path := range info.fieldBinlog.Binlogs { + if data := info.getRawData(path); data != nil { + continue + } + + binLog, err := loader.minioKV.Load(path) + if err != nil { + return err + } + + blob := &storage.Blob{ + Key: path, + Value: []byte(binLog), + } + + insertFieldData, err := iCodec.DeserializeOneVectorBinlog(blob) + if err != nil { + log.Error(err.Error()) + return err + } + + // save raw data into segment.vectorFieldInfo + info.setRawData(path, insertFieldData.Data) + } + + return nil +} + func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader { option := &minioKV.Option{ Address: Params.MinioEndPoint, diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index bad980cc2a..86acd5f822 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -36,6 +36,8 @@ type ( Timestamp = typeutil.Timestamp ) +const InvalidUniqueID = UniqueID(-1) + type Blob struct { Key string Value []byte @@ -132,6 +134,12 @@ type InsertData struct { Infos []BlobInfo } +type InsertFieldData struct { + ID FieldID + Data FieldData + Infos []BlobInfo +} + // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} type InsertCodec struct { @@ -242,7 +250,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) { if len(blobs) == 0 { - return -1, -1, nil, fmt.Errorf("blobs is empty") + return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty") } readerClose := func(reader *BinlogReader) func() error { return func() error { return reader.Close() } @@ -258,7 +266,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID for _, blob := range blobList { binlogReader, err := NewBinlogReader(blob.Value) if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } // read partitionID and SegmentID @@ -270,7 +278,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID for { eventReader, err := binlogReader.NextEventReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } if eventReader == nil { break @@ -283,12 +291,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID boolFieldData := resultData.Data[fieldID].(*BoolFieldData) singleData, err := eventReader.GetBoolFromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } boolFieldData.Data = append(boolFieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length boolFieldData.NumRows += length @@ -300,12 +308,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID int8FieldData := resultData.Data[fieldID].(*Int8FieldData) singleData, err := eventReader.GetInt8FromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } int8FieldData.Data = append(int8FieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length int8FieldData.NumRows += length @@ -317,12 +325,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID int16FieldData := resultData.Data[fieldID].(*Int16FieldData) singleData, err := eventReader.GetInt16FromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } int16FieldData.Data = append(int16FieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length int16FieldData.NumRows += length @@ -334,12 +342,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID int32FieldData := resultData.Data[fieldID].(*Int32FieldData) singleData, err := eventReader.GetInt32FromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } int32FieldData.Data = append(int32FieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length int32FieldData.NumRows += length @@ -351,12 +359,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID int64FieldData := resultData.Data[fieldID].(*Int64FieldData) singleData, err := eventReader.GetInt64FromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } int64FieldData.Data = append(int64FieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length int64FieldData.NumRows += length @@ -368,12 +376,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID floatFieldData := resultData.Data[fieldID].(*FloatFieldData) singleData, err := eventReader.GetFloatFromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } floatFieldData.Data = append(floatFieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length floatFieldData.NumRows += length @@ -385,12 +393,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID doubleFieldData := resultData.Data[fieldID].(*DoubleFieldData) singleData, err := eventReader.GetDoubleFromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } doubleFieldData.Data = append(doubleFieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length doubleFieldData.NumRows += length @@ -402,14 +410,14 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID stringFieldData := resultData.Data[fieldID].(*StringFieldData) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length stringFieldData.NumRows += length for i := 0; i < length; i++ { singleString, err := eventReader.GetOneStringFromPayload(i) if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } stringFieldData.Data = append(stringFieldData.Data, singleString) } @@ -422,12 +430,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID var singleData []byte singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length binaryVectorFieldData.NumRows += length @@ -440,18 +448,18 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID var singleData []float32 singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) length, err := eventReader.GetPayloadLengthFromReader() if err != nil { - return -1, -1, nil, err + return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length floatVectorFieldData.NumRows += length resultData.Data[fieldID] = floatVectorFieldData default: - return -1, -1, nil, fmt.Errorf("undefined data type %d", dataType) + return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType) } } if fieldID == rootcoord.TimeStampField { @@ -466,6 +474,75 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return pID, sID, resultData, nil } +func (insertCodec *InsertCodec) DeserializeOneVectorBinlog(blob *Blob) (data *InsertFieldData, err error) { + resultData := &InsertFieldData{ + ID: InvalidUniqueID, + } + binlogReader, err := NewBinlogReader(blob.Value) + if err != nil { + return nil, err + } + + dataType := binlogReader.PayloadDataType + fieldID := binlogReader.FieldID + totalLength := 0 + for { + eventReader, err := binlogReader.NextEventReader() + if err != nil { + return nil, err + } + if eventReader == nil { + break + } + switch dataType { + case schemapb.DataType_BinaryVector: + if resultData.ID == InvalidUniqueID { + resultData.ID = fieldID + resultData.Data = &BinaryVectorFieldData{} + } + binaryVectorFieldData := resultData.Data.(*BinaryVectorFieldData) + var singleData []byte + singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload() + if err != nil { + return nil, err + } + binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return nil, err + } + totalLength += length + binaryVectorFieldData.NumRows += length + resultData.Data = binaryVectorFieldData + case schemapb.DataType_FloatVector: + if resultData.ID == InvalidUniqueID { + resultData.ID = fieldID + resultData.Data = &FloatVectorFieldData{} + } + floatVectorFieldData := resultData.Data.(*FloatVectorFieldData) + var singleData []float32 + singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload() + if err != nil { + return nil, err + } + floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return nil, err + } + totalLength += length + floatVectorFieldData.NumRows += length + resultData.Data = floatVectorFieldData + default: + return nil, fmt.Errorf("undefined data type %d", dataType) + } + } + if err = binlogReader.Close(); err != nil { + return nil, err + } + return resultData, nil +} + func (insertCodec *InsertCodec) Close() error { for _, closeFunc := range insertCodec.readerCloseFunc { err := closeFunc() @@ -697,7 +774,7 @@ func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]st break } if file == nil { - return nil, nil, "", -1, fmt.Errorf("can not find params blob") + return nil, nil, "", InvalidUniqueID, fmt.Errorf("can not find params blob") } info := struct { Params map[string]string @@ -705,7 +782,7 @@ func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]st IndexID UniqueID }{} if err := json.Unmarshal(file.Value, &info); err != nil { - return nil, nil, "", -1, fmt.Errorf("json unmarshal error: %s", err.Error()) + return nil, nil, "", InvalidUniqueID, fmt.Errorf("json unmarshal error: %s", err.Error()) } return blobs, info.Params, info.IndexName, info.IndexID, nil diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index cc4e802d86..fc2003dcec 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -22,11 +22,29 @@ import ( "go.uber.org/zap" ) +const ( + CollectionID = 1 + PartitionID = 1 + SegmentID = 1 + RowIDField = 0 + TimestampField = 1 + BoolField = 100 + Int8Field = 101 + Int16Field = 102 + Int32Field = 103 + Int64Field = 104 + FloatField = 105 + DoubleField = 106 + StringField = 107 + BinaryVectorField = 108 + FloatVectorField = 109 +) + func TestInsertCodec(t *testing.T) { schema := &etcdpb.CollectionMeta{ - ID: 1, + ID: CollectionID, CreateTime: 1, - SegmentIDs: []int64{0, 1}, + SegmentIDs: []int64{SegmentID}, PartitionTags: []string{"partition_0", "partition_1"}, Schema: &schemapb.CollectionSchema{ Name: "schema", @@ -34,244 +52,243 @@ func TestInsertCodec(t *testing.T) { AutoID: true, Fields: []*schemapb.FieldSchema{ { - FieldID: 0, + FieldID: RowIDField, Name: "row_id", IsPrimaryKey: false, Description: "row_id", DataType: schemapb.DataType_Int64, }, { - FieldID: 1, - Name: "Ts", + FieldID: TimestampField, + Name: "Timestamp", IsPrimaryKey: false, - Description: "Ts", + Description: "Timestamp", DataType: schemapb.DataType_Int64, }, { - FieldID: 100, + FieldID: BoolField, Name: "field_bool", IsPrimaryKey: false, - Description: "description_2", + Description: "bool", DataType: schemapb.DataType_Bool, }, { - FieldID: 101, + FieldID: Int8Field, Name: "field_int8", IsPrimaryKey: false, - Description: "description_3", + Description: "int8", DataType: schemapb.DataType_Int8, }, { - FieldID: 102, + FieldID: Int16Field, Name: "field_int16", IsPrimaryKey: false, - Description: "description_4", + Description: "int16", DataType: schemapb.DataType_Int16, }, { - FieldID: 103, + FieldID: Int32Field, Name: "field_int32", IsPrimaryKey: false, - Description: "description_5", + Description: "int32", DataType: schemapb.DataType_Int32, }, { - FieldID: 104, + FieldID: Int64Field, Name: "field_int64", IsPrimaryKey: false, - Description: "description_6", + Description: "int64", DataType: schemapb.DataType_Int64, }, { - FieldID: 105, + FieldID: FloatField, Name: "field_float", IsPrimaryKey: false, - Description: "description_7", + Description: "float", DataType: schemapb.DataType_Float, }, { - FieldID: 106, + FieldID: DoubleField, Name: "field_double", IsPrimaryKey: false, - Description: "description_8", + Description: "double", DataType: schemapb.DataType_Double, }, { - FieldID: 107, + FieldID: StringField, Name: "field_string", IsPrimaryKey: false, - Description: "description_9", + Description: "string", DataType: schemapb.DataType_String, }, { - FieldID: 108, + FieldID: BinaryVectorField, Name: "field_binary_vector", IsPrimaryKey: false, - Description: "description_10", + Description: "binary_vector", DataType: schemapb.DataType_BinaryVector, }, { - FieldID: 109, + FieldID: FloatVectorField, Name: "field_float_vector", IsPrimaryKey: false, - Description: "description_11", + Description: "float_vector", DataType: schemapb.DataType_FloatVector, }, }, }, } insertCodec := NewInsertCodec(schema) - insertDataFirst := &InsertData{ + insertData1 := &InsertData{ Data: map[int64]FieldData{ - 0: &Int64FieldData{ + RowIDField: &Int64FieldData{ NumRows: 2, Data: []int64{3, 4}, }, - 1: &Int64FieldData{ + TimestampField: &Int64FieldData{ NumRows: 2, Data: []int64{3, 4}, }, - 100: &BoolFieldData{ + BoolField: &BoolFieldData{ NumRows: 2, Data: []bool{true, false}, }, - 101: &Int8FieldData{ + Int8Field: &Int8FieldData{ NumRows: 2, Data: []int8{3, 4}, }, - 102: &Int16FieldData{ + Int16Field: &Int16FieldData{ NumRows: 2, Data: []int16{3, 4}, }, - 103: &Int32FieldData{ + Int32Field: &Int32FieldData{ NumRows: 2, Data: []int32{3, 4}, }, - 104: &Int64FieldData{ + Int64Field: &Int64FieldData{ NumRows: 2, Data: []int64{3, 4}, }, - 105: &FloatFieldData{ + FloatField: &FloatFieldData{ NumRows: 2, Data: []float32{3, 4}, }, - 106: &DoubleFieldData{ + DoubleField: &DoubleFieldData{ NumRows: 2, Data: []float64{3, 4}, }, - 107: &StringFieldData{ + StringField: &StringFieldData{ NumRows: 2, Data: []string{"3", "4"}, }, - 108: &BinaryVectorFieldData{ + BinaryVectorField: &BinaryVectorFieldData{ NumRows: 2, Data: []byte{0, 255}, Dim: 8, }, - 109: &FloatVectorFieldData{ + FloatVectorField: &FloatVectorFieldData{ NumRows: 2, - Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, - Dim: 8, + Data: []float32{4, 5, 6, 7, 4, 5, 6, 7}, + Dim: 4, }, }, } - insertDataSecond := &InsertData{ + insertData2 := &InsertData{ Data: map[int64]FieldData{ - 0: &Int64FieldData{ + RowIDField: &Int64FieldData{ NumRows: 2, Data: []int64{1, 2}, }, - 1: &Int64FieldData{ + TimestampField: &Int64FieldData{ NumRows: 2, Data: []int64{1, 2}, }, - 100: &BoolFieldData{ + BoolField: &BoolFieldData{ NumRows: 2, Data: []bool{true, false}, }, - 101: &Int8FieldData{ + Int8Field: &Int8FieldData{ NumRows: 2, Data: []int8{1, 2}, }, - 102: &Int16FieldData{ + Int16Field: &Int16FieldData{ NumRows: 2, Data: []int16{1, 2}, }, - 103: &Int32FieldData{ + Int32Field: &Int32FieldData{ NumRows: 2, Data: []int32{1, 2}, }, - 104: &Int64FieldData{ + Int64Field: &Int64FieldData{ NumRows: 2, Data: []int64{1, 2}, }, - 105: &FloatFieldData{ + FloatField: &FloatFieldData{ NumRows: 2, Data: []float32{1, 2}, }, - 106: &DoubleFieldData{ + DoubleField: &DoubleFieldData{ NumRows: 2, Data: []float64{1, 2}, }, - 107: &StringFieldData{ + StringField: &StringFieldData{ NumRows: 2, Data: []string{"1", "2"}, }, - 108: &BinaryVectorFieldData{ + BinaryVectorField: &BinaryVectorFieldData{ NumRows: 2, Data: []byte{0, 255}, Dim: 8, }, - 109: &FloatVectorFieldData{ + FloatVectorField: &FloatVectorFieldData{ NumRows: 2, - Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, - Dim: 8, + Data: []float32{0, 1, 2, 3, 0, 1, 2, 3}, + Dim: 4, }, }, } - firstBlobs, _, err := insertCodec.Serialize(1, 1, insertDataFirst) + Blobs1, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) assert.Nil(t, err) - for _, blob := range firstBlobs { + for _, blob := range Blobs1 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100) assert.Equal(t, blob.GetKey(), blob.Key) } - secondBlobs, _, err := insertCodec.Serialize(1, 1, insertDataSecond) + Blobs2, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2) assert.Nil(t, err) - for _, blob := range secondBlobs { + for _, blob := range Blobs2 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) assert.Equal(t, blob.GetKey(), blob.Key) } - resultBlobs := append(firstBlobs, secondBlobs...) - partitionID, segmentID, resultData, err := insertCodec.Deserialize(resultBlobs) + resultBlobs := append(Blobs1, Blobs2...) + partID, segID, resultData, err := insertCodec.Deserialize(resultBlobs) assert.Nil(t, err) - assert.Equal(t, int64(1), partitionID) - assert.Equal(t, int64(1), segmentID) - assert.Equal(t, 4, resultData.Data[0].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[1].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[100].(*BoolFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[101].(*Int8FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[102].(*Int16FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[103].(*Int32FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[104].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[105].(*FloatFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[106].(*DoubleFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[107].(*StringFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[108].(*BinaryVectorFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[109].(*FloatVectorFieldData).NumRows) - assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[0].(*Int64FieldData).Data) - assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[1].(*Int64FieldData).Data) - assert.Equal(t, []bool{true, false, true, false}, resultData.Data[100].(*BoolFieldData).Data) - assert.Equal(t, []int8{1, 2, 3, 4}, resultData.Data[101].(*Int8FieldData).Data) - assert.Equal(t, []int16{1, 2, 3, 4}, resultData.Data[102].(*Int16FieldData).Data) - assert.Equal(t, []int32{1, 2, 3, 4}, resultData.Data[103].(*Int32FieldData).Data) - assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[104].(*Int64FieldData).Data) - assert.Equal(t, []float32{1, 2, 3, 4}, resultData.Data[105].(*FloatFieldData).Data) - assert.Equal(t, []float64{1, 2, 3, 4}, resultData.Data[106].(*DoubleFieldData).Data) - assert.Equal(t, []string{"1", "2", "3", "4"}, resultData.Data[107].(*StringFieldData).Data) - assert.Equal(t, []byte{0, 255, 0, 255}, resultData.Data[108].(*BinaryVectorFieldData).Data) - assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, - resultData.Data[109].(*FloatVectorFieldData).Data) + assert.Equal(t, UniqueID(PartitionID), partID) + assert.Equal(t, UniqueID(SegmentID), segID) + assert.Equal(t, 4, resultData.Data[RowIDField].(*Int64FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[TimestampField].(*Int64FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[BoolField].(*BoolFieldData).NumRows) + assert.Equal(t, 4, resultData.Data[Int8Field].(*Int8FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[Int16Field].(*Int16FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[Int32Field].(*Int32FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[Int64Field].(*Int64FieldData).NumRows) + assert.Equal(t, 4, resultData.Data[FloatField].(*FloatFieldData).NumRows) + assert.Equal(t, 4, resultData.Data[DoubleField].(*DoubleFieldData).NumRows) + assert.Equal(t, 4, resultData.Data[StringField].(*StringFieldData).NumRows) + assert.Equal(t, 4, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows) + assert.Equal(t, 4, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows) + assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[RowIDField].(*Int64FieldData).Data) + assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[TimestampField].(*Int64FieldData).Data) + assert.Equal(t, []bool{true, false, true, false}, resultData.Data[BoolField].(*BoolFieldData).Data) + assert.Equal(t, []int8{1, 2, 3, 4}, resultData.Data[Int8Field].(*Int8FieldData).Data) + assert.Equal(t, []int16{1, 2, 3, 4}, resultData.Data[Int16Field].(*Int16FieldData).Data) + assert.Equal(t, []int32{1, 2, 3, 4}, resultData.Data[Int32Field].(*Int32FieldData).Data) + assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[Int64Field].(*Int64FieldData).Data) + assert.Equal(t, []float32{1, 2, 3, 4}, resultData.Data[FloatField].(*FloatFieldData).Data) + assert.Equal(t, []float64{1, 2, 3, 4}, resultData.Data[DoubleField].(*DoubleFieldData).Data) + assert.Equal(t, []string{"1", "2", "3", "4"}, resultData.Data[StringField].(*StringFieldData).Data) + assert.Equal(t, []byte{0, 255, 0, 255}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).Data) + assert.Equal(t, []float32{0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 4, 5, 6, 7}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).Data) assert.Nil(t, insertCodec.Close()) log.Debug("Data", zap.Any("Data", resultData.Data)) log.Debug("Infos", zap.Any("Infos", resultData.Infos)) @@ -282,12 +299,7 @@ func TestInsertCodec(t *testing.T) { } func TestDDCodec(t *testing.T) { dataDefinitionCodec := NewDataDefinitionCodec(int64(1)) - ts := []Timestamp{ - 1, - 2, - 3, - 4, - } + ts := []Timestamp{1, 2, 3, 4} ddRequests := []string{ "CreateCollection", "DropCollection", @@ -364,9 +376,9 @@ func TestTsError(t *testing.T) { func TestSchemaError(t *testing.T) { schema := &etcdpb.CollectionMeta{ - ID: 1, + ID: CollectionID, CreateTime: 1, - SegmentIDs: []int64{0, 1}, + SegmentIDs: []int64{SegmentID}, PartitionTags: []string{"partition_0", "partition_1"}, Schema: &schemapb.CollectionSchema{ Name: "schema", @@ -374,24 +386,24 @@ func TestSchemaError(t *testing.T) { AutoID: true, Fields: []*schemapb.FieldSchema{ { - FieldID: 0, + FieldID: RowIDField, Name: "row_id", IsPrimaryKey: false, Description: "row_id", DataType: schemapb.DataType_Int64, }, { - FieldID: 1, - Name: "Ts", + FieldID: TimestampField, + Name: "Timestamp", IsPrimaryKey: false, - Description: "Ts", + Description: "Timestamp", DataType: schemapb.DataType_Int64, }, { - FieldID: 100, + FieldID: BoolField, Name: "field_bool", IsPrimaryKey: false, - Description: "description_2", + Description: "bool", DataType: 999, }, }, @@ -399,22 +411,22 @@ func TestSchemaError(t *testing.T) { } insertData := &InsertData{ Data: map[int64]FieldData{ - 0: &Int64FieldData{ + RowIDField: &Int64FieldData{ NumRows: 2, Data: []int64{3, 4}, }, - 1: &Int64FieldData{ + TimestampField: &Int64FieldData{ NumRows: 2, Data: []int64{3, 4}, }, - 100: &BoolFieldData{ + BoolField: &BoolFieldData{ NumRows: 2, Data: []bool{true, false}, }, }, } insertCodec := NewInsertCodec(schema) - blobs, _, err := insertCodec.Serialize(1, 1, insertData) + blobs, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData) assert.Nil(t, blobs) assert.NotNil(t, err) } diff --git a/tests20/python_client/testcases/test_query.py b/tests20/python_client/testcases/test_query.py index 400c782ba6..b351462706 100644 --- a/tests20/python_client/testcases/test_query.py +++ b/tests20/python_client/testcases/test_query.py @@ -305,6 +305,7 @@ class TestQueryBase(TestcaseBase): assert set(res_1[0].keys()) == set(fields) @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="issue #6299") def test_query_output_vec_field(self): """ target: test query with vec output field