mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
fix: get correct size when sealed segment chunked (#37062)
#37019 Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
80aa9ab4d6
commit
6ef014d931
@ -137,15 +137,8 @@ class SegmentExpr : public Expr {
|
|||||||
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
|
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
|
||||||
auto data_pos =
|
auto data_pos =
|
||||||
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
||||||
int64_t size = 0;
|
// if segment is chunked, type won't be growing
|
||||||
if (segment_->type() == SegmentType::Growing) {
|
int64_t size = segment_->chunk_size(field_id_, i) - data_pos;
|
||||||
size = (i == (num_data_chunk_ - 1) &&
|
|
||||||
active_count_ % size_per_chunk_ != 0)
|
|
||||||
? active_count_ % size_per_chunk_ - data_pos
|
|
||||||
: size_per_chunk_ - data_pos;
|
|
||||||
} else {
|
|
||||||
size = segment_->chunk_size(field_id_, i) - data_pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
size = std::min(size, batch_size_ - processed_size);
|
size = std::min(size, batch_size_ - processed_size);
|
||||||
|
|
||||||
@ -353,16 +346,8 @@ class SegmentExpr : public Expr {
|
|||||||
auto data_pos =
|
auto data_pos =
|
||||||
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
||||||
|
|
||||||
int64_t size = 0;
|
// if segment is chunked, type won't be growing
|
||||||
if (segment_->type() == SegmentType::Growing) {
|
int64_t size = segment_->chunk_size(field_id_, i) - data_pos;
|
||||||
size = (i == (num_data_chunk_ - 1))
|
|
||||||
? (active_count_ % size_per_chunk_ == 0
|
|
||||||
? size_per_chunk_ - data_pos
|
|
||||||
: active_count_ % size_per_chunk_ - data_pos)
|
|
||||||
: size_per_chunk_ - data_pos;
|
|
||||||
} else {
|
|
||||||
size = segment_->chunk_size(field_id_, i) - data_pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
size = std::min(size, batch_size_ - processed_size);
|
size = std::min(size, batch_size_ - processed_size);
|
||||||
|
|
||||||
@ -520,14 +505,19 @@ class SegmentExpr : public Expr {
|
|||||||
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
|
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
|
||||||
auto data_pos =
|
auto data_pos =
|
||||||
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
||||||
auto size =
|
int64_t size = 0;
|
||||||
(i == (num_data_chunk_ - 1))
|
if (segment_->is_chunked()) {
|
||||||
|
size = segment_->chunk_size(field_id_, i) - data_pos;
|
||||||
|
} else {
|
||||||
|
size = (i == (num_data_chunk_ - 1))
|
||||||
? (segment_->type() == SegmentType::Growing
|
? (segment_->type() == SegmentType::Growing
|
||||||
? (active_count_ % size_per_chunk_ == 0
|
? (active_count_ % size_per_chunk_ == 0
|
||||||
? size_per_chunk_ - data_pos
|
? size_per_chunk_ - data_pos
|
||||||
: active_count_ % size_per_chunk_ - data_pos)
|
: active_count_ % size_per_chunk_ -
|
||||||
|
data_pos)
|
||||||
: active_count_ - data_pos)
|
: active_count_ - data_pos)
|
||||||
: size_per_chunk_ - data_pos;
|
: size_per_chunk_ - data_pos;
|
||||||
|
}
|
||||||
|
|
||||||
size = std::min(size, batch_size_ - processed_size);
|
size = std::min(size, batch_size_ - processed_size);
|
||||||
|
|
||||||
|
|||||||
@ -101,6 +101,46 @@ TEST(chunk, test_variable_field) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(chunk, test_null_field) {
|
||||||
|
FixedVector<int64_t> data = {1, 2, 3, 4, 5};
|
||||||
|
auto field_data =
|
||||||
|
milvus::storage::CreateFieldData(storage::DataType::INT64, true);
|
||||||
|
uint8_t* valid_data_ = new uint8_t[1]{0x13};
|
||||||
|
field_data->FillFieldData(data.data(), valid_data_, data.size());
|
||||||
|
storage::InsertEventData event_data;
|
||||||
|
event_data.field_data = field_data;
|
||||||
|
auto ser_data = event_data.Serialize();
|
||||||
|
auto buffer = std::make_shared<arrow::io::BufferReader>(
|
||||||
|
ser_data.data() + 2 * sizeof(milvus::Timestamp),
|
||||||
|
ser_data.size() - 2 * sizeof(milvus::Timestamp));
|
||||||
|
|
||||||
|
parquet::arrow::FileReaderBuilder reader_builder;
|
||||||
|
auto s = reader_builder.Open(buffer);
|
||||||
|
EXPECT_TRUE(s.ok());
|
||||||
|
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
|
||||||
|
s = reader_builder.Build(&arrow_reader);
|
||||||
|
EXPECT_TRUE(s.ok());
|
||||||
|
|
||||||
|
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
|
||||||
|
s = arrow_reader->GetRecordBatchReader(&rb_reader);
|
||||||
|
EXPECT_TRUE(s.ok());
|
||||||
|
|
||||||
|
FieldMeta field_meta(
|
||||||
|
FieldName("a"), milvus::FieldId(1), DataType::INT64, true);
|
||||||
|
auto chunk = create_chunk(field_meta, 1, rb_reader);
|
||||||
|
auto span = std::dynamic_pointer_cast<FixedWidthChunk>(chunk)->Span();
|
||||||
|
EXPECT_EQ(span.row_count(), data.size());
|
||||||
|
data = {1, 2, 0, 0, 5};
|
||||||
|
FixedVector<bool> valid_data = {true, true, false, false, true};
|
||||||
|
for (size_t i = 0; i < data.size(); ++i) {
|
||||||
|
auto n = *(int64_t*)((char*)span.data() + i * span.element_sizeof());
|
||||||
|
EXPECT_EQ(n, data[i]);
|
||||||
|
auto v = *(bool*)((char*)span.valid_data() + i);
|
||||||
|
EXPECT_EQ(v, valid_data[i]);
|
||||||
|
}
|
||||||
|
delete[] valid_data_;
|
||||||
|
}
|
||||||
|
|
||||||
TEST(chunk, test_array) {
|
TEST(chunk, test_array) {
|
||||||
milvus::proto::schema::ScalarField field_string_data;
|
milvus::proto::schema::ScalarField field_string_data;
|
||||||
field_string_data.mutable_string_data()->add_data("test_array1");
|
field_string_data.mutable_string_data()->add_data("test_array1");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user