diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index 307792a539..12a48af424 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -137,15 +137,8 @@ class SegmentExpr : public Expr { for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? current_data_chunk_pos_ : 0; - int64_t size = 0; - if (segment_->type() == SegmentType::Growing) { - size = (i == (num_data_chunk_ - 1) && - active_count_ % size_per_chunk_ != 0) - ? active_count_ % size_per_chunk_ - data_pos - : size_per_chunk_ - data_pos; - } else { - size = segment_->chunk_size(field_id_, i) - data_pos; - } + // if segment is chunked, type won't be growing + int64_t size = segment_->chunk_size(field_id_, i) - data_pos; size = std::min(size, batch_size_ - processed_size); @@ -353,16 +346,8 @@ class SegmentExpr : public Expr { auto data_pos = (i == current_data_chunk_) ? current_data_chunk_pos_ : 0; - int64_t size = 0; - if (segment_->type() == SegmentType::Growing) { - size = (i == (num_data_chunk_ - 1)) - ? (active_count_ % size_per_chunk_ == 0 - ? size_per_chunk_ - data_pos - : active_count_ % size_per_chunk_ - data_pos) - : size_per_chunk_ - data_pos; - } else { - size = segment_->chunk_size(field_id_, i) - data_pos; - } + // if segment is chunked, type won't be growing + int64_t size = segment_->chunk_size(field_id_, i) - data_pos; size = std::min(size, batch_size_ - processed_size); @@ -520,14 +505,19 @@ class SegmentExpr : public Expr { for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? current_data_chunk_pos_ : 0; - auto size = - (i == (num_data_chunk_ - 1)) - ? (segment_->type() == SegmentType::Growing - ? (active_count_ % size_per_chunk_ == 0 - ? size_per_chunk_ - data_pos - : active_count_ % size_per_chunk_ - data_pos) - : active_count_ - data_pos) - : size_per_chunk_ - data_pos; + int64_t size = 0; + if (segment_->is_chunked()) { + size = segment_->chunk_size(field_id_, i) - data_pos; + } else { + size = (i == (num_data_chunk_ - 1)) + ? (segment_->type() == SegmentType::Growing + ? (active_count_ % size_per_chunk_ == 0 + ? size_per_chunk_ - data_pos + : active_count_ % size_per_chunk_ - + data_pos) + : active_count_ - data_pos) + : size_per_chunk_ - data_pos; + } size = std::min(size, batch_size_ - processed_size); diff --git a/internal/core/unittest/test_chunk.cpp b/internal/core/unittest/test_chunk.cpp index 126f11cc47..1c1eb84ce4 100644 --- a/internal/core/unittest/test_chunk.cpp +++ b/internal/core/unittest/test_chunk.cpp @@ -101,6 +101,46 @@ TEST(chunk, test_variable_field) { } } +TEST(chunk, test_null_field) { + FixedVector data = {1, 2, 3, 4, 5}; + auto field_data = + milvus::storage::CreateFieldData(storage::DataType::INT64, true); + uint8_t* valid_data_ = new uint8_t[1]{0x13}; + field_data->FillFieldData(data.data(), valid_data_, data.size()); + storage::InsertEventData event_data; + event_data.field_data = field_data; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + auto s = reader_builder.Open(buffer); + EXPECT_TRUE(s.ok()); + std::unique_ptr arrow_reader; + s = reader_builder.Build(&arrow_reader); + EXPECT_TRUE(s.ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + s = arrow_reader->GetRecordBatchReader(&rb_reader); + EXPECT_TRUE(s.ok()); + + FieldMeta field_meta( + FieldName("a"), milvus::FieldId(1), DataType::INT64, true); + auto chunk = create_chunk(field_meta, 1, rb_reader); + auto span = std::dynamic_pointer_cast(chunk)->Span(); + EXPECT_EQ(span.row_count(), data.size()); + data = {1, 2, 0, 0, 5}; + FixedVector valid_data = {true, true, false, false, true}; + for (size_t i = 0; i < data.size(); ++i) { + auto n = *(int64_t*)((char*)span.data() + i * span.element_sizeof()); + EXPECT_EQ(n, data[i]); + auto v = *(bool*)((char*)span.valid_data() + i); + EXPECT_EQ(v, valid_data[i]); + } + delete[] valid_data_; +} + TEST(chunk, test_array) { milvus::proto::schema::ScalarField field_string_data; field_string_data.mutable_string_data()->add_data("test_array1");