mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: add valid_data in span (#35030)
#31728 Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
f466129924
commit
475c333fa2
@ -33,6 +33,15 @@ class SpanBase {
|
|||||||
int64_t element_sizeof)
|
int64_t element_sizeof)
|
||||||
: data_(data), row_count_(row_count), element_sizeof_(element_sizeof) {
|
: data_(data), row_count_(row_count), element_sizeof_(element_sizeof) {
|
||||||
}
|
}
|
||||||
|
explicit SpanBase(const void* data,
|
||||||
|
const bool* valid_data,
|
||||||
|
int64_t row_count,
|
||||||
|
int64_t element_sizeof)
|
||||||
|
: data_(data),
|
||||||
|
valid_data_(valid_data),
|
||||||
|
row_count_(row_count),
|
||||||
|
element_sizeof_(element_sizeof) {
|
||||||
|
}
|
||||||
|
|
||||||
int64_t
|
int64_t
|
||||||
row_count() const {
|
row_count() const {
|
||||||
@ -49,8 +58,14 @@ class SpanBase {
|
|||||||
return data_;
|
return data_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bool*
|
||||||
|
valid_data() const {
|
||||||
|
return valid_data_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const void* data_;
|
const void* data_;
|
||||||
|
const bool* valid_data_{nullptr};
|
||||||
int64_t row_count_;
|
int64_t row_count_;
|
||||||
int64_t element_sizeof_;
|
int64_t element_sizeof_;
|
||||||
};
|
};
|
||||||
@ -65,20 +80,22 @@ class Span<T,
|
|||||||
std::is_same_v<T, PkType>>> {
|
std::is_same_v<T, PkType>>> {
|
||||||
public:
|
public:
|
||||||
using embedded_type = T;
|
using embedded_type = T;
|
||||||
explicit Span(const T* data, int64_t row_count)
|
explicit Span(const T* data, const bool* valid_data, int64_t row_count)
|
||||||
: data_(data), row_count_(row_count) {
|
: data_(data), valid_data_(valid_data), row_count_(row_count) {
|
||||||
}
|
}
|
||||||
|
|
||||||
explicit Span(std::string_view data) {
|
explicit Span(std::string_view data, bool* valid_data) {
|
||||||
Span(data.data(), data.size());
|
Span(data.data(), valid_data, data.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implicit conversion to the type-erased base; the element size recorded in
// the base is sizeof(T).
operator SpanBase() const {
    SpanBase erased(data_, valid_data_, row_count_, sizeof(T));
    return erased;
}
|
||||||
|
|
||||||
explicit Span(const SpanBase& base)
|
explicit Span(const SpanBase& base)
|
||||||
: Span(reinterpret_cast<const T*>(base.data()), base.row_count()) {
|
: Span(reinterpret_cast<const T*>(base.data()),
|
||||||
|
base.valid_data(),
|
||||||
|
base.row_count()) {
|
||||||
assert(base.element_sizeof() == sizeof(T));
|
assert(base.element_sizeof() == sizeof(T));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,6 +109,11 @@ class Span<T,
|
|||||||
return data_;
|
return data_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bool*
|
||||||
|
valid_data() const {
|
||||||
|
return valid_data_;
|
||||||
|
}
|
||||||
|
|
||||||
const T&
|
const T&
|
||||||
operator[](int64_t offset) const {
|
operator[](int64_t offset) const {
|
||||||
return data_[offset];
|
return data_[offset];
|
||||||
@ -104,6 +126,7 @@ class Span<T,
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const T* data_;
|
const T* data_;
|
||||||
|
const bool* valid_data_;
|
||||||
const int64_t row_count_;
|
const int64_t row_count_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -77,7 +77,8 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
|
|||||||
return [chunk_data](int i) -> const number { return chunk_data[i]; };
|
return [chunk_data](int i) -> const number { return chunk_data[i]; };
|
||||||
} else {
|
} else {
|
||||||
auto chunk_data =
|
auto chunk_data =
|
||||||
segment_->chunk_view<std::string_view>(field_id, chunk_id).data();
|
segment_->chunk_view<std::string_view>(field_id, chunk_id)
|
||||||
|
.first.data();
|
||||||
return [chunk_data](int i) -> const number {
|
return [chunk_data](int i) -> const number {
|
||||||
return std::string(chunk_data[i]);
|
return std::string(chunk_data[i]);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -206,8 +206,11 @@ class SegmentExpr : public Expr {
|
|||||||
|
|
||||||
auto& skip_index = segment_->GetSkipIndex();
|
auto& skip_index = segment_->GetSkipIndex();
|
||||||
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
|
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
|
||||||
auto data_vec = segment_->get_batch_views<T>(
|
auto data_vec =
|
||||||
field_id_, 0, current_data_chunk_pos_, need_size);
|
segment_
|
||||||
|
->get_batch_views<T>(
|
||||||
|
field_id_, 0, current_data_chunk_pos_, need_size)
|
||||||
|
.first;
|
||||||
|
|
||||||
func(data_vec.data(), need_size, res, values...);
|
func(data_vec.data(), need_size, res, values...);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,6 +34,10 @@ class ChunkVectorBase {
|
|||||||
get_chunk_size(int64_t index) = 0;
|
get_chunk_size(int64_t index) = 0;
|
||||||
virtual Type
|
virtual Type
|
||||||
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
|
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
|
||||||
|
virtual int64_t
|
||||||
|
get_element_size() = 0;
|
||||||
|
virtual int64_t
|
||||||
|
get_element_offset(int64_t index) = 0;
|
||||||
virtual ChunkViewType<Type>
|
virtual ChunkViewType<Type>
|
||||||
view_element(int64_t chunk_id, int64_t chunk_offset) = 0;
|
view_element(int64_t chunk_id, int64_t chunk_offset) = 0;
|
||||||
int64_t
|
int64_t
|
||||||
@ -166,6 +170,25 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
|
|||||||
vec_.clear();
|
vec_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t
|
||||||
|
get_element_size() override {
|
||||||
|
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||||
|
if constexpr (IsMmap && std::is_same_v<std::string, Type>) {
|
||||||
|
return sizeof(ChunkViewType<Type>);
|
||||||
|
}
|
||||||
|
return sizeof(Type);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t
|
||||||
|
get_element_offset(int64_t index) override {
|
||||||
|
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||||
|
int64_t offset = 0;
|
||||||
|
for (int i = 0; i < index - 1; i++) {
|
||||||
|
offset += vec_[i].size();
|
||||||
|
}
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
SpanBase
|
SpanBase
|
||||||
get_span(int64_t chunk_id) override {
|
get_span(int64_t chunk_id) override {
|
||||||
std::shared_lock<std::shared_mutex> lck(mutex_);
|
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||||
|
|||||||
@ -72,6 +72,10 @@ class ColumnBase {
|
|||||||
SetPaddingSize(data_type);
|
SetPaddingSize(data_type);
|
||||||
|
|
||||||
if (IsVariableDataType(data_type)) {
|
if (IsVariableDataType(data_type)) {
|
||||||
|
if (field_meta.is_nullable()) {
|
||||||
|
nullable_ = true;
|
||||||
|
valid_data_.reserve(reserve);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +218,7 @@ class ColumnBase {
|
|||||||
ColumnBase(ColumnBase&& column) noexcept
|
ColumnBase(ColumnBase&& column) noexcept
|
||||||
: data_(column.data_),
|
: data_(column.data_),
|
||||||
nullable_(column.nullable_),
|
nullable_(column.nullable_),
|
||||||
valid_data_(column.valid_data_),
|
valid_data_(std::move(column.valid_data_)),
|
||||||
padding_(column.padding_),
|
padding_(column.padding_),
|
||||||
type_size_(column.type_size_),
|
type_size_(column.type_size_),
|
||||||
num_rows_(column.num_rows_),
|
num_rows_(column.num_rows_),
|
||||||
@ -282,7 +286,7 @@ class ColumnBase {
|
|||||||
"GetBatchBuffer only supported for VariableColumn");
|
"GetBatchBuffer only supported for VariableColumn");
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual std::vector<std::string_view>
|
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
StringViews() const {
|
StringViews() const {
|
||||||
PanicInfo(ErrorCode::Unsupported,
|
PanicInfo(ErrorCode::Unsupported,
|
||||||
"StringViews only supported for VariableColumn");
|
"StringViews only supported for VariableColumn");
|
||||||
@ -519,7 +523,8 @@ class Column : public ColumnBase {
|
|||||||
|
|
||||||
// Exposes the column as a SpanBase over num_rows_ elements, with the
// column's validity buffer attached.
SpanBase
Span() const override {
    // NOTE(review): this divides by num_rows_; confirm the method is never
    // reached for a column with zero rows.
    const auto element_sizeof = data_cap_size_ / num_rows_;
    return SpanBase(data_, valid_data_.data(), num_rows_, element_sizeof);
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -681,7 +686,7 @@ class VariableColumn : public ColumnBase {
|
|||||||
"span() interface is not implemented for variable column");
|
"span() interface is not implemented for variable column");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::string_view>
|
std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
StringViews() const override {
|
StringViews() const override {
|
||||||
std::vector<std::string_view> res;
|
std::vector<std::string_view> res;
|
||||||
char* pos = data_;
|
char* pos = data_;
|
||||||
@ -692,7 +697,7 @@ class VariableColumn : public ColumnBase {
|
|||||||
res.emplace_back(std::string_view(pos, size));
|
res.emplace_back(std::string_view(pos, size));
|
||||||
pos += size;
|
pos += size;
|
||||||
}
|
}
|
||||||
return res;
|
return std::make_pair(res, valid_data_);
|
||||||
}
|
}
|
||||||
|
|
||||||
[[nodiscard]] std::vector<ViewType>
|
[[nodiscard]] std::vector<ViewType>
|
||||||
@ -861,7 +866,10 @@ class ArrayColumn : public ColumnBase {
|
|||||||
|
|
||||||
// Exposes the ArrayView table as a SpanBase, one element per view, with the
// column's validity buffer attached.
SpanBase
Span() const override {
    const auto* first_view = views_.data();
    const bool* validity = valid_data_.data();
    return SpanBase(first_view, validity, views_.size(), sizeof(ArrayView));
}
|
||||||
|
|
||||||
[[nodiscard]] const std::vector<ArrayView>&
|
[[nodiscard]] const std::vector<ArrayView>&
|
||||||
@ -885,8 +893,8 @@ class ArrayColumn : public ColumnBase {
|
|||||||
element_indices_.emplace_back(array.get_offsets());
|
element_indices_.emplace_back(array.get_offsets());
|
||||||
if (nullable_) {
|
if (nullable_) {
|
||||||
return ColumnBase::Append(static_cast<const char*>(array.data()),
|
return ColumnBase::Append(static_cast<const char*>(array.data()),
|
||||||
array.byte_size(),
|
valid_data,
|
||||||
valid_data);
|
array.byte_size());
|
||||||
}
|
}
|
||||||
ColumnBase::Append(static_cast<const char*>(array.data()),
|
ColumnBase::Append(static_cast<const char*>(array.data()),
|
||||||
array.byte_size());
|
array.byte_size());
|
||||||
|
|||||||
@ -68,11 +68,12 @@ class SealedDataGetter : public DataGetter<T> {
|
|||||||
if constexpr (std::is_same_v<T, std::string>) {
|
if constexpr (std::is_same_v<T, std::string>) {
|
||||||
str_field_data_ =
|
str_field_data_ =
|
||||||
std::make_shared<std::vector<std::string_view>>(
|
std::make_shared<std::vector<std::string_view>>(
|
||||||
segment.chunk_view<std::string_view>(field_id, 0));
|
segment.chunk_view<std::string_view>(field_id, 0)
|
||||||
|
.first);
|
||||||
} else {
|
} else {
|
||||||
auto span = segment.chunk_data<T>(field_id, 0);
|
auto span = segment.chunk_data<T>(field_id, 0);
|
||||||
field_data_ =
|
field_data_ = std::make_shared<Span<T>>(
|
||||||
std::make_shared<Span<T>>(span.data(), span.row_count());
|
span.data(), span.valid_data(), span.row_count());
|
||||||
}
|
}
|
||||||
} else if (segment.HasIndex(field_id)) {
|
} else if (segment.HasIndex(field_id)) {
|
||||||
this->field_index_ = &(segment.chunk_scalar_index<T>(field_id, 0));
|
this->field_index_ = &(segment.chunk_scalar_index<T>(field_id, 0));
|
||||||
|
|||||||
@ -128,6 +128,12 @@ class VectorBase {
|
|||||||
virtual int64_t
|
virtual int64_t
|
||||||
get_chunk_size(ssize_t chunk_index) const = 0;
|
get_chunk_size(ssize_t chunk_index) const = 0;
|
||||||
|
|
||||||
|
virtual int64_t
|
||||||
|
get_element_size() const = 0;
|
||||||
|
|
||||||
|
virtual int64_t
|
||||||
|
get_element_offset(ssize_t chunk_index) const = 0;
|
||||||
|
|
||||||
virtual ssize_t
|
virtual ssize_t
|
||||||
num_chunk() const = 0;
|
num_chunk() const = 0;
|
||||||
|
|
||||||
@ -245,6 +251,26 @@ class ConcurrentVectorImpl : public VectorBase {
|
|||||||
return chunks_ptr_->get_chunk_size(chunk_index);
|
return chunks_ptr_->get_chunk_size(chunk_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t
|
||||||
|
get_element_size() const override {
|
||||||
|
if constexpr (is_type_entire_row) {
|
||||||
|
return chunks_ptr_->get_element_size();
|
||||||
|
} else if constexpr (std::is_same_v<Type, int64_t> || // NOLINT
|
||||||
|
std::is_same_v<Type, int>) {
|
||||||
|
// only for testing
|
||||||
|
PanicInfo(NotImplemented, "unimplemented");
|
||||||
|
} else {
|
||||||
|
static_assert(
|
||||||
|
std::is_same_v<typename TraitType::embedded_type, Type>);
|
||||||
|
return elements_per_row_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t
|
||||||
|
get_element_offset(ssize_t chunk_index) const override {
|
||||||
|
return chunks_ptr_->get_element_offset(chunk_index);
|
||||||
|
}
|
||||||
|
|
||||||
// just for fun, don't use it directly
|
// just for fun, don't use it directly
|
||||||
const Type*
|
const Type*
|
||||||
get_element(ssize_t element_index) const {
|
get_element(ssize_t element_index) const {
|
||||||
|
|||||||
@ -460,6 +460,13 @@ class ThreadSafeValidData {
|
|||||||
return data_[offset];
|
return data_[offset];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool*
|
||||||
|
get_chunk_data(size_t offset) {
|
||||||
|
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||||
|
Assert(offset < length_);
|
||||||
|
return &data_[offset];
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
mutable std::shared_mutex mutex_{};
|
mutable std::shared_mutex mutex_{};
|
||||||
FixedVector<bool> data_;
|
FixedVector<bool> data_;
|
||||||
@ -770,10 +777,30 @@ struct InsertRecord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
is_valid_data_exist(FieldId field_id) {
|
is_data_exist(FieldId field_id) const {
|
||||||
|
return data_.find(field_id) != data_.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
is_valid_data_exist(FieldId field_id) const {
|
||||||
return valid_data_.find(field_id) != valid_data_.end();
|
return valid_data_.find(field_id) != valid_data_.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Builds the SpanBase for one chunk of a field. When the field has a
// validity buffer, the span is assembled here so it can point at the flags
// starting at the chunk's element offset; otherwise the column's own
// get_span_base is used.
SpanBase
get_span_base(FieldId field_id, int64_t chunk_id) const {
    auto column = get_data_base(field_id);
    if (!is_valid_data_exist(field_id)) {
        return column->get_span_base(chunk_id);
    }

    auto row_count = column->get_chunk_size(chunk_id);
    auto first_row = column->get_element_offset(chunk_id);
    auto* validity = get_valid_data(field_id)->get_chunk_data(first_row);
    return SpanBase(column->get_chunk_data(chunk_id),
                    validity,
                    row_count,
                    column->get_element_size());
}
|
||||||
|
|
||||||
// append a column of scalar or sparse float vector type
|
// append a column of scalar or sparse float vector type
|
||||||
template <typename Type>
|
template <typename Type>
|
||||||
void
|
void
|
||||||
|
|||||||
@ -345,11 +345,10 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
|
|||||||
|
|
||||||
// Chunk span for a growing segment: delegated to the insert record.
SpanBase
SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
    const auto& record = get_insert_record();
    return record.get_span_base(field_id, chunk_id);
}
|
||||||
|
|
||||||
std::vector<std::string_view>
|
std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
||||||
PanicInfo(ErrorCode::NotImplemented,
|
PanicInfo(ErrorCode::NotImplemented,
|
||||||
"chunk view impl not implement for growing segment");
|
"chunk view impl not implement for growing segment");
|
||||||
|
|||||||
@ -76,6 +76,14 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
|||||||
return id_;
|
return id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
is_nullable(FieldId field_id) const override {
|
||||||
|
AssertInfo(insert_record_.is_data_exist(field_id),
|
||||||
|
"Cannot find field_data with field_id: " +
|
||||||
|
std::to_string(field_id.get()));
|
||||||
|
return insert_record_.is_valid_data_exist(field_id);
|
||||||
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
const InsertRecord<>&
|
const InsertRecord<>&
|
||||||
get_insert_record() const {
|
get_insert_record() const {
|
||||||
@ -318,10 +326,10 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
|||||||
SpanBase
|
SpanBase
|
||||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||||
|
|
||||||
std::vector<std::string_view>
|
std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||||
|
|
||||||
BufferView
|
std::pair<BufferView, FixedVector<bool>>
|
||||||
get_chunk_buffer(FieldId field_id,
|
get_chunk_buffer(FieldId field_id,
|
||||||
int64_t chunk_id,
|
int64_t chunk_id,
|
||||||
int64_t start_offset,
|
int64_t start_offset,
|
||||||
|
|||||||
@ -126,6 +126,9 @@ class SegmentInterface {
|
|||||||
|
|
||||||
virtual bool
|
virtual bool
|
||||||
HasRawData(int64_t field_id) const = 0;
|
HasRawData(int64_t field_id) const = 0;
|
||||||
|
|
||||||
|
virtual bool
|
||||||
|
is_nullable(FieldId field_id) const = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
// internal API for DSL calculation
|
// internal API for DSL calculation
|
||||||
@ -139,23 +142,26 @@ class SegmentInternalInterface : public SegmentInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename ViewType>
|
template <typename ViewType>
|
||||||
std::vector<ViewType>
|
std::pair<std::vector<ViewType>, FixedVector<bool>>
|
||||||
chunk_view(FieldId field_id, int64_t chunk_id) const {
|
chunk_view(FieldId field_id, int64_t chunk_id) const {
|
||||||
auto string_views = chunk_view_impl(field_id, chunk_id);
|
auto chunk_info = chunk_view_impl(field_id, chunk_id);
|
||||||
|
auto string_views = chunk_info.first;
|
||||||
|
auto valid_data = chunk_info.second;
|
||||||
if constexpr (std::is_same_v<ViewType, std::string_view>) {
|
if constexpr (std::is_same_v<ViewType, std::string_view>) {
|
||||||
return std::move(string_views);
|
return std::make_pair(std::move(string_views),
|
||||||
|
std::move(valid_data));
|
||||||
} else {
|
} else {
|
||||||
std::vector<ViewType> res;
|
std::vector<ViewType> res;
|
||||||
res.reserve(string_views.size());
|
res.reserve(string_views.size());
|
||||||
for (const auto& view : string_views) {
|
for (const auto& view : string_views) {
|
||||||
res.emplace_back(view);
|
res.emplace_back(view);
|
||||||
}
|
}
|
||||||
return res;
|
return std::make_pair(res, valid_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename ViewType>
|
template <typename ViewType>
|
||||||
std::vector<ViewType>
|
std::pair<std::vector<ViewType>, FixedVector<bool>>
|
||||||
get_batch_views(FieldId field_id,
|
get_batch_views(FieldId field_id,
|
||||||
int64_t chunk_id,
|
int64_t chunk_id,
|
||||||
int64_t start_offset,
|
int64_t start_offset,
|
||||||
@ -164,8 +170,9 @@ class SegmentInternalInterface : public SegmentInterface {
|
|||||||
PanicInfo(ErrorCode::Unsupported,
|
PanicInfo(ErrorCode::Unsupported,
|
||||||
"get chunk views not supported for growing segment");
|
"get chunk views not supported for growing segment");
|
||||||
}
|
}
|
||||||
BufferView buffer =
|
auto chunk_info =
|
||||||
get_chunk_buffer(field_id, chunk_id, start_offset, length);
|
get_chunk_buffer(field_id, chunk_id, start_offset, length);
|
||||||
|
BufferView buffer = chunk_info.first;
|
||||||
std::vector<ViewType> res;
|
std::vector<ViewType> res;
|
||||||
res.reserve(length);
|
res.reserve(length);
|
||||||
char* pos = buffer.data_;
|
char* pos = buffer.data_;
|
||||||
@ -176,7 +183,7 @@ class SegmentInternalInterface : public SegmentInterface {
|
|||||||
res.emplace_back(ViewType(pos, size));
|
res.emplace_back(ViewType(pos, size));
|
||||||
pos += size;
|
pos += size;
|
||||||
}
|
}
|
||||||
return res;
|
return std::make_pair(res, chunk_info.second);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -352,16 +359,17 @@ class SegmentInternalInterface : public SegmentInterface {
|
|||||||
is_mmap_field(FieldId field_id) const = 0;
|
is_mmap_field(FieldId field_id) const = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
// todo: use an Unified struct for all type in growing/seal segment to store data and valid_data.
|
||||||
// internal API: return chunk_data in span
|
// internal API: return chunk_data in span
|
||||||
virtual SpanBase
|
virtual SpanBase
|
||||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
chunk_data_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
||||||
|
|
||||||
// internal API: return chunk string views in vector
|
// internal API: return chunk string views in vector
|
||||||
virtual std::vector<std::string_view>
|
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
||||||
|
|
||||||
// internal API: return buffer reference to field chunk data located from start_offset
|
// internal API: return buffer reference to field chunk data located from start_offset
|
||||||
virtual BufferView
|
virtual std::pair<BufferView, FixedVector<bool>>
|
||||||
get_chunk_buffer(FieldId field_id,
|
get_chunk_buffer(FieldId field_id,
|
||||||
int64_t chunk_id,
|
int64_t chunk_id,
|
||||||
int64_t start_offset,
|
int64_t start_offset,
|
||||||
|
|||||||
@ -644,7 +644,7 @@ SegmentSealedImpl::size_per_chunk() const {
|
|||||||
return get_row_count();
|
return get_row_count();
|
||||||
}
|
}
|
||||||
|
|
||||||
BufferView
|
std::pair<BufferView, FixedVector<bool>>
|
||||||
SegmentSealedImpl::get_chunk_buffer(FieldId field_id,
|
SegmentSealedImpl::get_chunk_buffer(FieldId field_id,
|
||||||
int64_t chunk_id,
|
int64_t chunk_id,
|
||||||
int64_t start_offset,
|
int64_t start_offset,
|
||||||
@ -655,7 +655,15 @@ SegmentSealedImpl::get_chunk_buffer(FieldId field_id,
|
|||||||
auto& field_meta = schema_->operator[](field_id);
|
auto& field_meta = schema_->operator[](field_id);
|
||||||
if (auto it = fields_.find(field_id); it != fields_.end()) {
|
if (auto it = fields_.find(field_id); it != fields_.end()) {
|
||||||
auto& field_data = it->second;
|
auto& field_data = it->second;
|
||||||
return field_data->GetBatchBuffer(start_offset, length);
|
FixedVector<bool> valid_data;
|
||||||
|
if (field_data->IsNullable()) {
|
||||||
|
valid_data.reserve(length);
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
valid_data.push_back(field_data->IsValid(start_offset + i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return std::make_pair(field_data->GetBatchBuffer(start_offset, length),
|
||||||
|
valid_data);
|
||||||
}
|
}
|
||||||
PanicInfo(ErrorCode::UnexpectedError,
|
PanicInfo(ErrorCode::UnexpectedError,
|
||||||
"get_chunk_buffer only used for variable column field");
|
"get_chunk_buffer only used for variable column field");
|
||||||
@ -680,10 +688,11 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
|
|||||||
auto field_data = insert_record_.get_data_base(field_id);
|
auto field_data = insert_record_.get_data_base(field_id);
|
||||||
AssertInfo(field_data->num_chunk() == 1,
|
AssertInfo(field_data->num_chunk() == 1,
|
||||||
"num chunk not equal to 1 for sealed segment");
|
"num chunk not equal to 1 for sealed segment");
|
||||||
|
// system field
|
||||||
return field_data->get_span_base(0);
|
return field_data->get_span_base(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::string_view>
|
std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
||||||
std::shared_lock lck(mutex_);
|
std::shared_lock lck(mutex_);
|
||||||
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
|
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
|
||||||
|
|||||||
@ -117,6 +117,15 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||||||
return insert_record_.search_pk(pk, ts);
|
return insert_record_.search_pk(pk, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
is_nullable(FieldId field_id) const override {
|
||||||
|
auto it = fields_.find(field_id);
|
||||||
|
AssertInfo(it != fields_.end(),
|
||||||
|
"Cannot find field with field_id: " +
|
||||||
|
std::to_string(field_id.get()));
|
||||||
|
return it->second->IsNullable();
|
||||||
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
int64_t
|
int64_t
|
||||||
num_chunk_index(FieldId field_id) const override;
|
num_chunk_index(FieldId field_id) const override;
|
||||||
@ -167,10 +176,10 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||||||
SpanBase
|
SpanBase
|
||||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||||
|
|
||||||
std::vector<std::string_view>
|
std::pair<std::vector<std::string_view>, FixedVector<bool>>
|
||||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||||
|
|
||||||
BufferView
|
std::pair<BufferView, FixedVector<bool>>
|
||||||
get_chunk_buffer(FieldId field_id,
|
get_chunk_buffer(FieldId field_id,
|
||||||
int64_t chunk_id,
|
int64_t chunk_id,
|
||||||
int64_t start_offset,
|
int64_t start_offset,
|
||||||
|
|||||||
@ -19,7 +19,7 @@ TEST(Common, Span) {
|
|||||||
using namespace milvus;
|
using namespace milvus;
|
||||||
using namespace milvus::segcore;
|
using namespace milvus::segcore;
|
||||||
|
|
||||||
Span<float> s1(nullptr, 100);
|
Span<float> s1(nullptr, nullptr, 100);
|
||||||
Span<milvus::FloatVector> s2(nullptr, 10, 16 * sizeof(float));
|
Span<milvus::FloatVector> s2(nullptr, 10, 16 * sizeof(float));
|
||||||
SpanBase b1 = s1;
|
SpanBase b1 = s1;
|
||||||
SpanBase b2 = s2;
|
SpanBase b2 = s2;
|
||||||
|
|||||||
@ -408,6 +408,20 @@ TEST(Sealed, LoadFieldData) {
|
|||||||
schema->AddDebugField("json", DataType::JSON);
|
schema->AddDebugField("json", DataType::JSON);
|
||||||
schema->AddDebugField("array", DataType::ARRAY, DataType::INT64);
|
schema->AddDebugField("array", DataType::ARRAY, DataType::INT64);
|
||||||
schema->set_primary_field_id(counter_id);
|
schema->set_primary_field_id(counter_id);
|
||||||
|
auto int8_nullable_id =
|
||||||
|
schema->AddDebugField("int8_null", DataType::INT8, true);
|
||||||
|
auto int16_nullable_id =
|
||||||
|
schema->AddDebugField("int16_null", DataType::INT16, true);
|
||||||
|
auto int32_nullable_id =
|
||||||
|
schema->AddDebugField("int32_null", DataType::INT32, true);
|
||||||
|
auto int64_nullable_id =
|
||||||
|
schema->AddDebugField("int64_null", DataType::INT64, true);
|
||||||
|
auto double_nullable_id =
|
||||||
|
schema->AddDebugField("double_null", DataType::DOUBLE, true);
|
||||||
|
auto str_nullable_id =
|
||||||
|
schema->AddDebugField("str_null", DataType::VARCHAR, true);
|
||||||
|
auto float_nullable_id =
|
||||||
|
schema->AddDebugField("float_null", DataType::FLOAT, true);
|
||||||
|
|
||||||
auto dataset = DataGen(schema, N);
|
auto dataset = DataGen(schema, N);
|
||||||
|
|
||||||
@ -500,13 +514,49 @@ TEST(Sealed, LoadFieldData) {
|
|||||||
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
|
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
|
||||||
auto chunk_span3 =
|
auto chunk_span3 =
|
||||||
segment->get_batch_views<std::string_view>(str_id, 0, 0, N);
|
segment->get_batch_views<std::string_view>(str_id, 0, 0, N);
|
||||||
|
auto chunk_span4 = segment->chunk_data<int8_t>(int8_nullable_id, 0);
|
||||||
|
auto chunk_span5 = segment->chunk_data<int16_t>(int16_nullable_id, 0);
|
||||||
|
auto chunk_span6 = segment->chunk_data<int32_t>(int32_nullable_id, 0);
|
||||||
|
auto chunk_span7 = segment->chunk_data<int64_t>(int64_nullable_id, 0);
|
||||||
|
auto chunk_span8 = segment->chunk_data<double>(double_nullable_id, 0);
|
||||||
|
auto chunk_span9 =
|
||||||
|
segment->get_batch_views<std::string_view>(str_nullable_id, 0, 0, N);
|
||||||
|
|
||||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||||
auto ref2 = dataset.get_col<double>(double_id);
|
auto ref2 = dataset.get_col<double>(double_id);
|
||||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||||
|
auto ref4 = dataset.get_col<int8_t>(int8_nullable_id);
|
||||||
|
auto ref5 = dataset.get_col<int16_t>(int16_nullable_id);
|
||||||
|
auto ref6 = dataset.get_col<int32_t>(int32_nullable_id);
|
||||||
|
auto ref7 = dataset.get_col<int64_t>(int64_nullable_id);
|
||||||
|
auto ref8 = dataset.get_col<double>(double_nullable_id);
|
||||||
|
auto ref9 =
|
||||||
|
dataset.get_col(str_nullable_id)->scalars().string_data().data();
|
||||||
|
auto valid4 = dataset.get_col_valid(int8_nullable_id);
|
||||||
|
auto valid5 = dataset.get_col_valid(int16_nullable_id);
|
||||||
|
auto valid6 = dataset.get_col_valid(int32_nullable_id);
|
||||||
|
auto valid7 = dataset.get_col_valid(int64_nullable_id);
|
||||||
|
auto valid8 = dataset.get_col_valid(double_nullable_id);
|
||||||
|
auto valid9 = dataset.get_col_valid(str_nullable_id);
|
||||||
|
ASSERT_EQ(chunk_span1.valid_data(), nullptr);
|
||||||
|
ASSERT_EQ(chunk_span2.valid_data(), nullptr);
|
||||||
|
ASSERT_EQ(chunk_span3.second.size(), 0);
|
||||||
for (int i = 0; i < N; ++i) {
|
for (int i = 0; i < N; ++i) {
|
||||||
ASSERT_EQ(chunk_span1[i], ref1[i]);
|
ASSERT_EQ(chunk_span1.data()[i], ref1[i]);
|
||||||
ASSERT_EQ(chunk_span2[i], ref2[i]);
|
ASSERT_EQ(chunk_span2.data()[i], ref2[i]);
|
||||||
ASSERT_EQ(chunk_span3[i], ref3[i]);
|
ASSERT_EQ(chunk_span3.first[i], ref3[i]);
|
||||||
|
ASSERT_EQ(chunk_span4.data()[i], ref4[i]);
|
||||||
|
ASSERT_EQ(chunk_span5.data()[i], ref5[i]);
|
||||||
|
ASSERT_EQ(chunk_span6.data()[i], ref6[i]);
|
||||||
|
ASSERT_EQ(chunk_span7.data()[i], ref7[i]);
|
||||||
|
ASSERT_EQ(chunk_span8.data()[i], ref8[i]);
|
||||||
|
ASSERT_EQ(chunk_span9.first[i], ref9[i]);
|
||||||
|
ASSERT_EQ(chunk_span4.valid_data()[i], valid4[i]);
|
||||||
|
ASSERT_EQ(chunk_span5.valid_data()[i], valid5[i]);
|
||||||
|
ASSERT_EQ(chunk_span6.valid_data()[i], valid6[i]);
|
||||||
|
ASSERT_EQ(chunk_span7.valid_data()[i], valid7[i]);
|
||||||
|
ASSERT_EQ(chunk_span8.valid_data()[i], valid8[i]);
|
||||||
|
ASSERT_EQ(chunk_span9.second[i], valid9[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
||||||
@ -630,10 +680,11 @@ TEST(Sealed, ClearData) {
|
|||||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||||
auto ref2 = dataset.get_col<double>(double_id);
|
auto ref2 = dataset.get_col<double>(double_id);
|
||||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||||
|
ASSERT_EQ(chunk_span3.second.size(), 0);
|
||||||
for (int i = 0; i < N; ++i) {
|
for (int i = 0; i < N; ++i) {
|
||||||
ASSERT_EQ(chunk_span1[i], ref1[i]);
|
ASSERT_EQ(chunk_span1[i], ref1[i]);
|
||||||
ASSERT_EQ(chunk_span2[i], ref2[i]);
|
ASSERT_EQ(chunk_span2[i], ref2[i]);
|
||||||
ASSERT_EQ(chunk_span3[i], ref3[i]);
|
ASSERT_EQ(chunk_span3.first[i], ref3[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
||||||
@ -733,10 +784,11 @@ TEST(Sealed, LoadFieldDataMmap) {
|
|||||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||||
auto ref2 = dataset.get_col<double>(double_id);
|
auto ref2 = dataset.get_col<double>(double_id);
|
||||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||||
|
ASSERT_EQ(chunk_span3.second.size(), 0);
|
||||||
for (int i = 0; i < N; ++i) {
|
for (int i = 0; i < N; ++i) {
|
||||||
ASSERT_EQ(chunk_span1[i], ref1[i]);
|
ASSERT_EQ(chunk_span1[i], ref1[i]);
|
||||||
ASSERT_EQ(chunk_span2[i], ref2[i]);
|
ASSERT_EQ(chunk_span2[i], ref2[i]);
|
||||||
ASSERT_EQ(chunk_span3[i], ref3[i]);
|
ASSERT_EQ(chunk_span3.first[i], ref3[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
||||||
|
|||||||
@ -29,6 +29,8 @@ TEST(Span, Naive) {
|
|||||||
auto float_vec_fid = schema->AddDebugField(
|
auto float_vec_fid = schema->AddDebugField(
|
||||||
"floatvec", DataType::VECTOR_FLOAT, 32, knowhere::metric::L2);
|
"floatvec", DataType::VECTOR_FLOAT, 32, knowhere::metric::L2);
|
||||||
auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
|
auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
|
||||||
|
auto nullable_fid =
|
||||||
|
schema->AddDebugField("nullable", DataType::INT64, true);
|
||||||
schema->set_primary_field_id(i64_fid);
|
schema->set_primary_field_id(i64_fid);
|
||||||
|
|
||||||
auto dataset = DataGen(schema, N);
|
auto dataset = DataGen(schema, N);
|
||||||
@ -42,6 +44,8 @@ TEST(Span, Naive) {
|
|||||||
auto vec_ptr = dataset.get_col<uint8_t>(bin_vec_fid);
|
auto vec_ptr = dataset.get_col<uint8_t>(bin_vec_fid);
|
||||||
auto age_ptr = dataset.get_col<float>(float_fid);
|
auto age_ptr = dataset.get_col<float>(float_fid);
|
||||||
auto float_ptr = dataset.get_col<float>(float_vec_fid);
|
auto float_ptr = dataset.get_col<float>(float_vec_fid);
|
||||||
|
auto nullable_data_ptr = dataset.get_col<int64_t>(nullable_fid);
|
||||||
|
auto nullable_valid_data_ptr = dataset.get_col_valid(nullable_fid);
|
||||||
auto num_chunk = segment->num_chunk();
|
auto num_chunk = segment->num_chunk();
|
||||||
ASSERT_EQ(num_chunk, upper_div(N, size_per_chunk));
|
ASSERT_EQ(num_chunk, upper_div(N, size_per_chunk));
|
||||||
auto row_count = segment->get_row_count();
|
auto row_count = segment->get_row_count();
|
||||||
@ -52,9 +56,12 @@ TEST(Span, Naive) {
|
|||||||
auto age_span = segment->chunk_data<float>(float_fid, chunk_id);
|
auto age_span = segment->chunk_data<float>(float_fid, chunk_id);
|
||||||
auto float_span =
|
auto float_span =
|
||||||
segment->chunk_data<milvus::FloatVector>(float_vec_fid, chunk_id);
|
segment->chunk_data<milvus::FloatVector>(float_vec_fid, chunk_id);
|
||||||
|
auto null_field_span =
|
||||||
|
segment->chunk_data<int64_t>(nullable_fid, chunk_id);
|
||||||
auto begin = chunk_id * size_per_chunk;
|
auto begin = chunk_id * size_per_chunk;
|
||||||
auto end = std::min((chunk_id + 1) * size_per_chunk, N);
|
auto end = std::min((chunk_id + 1) * size_per_chunk, N);
|
||||||
auto size_of_chunk = end - begin;
|
auto size_of_chunk = end - begin;
|
||||||
|
ASSERT_EQ(age_span.valid_data(), nullptr);
|
||||||
for (int i = 0; i < size_of_chunk * 512 / 8; ++i) {
|
for (int i = 0; i < size_of_chunk * 512 / 8; ++i) {
|
||||||
ASSERT_EQ(vec_span.data()[i], vec_ptr[i + begin * 512 / 8]);
|
ASSERT_EQ(vec_span.data()[i], vec_ptr[i + begin * 512 / 8]);
|
||||||
}
|
}
|
||||||
@ -64,5 +71,12 @@ TEST(Span, Naive) {
|
|||||||
for (int i = 0; i < size_of_chunk; ++i) {
|
for (int i = 0; i < size_of_chunk; ++i) {
|
||||||
ASSERT_EQ(float_span.data()[i], float_ptr[i + begin * 32]);
|
ASSERT_EQ(float_span.data()[i], float_ptr[i + begin * 32]);
|
||||||
}
|
}
|
||||||
|
for (int i = 0; i < size_of_chunk; ++i) {
|
||||||
|
ASSERT_EQ(null_field_span.data()[i], nullable_data_ptr[i + begin]);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < size_of_chunk; ++i) {
|
||||||
|
ASSERT_EQ(null_field_span.valid_data()[i],
|
||||||
|
nullable_valid_data_ptr[i + begin]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user