feat: support chunked column for sealed segment (#35764)

This PR splits sealed segments into chunked data to avoid unnecessary
memory copies and reduce memory usage when loading segments, so that
loading can be accelerated.

To support rollback to the previous version, we add an option
`multipleChunkedEnable`, which is false by default.

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2024-10-12 15:04:52 +08:00 committed by GitHub
parent 5713620825
commit a75bb85f3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 4911 additions and 483 deletions

View File

@ -400,6 +400,7 @@ queryNode:
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
multipleChunkedEnable: false # Enable multiple chunked search
knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index

View File

@ -25,6 +25,7 @@
#include "ctz.h"
#include "popcount.h"
#include "bitset/common.h"
namespace milvus {
namespace bitset {
namespace detail {

View File

@ -18,15 +18,13 @@
namespace milvus {
std::vector<std::string_view>
StringChunk::StringViews() const {
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringChunk::StringViews() {
std::vector<std::string_view> ret;
for (int i = 0; i < row_nums_ - 1; i++) {
for (int i = 0; i < row_nums_; i++) {
ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
}
ret.emplace_back(data_ + offsets_[row_nums_ - 1],
size_ - MMAP_STRING_PADDING - offsets_[row_nums_ - 1]);
return ret;
return {ret, valid_};
}
void
@ -34,20 +32,22 @@ ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);
for (int i = 0; i < row_nums_; ++i) {
auto data_ptr = data_ + offsets_[i];
auto next_data_ptr = i == row_nums_ - 1
? data_ + size_ - MMAP_ARRAY_PADDING
: data_ + offsets_[i + 1];
auto offsets_len = lens_[i] * sizeof(uint64_t);
int offset = offsets_lens_[2 * i];
int next_offset = offsets_lens_[2 * (i + 1)];
int len = offsets_lens_[2 * i + 1];
auto data_ptr = data_ + offset;
auto offsets_len = 0;
std::vector<uint64_t> element_indices = {};
if (IsStringDataType(element_type_)) {
offsets_len = len * sizeof(uint64_t);
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
}
views_.emplace_back(data_ptr + offsets_len,
next_data_ptr - data_ptr - offsets_len,
next_offset - offset - offsets_len,
element_type_,
std::move(element_indices));
}
@ -55,7 +55,10 @@ ArrayChunk::ConstructViews() {
SpanBase
ArrayChunk::Span() const {
return SpanBase(views_.data(), views_.size(), sizeof(ArrayView));
return SpanBase(views_.data(),
nullable_ ? valid_.data() : nullptr,
views_.size(),
sizeof(ArrayView));
}
} // namespace milvus

View File

@ -21,60 +21,126 @@
#include "arrow/record_batch.h"
#include "common/Array.h"
#include "common/ChunkTarget.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Span.h"
#include "knowhere/sparse_utils.h"
#include "simdjson/common_defs.h"
#include "sys/mman.h"
#include "common/Types.h"
namespace milvus {
constexpr size_t MMAP_STRING_PADDING = 1;
constexpr size_t MMAP_ARRAY_PADDING = 1;
constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1;
class Chunk {
public:
Chunk() = default;
Chunk(int64_t row_nums, char* data, size_t size)
: row_nums_(row_nums), data_(data), size_(size) {
Chunk(int64_t row_nums, char* data, uint64_t size, bool nullable)
: row_nums_(row_nums), data_(data), size_(size), nullable_(nullable) {
if (nullable) {
valid_.reserve(row_nums);
for (int i = 0; i < row_nums; i++) {
valid_.push_back((data[i >> 3] >> (i & 0x07)) & 1);
}
}
}
virtual ~Chunk() {
munmap(data_, size_);
}
uint64_t
Size() const {
return size_;
}
int64_t
RowNums() const {
return row_nums_;
}
virtual const char*
ValueAt(int64_t idx) const = 0;
virtual const char*
Data() const {
return data_;
}
virtual bool
isValid(int offset) {
return valid_[offset];
};
protected:
char* data_;
int64_t row_nums_;
size_t size_;
uint64_t size_;
bool nullable_;
FixedVector<bool>
valid_; // parse null bitmap to valid_ to be compatible with SpanBase
};
// for fixed size data, includes fixed size array
template <typename T>
class FixedWidthChunk : public Chunk {
public:
FixedWidthChunk(int32_t row_nums, int32_t dim, char* data, size_t size)
: Chunk(row_nums, data, size), dim_(dim){};
FixedWidthChunk(int32_t row_nums,
int32_t dim,
char* data,
uint64_t size,
uint64_t element_size,
bool nullable)
: Chunk(row_nums, data, size, nullable),
dim_(dim),
element_size_(element_size){};
milvus::SpanBase
Span() const {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return milvus::SpanBase(
data_ + null_bitmap_bytes_num, row_nums_, sizeof(T) * dim_);
return milvus::SpanBase(data_ + null_bitmap_bytes_num,
nullable_ ? valid_.data() : nullptr,
row_nums_,
element_size_ * dim_);
}
const char*
ValueAt(int64_t idx) const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num + idx * element_size_ * dim_;
}
const char*
Data() const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num;
}
private:
int dim_;
int element_size_;
};
class StringChunk : public Chunk {
public:
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
StringChunk(int32_t row_nums, char* data, uint64_t size, bool nullable)
: Chunk(row_nums, data, size, nullable) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}
std::vector<std::string_view>
StringViews() const;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews();
const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"StringChunk::ValueAt is not supported");
}
uint64_t*
Offsets() {
return offsets_;
}
protected:
uint64_t* offsets_;
@ -86,63 +152,83 @@ class ArrayChunk : public Chunk {
public:
ArrayChunk(int32_t row_nums,
char* data,
size_t size,
milvus::DataType element_type)
: Chunk(row_nums, data, size), element_type_(element_type) {
uint64_t size,
milvus::DataType element_type,
bool nullable)
: Chunk(row_nums, data, size, nullable), element_type_(element_type) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
lens_ = offsets_ + row_nums;
offsets_lens_ =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
ConstructViews();
}
SpanBase
Span() const;
ArrayView
View(int64_t idx) const {
return views_[idx];
}
void
ConstructViews();
const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"ArrayChunk::ValueAt is not supported");
}
private:
milvus::DataType element_type_;
uint64_t* offsets_;
uint64_t* lens_;
uint64_t* offsets_lens_;
std::vector<ArrayView> views_;
};
class SparseFloatVectorChunk : public Chunk {
public:
SparseFloatVectorChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
SparseFloatVectorChunk(int32_t row_nums,
char* data,
uint64_t size,
bool nullable)
: Chunk(row_nums, data, size, nullable) {
vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) {
int vec_size = 0;
if (i == row_nums - 1) {
vec_size = size - offsets_ptr[i];
} else {
vec_size = offsets_ptr[i + 1] - offsets_ptr[i];
}
vec_[i] = {
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
vec_[i] = {(offsets_ptr[i + 1] - offsets_ptr[i]) /
knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
dim_ = std::max(dim_, vec_[i].dim());
}
}
const char*
Data() const {
Data() const override {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}
const char*
ValueAt(int64_t i) const override {
return static_cast<const char*>(
static_cast<const void*>(vec_.data() + i));
}
// only for test
std::vector<knowhere::sparse::SparseRow<float>>&
Vec() {
return vec_;
}
int64_t
Dim() {
return dim_;
}
private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
};
} // namespace milvus

View File

@ -10,10 +10,13 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <common/ChunkTarget.h>
#include <algorithm>
#include <cstring>
#include "common/EasyAssert.h"
#include <sys/mman.h>
#include <unistd.h>
const auto PAGE_SIZE = sysconf(_SC_PAGE_SIZE);
namespace milvus {
void
MemChunkTarget::write(const void* data, size_t size, bool append) {
@ -42,8 +45,33 @@ MemChunkTarget::tell() {
return size_;
}
void
MmapChunkTarget::flush() {
if (buffer_.pos == 0) {
return;
}
auto n = file_.Write(buffer_.buf, buffer_.pos);
AssertInfo(n != -1, "failed to write data to file");
buffer_.clear();
}
void
MmapChunkTarget::write(const void* data, size_t size, bool append) {
if (buffer_.sufficient(size)) {
buffer_.write(data, size);
size_ += append ? size : 0;
return;
}
flush();
if (buffer_.sufficient(size)) {
buffer_.write(data, size);
size_ += append ? size : 0;
return;
}
auto n = file_.Write(data, size);
AssertInfo(n != -1, "failed to write data to file");
size_ += append ? size : 0;
@ -51,19 +79,35 @@ MmapChunkTarget::write(const void* data, size_t size, bool append) {
void
MmapChunkTarget::skip(size_t size) {
flush();
file_.Seek(size, SEEK_CUR);
size_ += size;
}
void
MmapChunkTarget::seek(size_t offset) {
flush();
file_.Seek(offset_ + offset, SEEK_SET);
}
std::pair<char*, size_t>
MmapChunkTarget::get() {
// Write padding to align with the page size, ensuring the offset_ aligns with the page size.
auto padding_size =
(size_ / PAGE_SIZE + (size_ % PAGE_SIZE != 0)) * PAGE_SIZE - size_;
char padding[padding_size];
memset(padding, 0, sizeof(padding));
write(padding, padding_size);
flush();
auto m = mmap(
nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_);
AssertInfo(m != MAP_FAILED,
"failed to map: {}, map_size={}, offset={}",
strerror(errno),
size_,
offset_);
return {(char*)m, size_};
}

View File

@ -37,9 +37,34 @@ class ChunkTarget {
};
class MmapChunkTarget : public ChunkTarget {
struct Buffer {
char buf[1 << 14];
size_t pos = 0;
bool
sufficient(size_t size) {
return pos + size <= sizeof(buf);
}
void
write(const void* data, size_t size) {
memcpy(buf + pos, data, size);
pos += size;
}
void
clear() {
pos = 0;
}
};
public:
MmapChunkTarget(File& file, size_t offset) : file_(file), offset_(offset) {
}
void
flush();
void
write(const void* data, size_t size, bool append = true) override;
@ -59,17 +84,23 @@ class MmapChunkTarget : public ChunkTarget {
File& file_;
size_t offset_ = 0;
size_t size_ = 0;
Buffer buffer_;
};
class MemChunkTarget : public ChunkTarget {
public:
MemChunkTarget(size_t cap) : cap_(cap) {
data_ = reinterpret_cast<char*>(mmap(nullptr,
cap,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANON,
-1,
0));
auto m = mmap(nullptr,
cap,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANON,
-1,
0);
AssertInfo(m != MAP_FAILED,
"failed to map: {}, map_size={}",
strerror(errno),
size_);
data_ = reinterpret_cast<char*>(m);
}
void

View File

@ -44,7 +44,7 @@ StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_ + MMAP_STRING_PADDING;
size += sizeof(uint64_t) * (row_nums_ + 1) + MMAP_STRING_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -63,11 +63,19 @@ StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
}
// write data
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
int offset_num = row_nums_ + 1;
int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num;
std::vector<uint64_t> offsets;
for (auto str : strs) {
offsets.push_back(offset_start_pos);
offset_start_pos += str.size();
}
offsets.push_back(offset_start_pos);
target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));
for (auto str : strs) {
offsets_.push_back(target_->tell());
target_->write(str.data(), str.size());
}
}
@ -78,12 +86,8 @@ StringChunkWriter::finish() {
// FIXME
char padding[MMAP_STRING_PADDING];
target_->write(padding, MMAP_STRING_PADDING);
// seek back to write offsets
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<StringChunk>(row_nums_, data, size);
return std::make_shared<StringChunk>(row_nums_, data, size, nullable_);
}
void
@ -101,14 +105,14 @@ JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
size += json.data().size();
jsons.push_back(std::move(json));
}
AssertInfo(data->length() % 8 == 0,
"String length should be multiple of 8");
// AssertInfo(data->length() % 8 == 0,
// "String length should be multiple of 8");
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_ + simdjson::SIMDJSON_PADDING;
size += sizeof(uint64_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -126,12 +130,20 @@ JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
}
}
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
int offset_num = row_nums_ + 1;
int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num;
std::vector<uint64_t> offsets;
for (auto json : jsons) {
offsets.push_back(offset_start_pos);
offset_start_pos += json.data().size();
}
offsets.push_back(offset_start_pos);
target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));
// write data
for (auto json : jsons) {
offsets_.push_back(target_->tell());
target_->write(json.data().data(), json.data().size());
}
}
@ -141,17 +153,15 @@ JSONChunkWriter::finish() {
char padding[simdjson::SIMDJSON_PADDING];
target_->write(padding, simdjson::SIMDJSON_PADDING);
// write offsets and padding
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<JSONChunk>(row_nums_, data, size);
return std::make_shared<JSONChunk>(row_nums_, data, size, nullable_);
}
void
ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
auto is_string = IsStringDataType(element_type_);
std::vector<Array> arrays;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
@ -164,8 +174,10 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto arr = Array(scalar_array);
size += arr.byte_size();
arrays.push_back(std::move(arr));
// element offsets size
size += sizeof(uint64_t) * arr.length();
if (is_string) {
// element offsets size
size += sizeof(uint64_t) * arr.length();
}
}
row_nums_ += array->length();
auto null_bitmap_n = (data->length() + 7) / 8;
@ -173,10 +185,8 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
size += null_bitmap_n;
}
auto is_string = IsStringDataType(element_type_);
// offsets + lens
size += is_string ? sizeof(uint64_t) * row_nums_ * 2 + MMAP_ARRAY_PADDING
: sizeof(uint64_t) * row_nums_ + MMAP_ARRAY_PADDING;
size += sizeof(uint64_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -193,16 +203,35 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
}
}
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_ * 2);
int offsets_num = row_nums_ + 1;
int len_num = row_nums_;
int offset_start_pos =
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
std::vector<uint64_t> offsets;
std::vector<uint64_t> lens;
for (auto& arr : arrays) {
offsets.push_back(offset_start_pos);
lens.push_back(arr.length());
offset_start_pos +=
is_string ? sizeof(uint64_t) * arr.get_offsets().size() : 0;
offset_start_pos += arr.byte_size();
}
offsets.push_back(offset_start_pos);
for (int i = 0; i < offsets.size(); i++) {
if (i == offsets.size() - 1) {
target_->write(&offsets[i], sizeof(uint64_t));
break;
}
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&lens[i], sizeof(uint64_t));
}
for (auto& arr : arrays) {
// write elements offsets
offsets_.push_back(target_->tell());
if (is_string) {
target_->write(arr.get_offsets().data(),
arr.get_offsets().size() * sizeof(uint64_t));
}
lens_.push_back(arr.length());
target_->write(arr.data(), arr.byte_size());
}
}
@ -212,14 +241,9 @@ ArrayChunkWriter::finish() {
char padding[MMAP_ARRAY_PADDING];
target_->write(padding, MMAP_ARRAY_PADDING);
// write offsets and lens
target_->seek(offsets_pos_);
for (size_t i = 0; i < offsets_.size(); i++) {
target_->write(&offsets_[i], sizeof(uint64_t));
target_->write(&lens_[i], sizeof(uint64_t));
}
auto [data, size] = target_->get();
return std::make_shared<ArrayChunk>(row_nums_, data, size, element_type_);
return std::make_shared<ArrayChunk>(
row_nums_, data, size, element_type_, nullable_);
}
void
@ -241,7 +265,7 @@ SparseFloatVectorChunkWriter::write(
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_;
size += sizeof(uint64_t) * (row_nums_ + 1);
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -260,22 +284,29 @@ SparseFloatVectorChunkWriter::write(
}
// write data
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
int offset_num = row_nums_ + 1;
int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num;
std::vector<uint64_t> offsets;
for (auto str : strs) {
offsets.push_back(offset_start_pos);
offset_start_pos += str.size();
}
offsets.push_back(offset_start_pos);
target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));
for (auto str : strs) {
offsets_.push_back(target_->tell());
target_->write(str.data(), str.size());
}
}
std::shared_ptr<Chunk>
SparseFloatVectorChunkWriter::finish() {
// seek back to write offsets
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<SparseFloatVectorChunk>(row_nums_, data, size);
return std::make_shared<SparseFloatVectorChunk>(
row_nums_, data, size, nullable_);
}
std::shared_ptr<Chunk>
@ -283,72 +314,180 @@ create_chunk(const FieldMeta& field_meta,
int dim,
std::shared_ptr<arrow::RecordBatchReader> r) {
std::shared_ptr<ChunkWriterBase> w;
bool nullable = field_meta.is_nullable();
switch (field_meta.get_data_type()) {
case milvus::DataType::BOOL: {
w = std::make_shared<ChunkWriter<arrow::BooleanArray, bool>>(dim);
w = std::make_shared<ChunkWriter<arrow::BooleanArray, bool>>(
dim, nullable);
break;
}
case milvus::DataType::INT8: {
w = std::make_shared<ChunkWriter<arrow::Int8Array, int8_t>>(dim);
w = std::make_shared<ChunkWriter<arrow::Int8Array, int8_t>>(
dim, nullable);
break;
}
case milvus::DataType::INT16: {
w = std::make_shared<ChunkWriter<arrow::Int16Array, int16_t>>(dim);
w = std::make_shared<ChunkWriter<arrow::Int16Array, int16_t>>(
dim, nullable);
break;
}
case milvus::DataType::INT32: {
w = std::make_shared<ChunkWriter<arrow::Int32Array, int32_t>>(dim);
w = std::make_shared<ChunkWriter<arrow::Int32Array, int32_t>>(
dim, nullable);
break;
}
case milvus::DataType::INT64: {
w = std::make_shared<ChunkWriter<arrow::Int64Array, int64_t>>(dim);
w = std::make_shared<ChunkWriter<arrow::Int64Array, int64_t>>(
dim, nullable);
break;
}
case milvus::DataType::FLOAT: {
w = std::make_shared<ChunkWriter<arrow::FloatArray, float>>(dim);
w = std::make_shared<ChunkWriter<arrow::FloatArray, float>>(
dim, nullable);
break;
}
case milvus::DataType::DOUBLE: {
w = std::make_shared<ChunkWriter<arrow::DoubleArray, double>>(dim);
w = std::make_shared<ChunkWriter<arrow::DoubleArray, double>>(
dim, nullable);
break;
}
case milvus::DataType::VECTOR_FLOAT: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, float>>(dim);
ChunkWriter<arrow::FixedSizeBinaryArray, float>>(dim, nullable);
break;
}
case milvus::DataType::VECTOR_BINARY: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, uint8_t>>(dim / 8);
ChunkWriter<arrow::FixedSizeBinaryArray, uint8_t>>(dim / 8,
nullable);
break;
}
case milvus::DataType::VECTOR_FLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(dim);
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(
dim, nullable);
break;
}
case milvus::DataType::VECTOR_BFLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(dim);
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(
dim, nullable);
break;
}
case milvus::DataType::VARCHAR:
case milvus::DataType::STRING: {
w = std::make_shared<StringChunkWriter>();
w = std::make_shared<StringChunkWriter>(nullable);
break;
}
case milvus::DataType::JSON: {
w = std::make_shared<JSONChunkWriter>();
w = std::make_shared<JSONChunkWriter>(nullable);
break;
}
case milvus::DataType::ARRAY: {
w = std::make_shared<ArrayChunkWriter>(
field_meta.get_element_type());
field_meta.get_element_type(), nullable);
break;
}
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
w = std::make_shared<SparseFloatVectorChunkWriter>();
w = std::make_shared<SparseFloatVectorChunkWriter>(nullable);
break;
}
default:
PanicInfo(Unsupported, "Unsupported data type");
}
w->write(r);
return w->finish();
}
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
File& file,
size_t file_offset,
std::shared_ptr<arrow::RecordBatchReader> r) {
std::shared_ptr<ChunkWriterBase> w;
bool nullable = field_meta.is_nullable();
switch (field_meta.get_data_type()) {
case milvus::DataType::BOOL: {
w = std::make_shared<ChunkWriter<arrow::BooleanArray, bool>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::INT8: {
w = std::make_shared<ChunkWriter<arrow::Int8Array, int8_t>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::INT16: {
w = std::make_shared<ChunkWriter<arrow::Int16Array, int16_t>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::INT32: {
w = std::make_shared<ChunkWriter<arrow::Int32Array, int32_t>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::INT64: {
w = std::make_shared<ChunkWriter<arrow::Int64Array, int64_t>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::FLOAT: {
w = std::make_shared<ChunkWriter<arrow::FloatArray, float>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::DOUBLE: {
w = std::make_shared<ChunkWriter<arrow::DoubleArray, double>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::VECTOR_FLOAT: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, float>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::VECTOR_BINARY: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, uint8_t>>(
dim / 8, file, file_offset, nullable);
break;
}
case milvus::DataType::VECTOR_FLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::VECTOR_BFLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(
dim, file, file_offset, nullable);
break;
}
case milvus::DataType::VARCHAR:
case milvus::DataType::STRING: {
w = std::make_shared<StringChunkWriter>(
file, file_offset, nullable);
break;
}
case milvus::DataType::JSON: {
w = std::make_shared<JSONChunkWriter>(file, file_offset, nullable);
break;
}
case milvus::DataType::ARRAY: {
w = std::make_shared<ArrayChunkWriter>(
field_meta.get_element_type(), file, file_offset, nullable);
break;
}
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
w = std::make_shared<SparseFloatVectorChunkWriter>(
file, file_offset, nullable);
break;
}
default:

View File

@ -25,10 +25,11 @@ namespace milvus {
class ChunkWriterBase {
public:
ChunkWriterBase() = default;
explicit ChunkWriterBase(bool nullable) : nullable_(nullable) {
}
ChunkWriterBase(File& file, size_t offset)
: file_(&file), file_offset_(offset) {
ChunkWriterBase(File& file, size_t offset, bool nullable)
: file_(&file), file_offset_(offset), nullable_(nullable) {
}
virtual void
@ -46,17 +47,18 @@ class ChunkWriterBase {
int row_nums_ = 0;
File* file_ = nullptr;
size_t file_offset_ = 0;
bool nullable_ = false;
std::shared_ptr<ChunkTarget> target_;
};
template <typename ArrowType, typename T>
class ChunkWriter : public ChunkWriterBase {
public:
ChunkWriter(int dim) : dim_(dim) {
ChunkWriter(int dim, bool nullable) : ChunkWriterBase(nullable), dim_(dim) {
}
ChunkWriter(int dim, File& file, size_t offset)
: ChunkWriterBase(file, offset), dim_(dim){};
ChunkWriter(int dim, File& file, size_t offset, bool nullable)
: ChunkWriterBase(file, offset, nullable), dim_(dim){};
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override {
@ -104,8 +106,8 @@ class ChunkWriter : public ChunkWriterBase {
std::shared_ptr<Chunk>
finish() override {
auto [data, size] = target_->get();
return std::make_shared<FixedWidthChunk<T>>(
row_nums_, dim_, data, size);
return std::make_shared<FixedWidthChunk>(
row_nums_, dim_, data, size, sizeof(T), nullable_);
}
private:
@ -165,10 +167,6 @@ class StringChunkWriter : public ChunkWriterBase {
std::shared_ptr<Chunk>
finish() override;
protected:
std::vector<int64_t> offsets_;
size_t offsets_pos_ = 0;
};
class JSONChunkWriter : public ChunkWriterBase {
@ -180,21 +178,18 @@ class JSONChunkWriter : public ChunkWriterBase {
std::shared_ptr<Chunk>
finish() override;
private:
std::vector<int64_t> offsets_;
size_t offsets_pos_ = 0;
};
class ArrayChunkWriter : public ChunkWriterBase {
public:
ArrayChunkWriter(const milvus::DataType element_type)
: element_type_(element_type) {
ArrayChunkWriter(const milvus::DataType element_type, bool nullable)
: ChunkWriterBase(nullable), element_type_(element_type) {
}
ArrayChunkWriter(const milvus::DataType element_type,
File& file,
size_t offset)
: ChunkWriterBase(file, offset), element_type_(element_type) {
size_t offset,
bool nullable)
: ChunkWriterBase(file, offset, nullable), element_type_(element_type) {
}
void
@ -205,9 +200,6 @@ class ArrayChunkWriter : public ChunkWriterBase {
private:
const milvus::DataType element_type_;
std::vector<uint64_t> offsets_;
std::vector<uint64_t> lens_;
size_t offsets_pos_;
};
class SparseFloatVectorChunkWriter : public ChunkWriterBase {
@ -219,10 +211,6 @@ class SparseFloatVectorChunkWriter : public ChunkWriterBase {
std::shared_ptr<Chunk>
finish() override;
private:
uint64_t offsets_pos_ = 0;
std::vector<uint64_t> offsets_;
};
std::shared_ptr<Chunk>

View File

@ -17,6 +17,8 @@
#pragma once
#include <iostream>
#include <utility>
#include <variant>
#include "common/Consts.h"
namespace milvus {
@ -47,11 +49,14 @@ void
SetDefaultExecEvalExprBatchSize(int64_t val);
struct BufferView {
char* data_;
size_t size_;
struct Element {
const char* data_;
uint64_t* offsets_;
int start_;
int end_;
};
BufferView(char* data_ptr, size_t size) : data_(data_ptr), size_(size) {
}
std::variant<std::vector<Element>, std::pair<char*, size_t>> data_;
};
} // namespace milvus

View File

@ -23,6 +23,7 @@
#include "common/FieldDataInterface.h"
#include "common/Channel.h"
#include "parquet/arrow/reader.h"
namespace milvus {
@ -143,6 +144,21 @@ using FieldDataPtr = std::shared_ptr<FieldDataBase>;
using FieldDataChannel = Channel<FieldDataPtr>;
using FieldDataChannelPtr = std::shared_ptr<FieldDataChannel>;
struct ArrowDataWrapper {
ArrowDataWrapper() = default;
ArrowDataWrapper(std::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<parquet::arrow::FileReader> arrow_reader,
std::shared_ptr<uint8_t[]> file_data)
: reader(reader), arrow_reader(arrow_reader), file_data(file_data) {
}
std::shared_ptr<arrow::RecordBatchReader> reader;
// file reader must outlive the record batch reader
std::shared_ptr<parquet::arrow::FileReader> arrow_reader;
// underlying file data memory, must outlive the arrow reader
std::shared_ptr<uint8_t[]> file_data;
};
using ArrowReaderChannel = Channel<std::shared_ptr<milvus::ArrowDataWrapper>>;
FieldDataPtr
InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows);

View File

@ -395,21 +395,6 @@ class FieldDataImpl : public FieldDataBase {
return &data_[offset];
}
// std::optional<const void*>
// Value(ssize_t offset) {
// if (!is_type_entire_row) {
// return RawValue(offset);
// }
// AssertInfo(offset < get_num_rows(),
// "field data subscript out of range");
// AssertInfo(offset < length(),
// "subscript position don't has valid value");
// if (nullable_ && !valid_data_[offset]) {
// return std::nullopt;
// }
// return &field_data_[offset];
// }
int64_t
Size() const override {
return DataSize() + ValidDataSize();

View File

@ -28,6 +28,7 @@ enum SegmentType {
Growing = 1,
Sealed = 2,
Indexing = 3,
ChunkedSealed = 4,
};
typedef enum SegmentType SegmentType;

View File

@ -15,6 +15,7 @@
// limitations under the License.
#include "CompareExpr.h"
#include "common/type_c.h"
#include "query/Relational.h"
namespace milvus {
@ -28,15 +29,248 @@ PhyCompareFilterExpr::IsStringExpr() {
int64_t
PhyCompareFilterExpr::GetNextBatchSize() {
auto current_rows =
segment_->type() == SegmentType::Growing
? current_chunk_id_ * size_per_chunk_ + current_chunk_pos_
: current_chunk_pos_;
auto current_rows = GetCurrentRows();
return current_rows + batch_size_ >= active_count_
? active_count_ - current_rows
: batch_size_;
}
template <typename T>
MultipleChunkDataAccessor
PhyCompareFilterExpr::GetChunkData(FieldId field_id,
bool index,
int64_t& current_chunk_id,
int64_t& current_chunk_pos) {
if (index) {
auto& indexing = const_cast<index::ScalarIndex<T>&>(
segment_->chunk_scalar_index<T>(field_id, current_chunk_id));
auto current_chunk_size = segment_->type() == SegmentType::Growing
? size_per_chunk_
: active_count_;
if (indexing.HasRawData()) {
return [&, current_chunk_size]() -> const number {
if (current_chunk_pos >= current_chunk_size) {
current_chunk_id++;
current_chunk_pos = 0;
indexing = const_cast<index::ScalarIndex<T>&>(
segment_->chunk_scalar_index<T>(field_id,
current_chunk_id));
}
return indexing.Reverse_Lookup(current_chunk_pos++);
};
}
}
auto chunk_data =
segment_->chunk_data<T>(field_id, current_chunk_id).data();
auto current_chunk_size = segment_->chunk_size(field_id, current_chunk_id);
return
[=, &current_chunk_id, &current_chunk_pos]() mutable -> const number {
if (current_chunk_pos >= current_chunk_size) {
current_chunk_id++;
current_chunk_pos = 0;
chunk_data =
segment_->chunk_data<T>(field_id, current_chunk_id).data();
current_chunk_size =
segment_->chunk_size(field_id, current_chunk_id);
}
return chunk_data[current_chunk_pos++];
};
}
template <>
MultipleChunkDataAccessor
PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
                                                bool index,
                                                int64_t& current_chunk_id,
                                                int64_t& current_chunk_pos) {
    // String specialization of the per-row accessor. Same cursor contract
    // as the primary template; see there for capture-lifetime rationale.
    if (index) {
        // Pointer (not local reference) so the closure can rebind to the
        // next chunk's index without dangling or copy-assigning the index.
        auto* indexing = const_cast<index::ScalarIndex<std::string>*>(
            &segment_->chunk_scalar_index<std::string>(field_id,
                                                       current_chunk_id));
        auto current_chunk_size = segment_->type() == SegmentType::Growing
                                      ? size_per_chunk_
                                      : active_count_;

        if (indexing->HasRawData()) {
            return [=,
                    &current_chunk_id,
                    &current_chunk_pos]() mutable -> const number {
                if (current_chunk_pos >= current_chunk_size) {
                    current_chunk_id++;
                    current_chunk_pos = 0;
                    indexing = const_cast<index::ScalarIndex<std::string>*>(
                        &segment_->chunk_scalar_index<std::string>(
                            field_id, current_chunk_id));
                }
                return indexing->Reverse_Lookup(current_chunk_pos++);
            };
        }
    }
    if (segment_->type() == SegmentType::Growing &&
        !storage::MmapManager::GetInstance()
             .GetMmapConfig()
             .growing_enable_mmap) {
        // In-memory growing segment: raw std::string chunk storage is
        // stable, so caching the data pointer is safe.
        auto chunk_data =
            segment_->chunk_data<std::string>(field_id, current_chunk_id)
                .data();
        auto current_chunk_size =
            segment_->chunk_size(field_id, current_chunk_id);
        return [=,
                &current_chunk_id,
                &current_chunk_pos]() mutable -> const number {
            if (current_chunk_pos >= current_chunk_size) {
                current_chunk_id++;
                current_chunk_pos = 0;
                chunk_data =
                    segment_
                        ->chunk_data<std::string>(field_id, current_chunk_id)
                        .data();
                current_chunk_size =
                    segment_->chunk_size(field_id, current_chunk_id);
            }
            return chunk_data[current_chunk_pos++];
        };
    } else {
        // chunk_view() returns the string_view vector BY VALUE. Keep the
        // vector itself alive inside the closure: the previous code cached
        // `.first.data()` of the temporary pair, a pointer that dangled as
        // soon as the full expression ended.
        auto views =
            segment_->chunk_view<std::string_view>(field_id, current_chunk_id)
                .first;
        auto current_chunk_size =
            segment_->chunk_size(field_id, current_chunk_id);
        return [=,
                views = std::move(views),
                &current_chunk_id,
                &current_chunk_pos]() mutable -> const number {
            if (current_chunk_pos >= current_chunk_size) {
                current_chunk_id++;
                current_chunk_pos = 0;
                views = segment_
                            ->chunk_view<std::string_view>(field_id,
                                                           current_chunk_id)
                            .first;
                current_chunk_size =
                    segment_->chunk_size(field_id, current_chunk_id);
            }
            // Accessor yields owning std::string values (the variant number
            // holds std::string, not string_view).
            return std::string(views[current_chunk_pos++]);
        };
    }
}
MultipleChunkDataAccessor
PhyCompareFilterExpr::GetChunkData(DataType data_type,
                                   FieldId field_id,
                                   bool index,
                                   int64_t& current_chunk_id,
                                   int64_t& current_chunk_pos) {
    // Dispatch to the typed accessor builder matching the field's type.
    if (data_type == DataType::BOOL) {
        return GetChunkData<bool>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::INT8) {
        return GetChunkData<int8_t>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::INT16) {
        return GetChunkData<int16_t>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::INT32) {
        return GetChunkData<int32_t>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::INT64) {
        return GetChunkData<int64_t>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::FLOAT) {
        return GetChunkData<float>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::DOUBLE) {
        return GetChunkData<double>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    if (data_type == DataType::VARCHAR) {
        return GetChunkData<std::string>(
            field_id, index, current_chunk_id, current_chunk_pos);
    }
    PanicInfo(DataTypeInvalid, "unsupported data type: {}", data_type);
}
template <typename OpType>
VectorPtr
PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op) {
    // Evaluate `left op right` row-by-row for one batch and return the
    // result bitmap; returns nullptr when no rows remain.
    if (segment_->is_chunked()) {
        // Chunked (multi-chunk sealed) path: per-row accessors advance the
        // left/right chunk cursors independently, since the two fields may
        // be split into differently sized chunks.
        auto real_batch_size = GetNextBatchSize();
        if (real_batch_size == 0) {
            return nullptr;
        }
        auto res_vec =
            std::make_shared<ColumnVector>(TargetBitmap(real_batch_size));
        TargetBitmapView res(res_vec->GetRawData(), real_batch_size);
        auto left = GetChunkData(expr_->left_data_type_,
                                 expr_->left_field_id_,
                                 is_left_indexed_,
                                 left_current_chunk_id_,
                                 left_current_chunk_pos_);
        auto right = GetChunkData(expr_->right_data_type_,
                                  expr_->right_field_id_,
                                  is_right_indexed_,
                                  right_current_chunk_id_,
                                  right_current_chunk_pos_);
        // Each call to left()/right() consumes one row and advances the
        // respective cursor, so a plain counted loop suffices here.
        for (int i = 0; i < real_batch_size; ++i) {
            res[i] = boost::apply_visitor(
                milvus::query::Relational<decltype(op)>{}, left(), right());
        }
        return res_vec;
    } else {
        // Single-chunk layout: both fields share the same chunking, so one
        // (chunk_id, in-chunk position) cursor drives both sides.
        auto real_batch_size = GetNextBatchSize();
        if (real_batch_size == 0) {
            return nullptr;
        }
        auto res_vec =
            std::make_shared<ColumnVector>(TargetBitmap(real_batch_size));
        TargetBitmapView res(res_vec->GetRawData(), real_batch_size);
        auto left_data_barrier =
            segment_->num_chunk_data(expr_->left_field_id_);
        auto right_data_barrier =
            segment_->num_chunk_data(expr_->right_field_id_);
        int64_t processed_rows = 0;
        for (int64_t chunk_id = current_chunk_id_; chunk_id < num_chunk_;
             ++chunk_id) {
            // All chunks hold size_per_chunk_ rows except possibly the last.
            auto chunk_size = chunk_id == num_chunk_ - 1
                                  ? active_count_ - chunk_id * size_per_chunk_
                                  : size_per_chunk_;
            auto left = GetChunkData(expr_->left_data_type_,
                                     expr_->left_field_id_,
                                     chunk_id,
                                     left_data_barrier);
            auto right = GetChunkData(expr_->right_data_type_,
                                      expr_->right_field_id_,
                                      chunk_id,
                                      right_data_barrier);
            // Resume mid-chunk when continuing a previous batch.
            for (int i = chunk_id == current_chunk_id_ ? current_chunk_pos_ : 0;
                 i < chunk_size;
                 ++i) {
                res[processed_rows++] = boost::apply_visitor(
                    milvus::query::Relational<decltype(op)>{},
                    left(i),
                    right(i));

                if (processed_rows >= batch_size_) {
                    // Persist the cursor so the next batch resumes here.
                    current_chunk_id_ = chunk_id;
                    current_chunk_pos_ = i + 1;
                    return res_vec;
                }
            }
        }
        return res_vec;
    }
}
template <typename T>
ChunkDataAccessor
PhyCompareFilterExpr::GetChunkData(FieldId field_id,
@ -113,52 +347,6 @@ PhyCompareFilterExpr::GetChunkData(DataType data_type,
}
}
template <typename OpType>
VectorPtr
PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op) {
    // Evaluate `left op right` row-by-row for one batch over the
    // single-chunk layout; returns nullptr when no rows remain.
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    auto res_vec =
        std::make_shared<ColumnVector>(TargetBitmap(real_batch_size));
    TargetBitmapView res(res_vec->GetRawData(), real_batch_size);

    auto left_data_barrier = segment_->num_chunk_data(expr_->left_field_id_);
    auto right_data_barrier = segment_->num_chunk_data(expr_->right_field_id_);

    int64_t processed_rows = 0;
    for (int64_t chunk_id = current_chunk_id_; chunk_id < num_chunk_;
         ++chunk_id) {
        // Every chunk holds size_per_chunk_ rows except possibly the last.
        auto chunk_size = chunk_id == num_chunk_ - 1
                              ? active_count_ - chunk_id * size_per_chunk_
                              : size_per_chunk_;
        auto left = GetChunkData(expr_->left_data_type_,
                                 expr_->left_field_id_,
                                 chunk_id,
                                 left_data_barrier);
        auto right = GetChunkData(expr_->right_data_type_,
                                  expr_->right_field_id_,
                                  chunk_id,
                                  right_data_barrier);

        // Resume mid-chunk when continuing a previous batch.
        for (int i = chunk_id == current_chunk_id_ ? current_chunk_pos_ : 0;
             i < chunk_size;
             ++i) {
            res[processed_rows++] = boost::apply_visitor(
                milvus::query::Relational<decltype(op)>{}, left(i), right(i));

            if (processed_rows >= batch_size_) {
                // Persist the cursor so the next batch resumes here.
                current_chunk_id_ = chunk_id;
                current_chunk_pos_ = i + 1;
                return res_vec;
            }
        }
    }
    return res_vec;
}
void
PhyCompareFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
// For segment both fields has no index, can use SIMD to speed up.

View File

@ -22,6 +22,7 @@
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "common/Vector.h"
#include "common/type_c.h"
#include "exec/expression/Expr.h"
#include "segcore/SegmentInterface.h"
@ -37,6 +38,7 @@ using number = boost::variant<bool,
double,
std::string>;
using ChunkDataAccessor = std::function<const number(int)>;
using MultipleChunkDataAccessor = std::function<const number()>;
template <typename T, typename U, proto::plan::OpType op>
struct CompareElementFunc {
@ -114,9 +116,26 @@ class PhyCompareFilterExpr : public Expr {
is_left_indexed_ = segment_->HasIndex(left_field_);
is_right_indexed_ = segment_->HasIndex(right_field_);
size_per_chunk_ = segment_->size_per_chunk();
num_chunk_ = is_left_indexed_
? segment_->num_chunk_index(expr_->left_field_id_)
: upper_div(active_count_, size_per_chunk_);
if (segment_->is_chunked()) {
left_num_chunk_ =
is_left_indexed_
? segment_->num_chunk_index(expr_->left_field_id_)
: segment_->type() == SegmentType::Growing
? upper_div(active_count_, size_per_chunk_)
: segment_->num_chunk_data(left_field_);
right_num_chunk_ =
is_right_indexed_
? segment_->num_chunk_index(expr_->right_field_id_)
: segment_->type() == SegmentType::Growing
? upper_div(active_count_, size_per_chunk_)
: segment_->num_chunk_data(right_field_);
num_chunk_ = left_num_chunk_;
} else {
num_chunk_ = is_left_indexed_
? segment_->num_chunk_index(expr_->left_field_id_)
: upper_div(active_count_, size_per_chunk_);
}
AssertInfo(
batch_size_ > 0,
fmt::format("expr batch size should greater than zero, but now: {}",
@ -128,6 +147,67 @@ class PhyCompareFilterExpr : public Expr {
void
MoveCursor() override {
    // Skip one batch without producing output, using the cursor scheme
    // that matches the segment's chunk layout.
    if (!segment_->is_chunked()) {
        MoveCursorForSingleChunk();
    } else {
        MoveCursorForMultipleChunk();
    }
}
void
MoveCursorForMultipleChunk() {
    // Advance the left and right cursors by exactly one batch without
    // evaluating the expression. The previous loops never exited after the
    // batch boundary, so the cursor kept being overwritten until it pointed
    // at the very end of the data instead of one batch ahead; the analogous
    // loop in ExecCompareExprDispatcher returns as soon as batch_size_ rows
    // are handled, and we mirror that here with an early break.
    int64_t processed_rows = 0;
    bool advanced = false;
    for (int64_t chunk_id = left_current_chunk_id_;
         !advanced && chunk_id < left_num_chunk_;
         ++chunk_id) {
        auto chunk_size = 0;
        if (segment_->type() == SegmentType::Growing) {
            // Growing segments use fixed-size chunks; the tail chunk may be
            // partially filled.
            chunk_size = chunk_id == left_num_chunk_ - 1
                             ? active_count_ - chunk_id * size_per_chunk_
                             : size_per_chunk_;
        } else {
            chunk_size = segment_->chunk_size(left_field_, chunk_id);
        }
        for (int i = chunk_id == left_current_chunk_id_
                         ? left_current_chunk_pos_
                         : 0;
             i < chunk_size;
             ++i) {
            if (++processed_rows >= batch_size_) {
                left_current_chunk_id_ = chunk_id;
                left_current_chunk_pos_ = i + 1;
                advanced = true;
                break;
            }
        }
    }
    // Same walk for the right-hand field, which may be chunked differently.
    processed_rows = 0;
    advanced = false;
    for (int64_t chunk_id = right_current_chunk_id_;
         !advanced && chunk_id < right_num_chunk_;
         ++chunk_id) {
        auto chunk_size = 0;
        if (segment_->type() == SegmentType::Growing) {
            chunk_size = chunk_id == right_num_chunk_ - 1
                             ? active_count_ - chunk_id * size_per_chunk_
                             : size_per_chunk_;
        } else {
            chunk_size = segment_->chunk_size(right_field_, chunk_id);
        }
        for (int i = chunk_id == right_current_chunk_id_
                         ? right_current_chunk_pos_
                         : 0;
             i < chunk_size;
             ++i) {
            if (++processed_rows >= batch_size_) {
                right_current_chunk_id_ = chunk_id;
                right_current_chunk_pos_ = i + 1;
                advanced = true;
                break;
            }
        }
    }
}
void
MoveCursorForSingleChunk() {
int64_t processed_rows = 0;
for (int64_t chunk_id = current_chunk_id_; chunk_id < num_chunk_;
++chunk_id) {
@ -146,6 +226,24 @@ class PhyCompareFilterExpr : public Expr {
}
}
int64_t
GetCurrentRows() {
    // Number of rows already consumed, expressed as a global row offset.
    if (!segment_->is_chunked()) {
        // Single-chunk layout: growing segments combine the chunk id with
        // the in-chunk position; sealed segments keep a flat position.
        if (segment_->type() == SegmentType::Growing) {
            return current_chunk_id_ * size_per_chunk_ + current_chunk_pos_;
        }
        return current_chunk_pos_;
    }
    // Chunked layout: a sealed segment scanned through an index tracks a
    // flat position; otherwise translate (chunk id, in-chunk position)
    // through the per-chunk row prefix.
    if (is_left_indexed_ && segment_->type() == SegmentType::Sealed) {
        return left_current_chunk_pos_;
    }
    return segment_->num_rows_until_chunk(left_field_,
                                          left_current_chunk_id_) +
           left_current_chunk_pos_;
}
private:
int64_t
GetNextBatchSize();
@ -153,6 +251,13 @@ class PhyCompareFilterExpr : public Expr {
bool
IsStringExpr();
template <typename T>
MultipleChunkDataAccessor
GetChunkData(FieldId field_id,
bool index,
int64_t& current_chunk_id,
int64_t& current_chunk_pos);
template <typename T>
ChunkDataAccessor
GetChunkData(FieldId field_id, int chunk_id, int data_barrier);
@ -160,6 +265,23 @@ class PhyCompareFilterExpr : public Expr {
template <typename T, typename U, typename FUNC, typename... ValTypes>
int64_t
ProcessBothDataChunks(FUNC func, TargetBitmapView res, ValTypes... values) {
if (segment_->is_chunked()) {
return ProcessBothDataChunksForMultipleChunk<T,
U,
FUNC,
ValTypes...>(
func, res, values...);
} else {
return ProcessBothDataChunksForSingleChunk<T, U, FUNC, ValTypes...>(
func, res, values...);
}
}
template <typename T, typename U, typename FUNC, typename... ValTypes>
int64_t
ProcessBothDataChunksForSingleChunk(FUNC func,
TargetBitmapView res,
ValTypes... values) {
int64_t processed_size = 0;
for (size_t i = current_chunk_id_; i < num_chunk_; i++) {
@ -194,6 +316,56 @@ class PhyCompareFilterExpr : public Expr {
return processed_size;
}
template <typename T, typename U, typename FUNC, typename... ValTypes>
int64_t
ProcessBothDataChunksForMultipleChunk(FUNC func,
                                      TargetBitmapView res,
                                      ValTypes... values) {
    // Apply `func` to up to batch_size_ rows of raw left/right chunk data,
    // resuming from the stored left cursor. Returns the number of rows
    // actually processed.
    int64_t processed_size = 0;

    // only call this function when left and right are not indexed, so they have the same number of chunks
    for (size_t i = left_current_chunk_id_; i < left_num_chunk_; i++) {
        auto left_chunk = segment_->chunk_data<T>(left_field_, i);
        auto right_chunk = segment_->chunk_data<U>(right_field_, i);
        // Resume mid-chunk only on the first chunk of this batch.
        auto data_pos =
            (i == left_current_chunk_id_) ? left_current_chunk_pos_ : 0;
        auto size = 0;
        if (segment_->type() == SegmentType::Growing) {
            // Growing: fixed-size chunks, last chunk possibly partial.
            size = (i == (left_num_chunk_ - 1))
                       ? (active_count_ % size_per_chunk_ == 0
                              ? size_per_chunk_ - data_pos
                              : active_count_ % size_per_chunk_ - data_pos)
                       : size_per_chunk_ - data_pos;
        } else {
            // Sealed chunked: chunk sizes vary per chunk.
            size = segment_->chunk_size(left_field_, i) - data_pos;
        }

        // Truncate the last slice so the batch never exceeds batch_size_.
        if (processed_size + size >= batch_size_) {
            size = batch_size_ - processed_size;
        }

        const T* left_data = left_chunk.data() + data_pos;
        const U* right_data = right_chunk.data() + data_pos;
        func(left_data, right_data, size, res + processed_size, values...);
        processed_size += size;

        if (processed_size >= batch_size_) {
            // Persist the cursor so the next batch resumes here.
            left_current_chunk_id_ = i;
            left_current_chunk_pos_ = data_pos + size;
            break;
        }
    }

    return processed_size;
}
MultipleChunkDataAccessor
GetChunkData(DataType data_type,
FieldId field_id,
bool index,
int64_t& current_chunk_id,
int64_t& current_chunk_pos);
ChunkDataAccessor
GetChunkData(DataType data_type,
FieldId field_id,
@ -225,6 +397,12 @@ class PhyCompareFilterExpr : public Expr {
bool is_right_indexed_;
// Total number of rows this expression iterates over.
int64_t active_count_{0};
// Chunk count for the single-chunk layout; for the chunked layout it is
// set equal to left_num_chunk_.
int64_t num_chunk_{0};
// Per-field chunk counts for the chunked (multi-chunk sealed) layout.
int64_t left_num_chunk_{0};
int64_t right_num_chunk_{0};
// Chunked-layout cursors: next (chunk id, in-chunk position) per side.
int64_t left_current_chunk_id_{0};
int64_t left_current_chunk_pos_{0};
int64_t right_current_chunk_id_{0};
int64_t right_current_chunk_pos_{0};
// Single-chunk-layout cursor.
int64_t current_chunk_id_{0};
int64_t current_chunk_pos_{0};
// Fixed rows per chunk (used on the growing-segment paths).
int64_t size_per_chunk_{0};

View File

@ -122,12 +122,43 @@ class SegmentExpr : public Expr {
}
// if index not include raw data, also need load data
if (segment_->HasFieldData(field_id_)) {
num_data_chunk_ = upper_div(active_count_, size_per_chunk_);
if (segment_->is_chunked()) {
num_data_chunk_ = segment_->num_chunk_data(field_id_);
} else {
num_data_chunk_ = upper_div(active_count_, size_per_chunk_);
}
}
}
void
MoveCursorForData() {
MoveCursorForDataMultipleChunk() {
int64_t processed_size = 0;
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
auto data_pos =
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
int64_t size = 0;
if (segment_->type() == SegmentType::Growing) {
size = (i == (num_data_chunk_ - 1) &&
active_count_ % size_per_chunk_ != 0)
? active_count_ % size_per_chunk_ - data_pos
: size_per_chunk_ - data_pos;
} else {
size = segment_->chunk_size(field_id_, i) - data_pos;
}
size = std::min(size, batch_size_ - processed_size);
processed_size += size;
if (processed_size >= batch_size_) {
current_data_chunk_ = i;
current_data_chunk_pos_ = data_pos + size;
break;
}
// }
}
}
void
MoveCursorForDataSingleChunk() {
if (segment_->type() == SegmentType::Sealed) {
auto size =
std::min(active_count_ - current_data_chunk_pos_, batch_size_);
@ -154,6 +185,15 @@ class SegmentExpr : public Expr {
}
}
void
MoveCursorForData() {
    // Dispatch on the segment's chunk layout.
    segment_->is_chunked() ? MoveCursorForDataMultipleChunk()
                           : MoveCursorForDataSingleChunk();
}
void
MoveCursorForIndex() {
AssertInfo(segment_->type() == SegmentType::Sealed,
@ -183,7 +223,17 @@ class SegmentExpr : public Expr {
auto current_chunk_pos = is_index_mode_ && use_index_
? current_index_chunk_pos_
: current_data_chunk_pos_;
auto current_rows = current_chunk * size_per_chunk_ + current_chunk_pos;
auto current_rows = 0;
if (segment_->is_chunked()) {
current_rows =
is_index_mode_ && use_index_ &&
segment_->type() == SegmentType::Sealed
? current_chunk_pos
: segment_->num_rows_until_chunk(field_id_, current_chunk) +
current_chunk_pos;
} else {
current_rows = current_chunk * size_per_chunk_ + current_chunk_pos;
}
return current_rows + batch_size_ >= active_count_
? active_count_ - current_rows
: batch_size_;
@ -220,7 +270,7 @@ class SegmentExpr : public Expr {
template <typename T, typename FUNC, typename... ValTypes>
int64_t
ProcessDataChunks(
ProcessDataChunksForSingleChunk(
FUNC func,
std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
TargetBitmapView res,
@ -266,6 +316,90 @@ class SegmentExpr : public Expr {
return processed_size;
}
template <typename T, typename FUNC, typename... ValTypes>
int64_t
ProcessDataChunksForMultipleChunk(
    FUNC func,
    std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
    TargetBitmapView res,
    ValTypes... values) {
    // Apply `func` to up to batch_size_ rows of field_id_, chunk by chunk,
    // resuming from (current_data_chunk_, current_data_chunk_pos_). Chunks
    // ruled out by skip_func are skipped (their result bits stay untouched).
    int64_t processed_size = 0;

    // if constexpr (std::is_same_v<T, std::string_view> ||
    //               std::is_same_v<T, Json>) {
    //     if (segment_->type() == SegmentType::Sealed) {
    //         return ProcessChunkForSealedSeg<T>(
    //             func, skip_func, res, values...);
    //     }
    // }

    for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
        // Resume mid-chunk only on the first chunk of this batch.
        auto data_pos =
            (i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
        int64_t size = 0;
        if (segment_->type() == SegmentType::Growing) {
            // Growing: fixed-size chunks, last chunk possibly partial.
            size = (i == (num_data_chunk_ - 1))
                       ? (active_count_ % size_per_chunk_ == 0
                              ? size_per_chunk_ - data_pos
                              : active_count_ % size_per_chunk_ - data_pos)
                       : size_per_chunk_ - data_pos;
        } else {
            // Sealed chunked: chunk sizes vary per chunk.
            size = segment_->chunk_size(field_id_, i) - data_pos;
        }

        // Truncate the last slice so the batch never exceeds batch_size_.
        size = std::min(size, batch_size_ - processed_size);

        auto& skip_index = segment_->GetSkipIndex();
        if (!skip_func || !skip_func(skip_index, field_id_, i)) {
            bool is_seal = false;
            if constexpr (std::is_same_v<T, std::string_view> ||
                          std::is_same_v<T, Json>) {
                if (segment_->type() == SegmentType::Sealed) {
                    // Sealed variable-length data is read through batch
                    // views rather than raw chunk pointers.
                    auto data_vec = segment_
                                        ->get_batch_views<T>(
                                            field_id_, i, data_pos, size)
                                        .first;
                    func(data_vec.data(),
                         size,
                         res + processed_size,
                         values...);
                    is_seal = true;
                }
            }
            if (!is_seal) {
                auto chunk = segment_->chunk_data<T>(field_id_, i);
                const T* data = chunk.data() + data_pos;
                func(data, size, res + processed_size, values...);
            }
        }

        processed_size += size;
        if (processed_size >= batch_size_) {
            // Persist the cursor so the next batch resumes here.
            current_data_chunk_ = i;
            current_data_chunk_pos_ = data_pos + size;
            break;
        }
    }

    return processed_size;
}
template <typename T, typename FUNC, typename... ValTypes>
int64_t
ProcessDataChunks(
    FUNC func,
    std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
    TargetBitmapView res,
    ValTypes... values) {
    // Route to the implementation matching the segment's chunk layout.
    return segment_->is_chunked()
               ? ProcessDataChunksForMultipleChunk<T>(
                     func, skip_func, res, values...)
               : ProcessDataChunksForSingleChunk<T>(
                     func, skip_func, res, values...);
}
int
ProcessIndexOneChunk(TargetBitmap& result,

View File

@ -363,15 +363,27 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
}
// filtering by index, get candidates.
auto size_per_chunk = segment_->size_per_chunk();
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
segment_->template chunk_data<milvus::ArrayView>(field_id_,
chunk_idx);
return chunk.data() + chunk_offset;
};
std::function<const milvus::ArrayView*(int64_t)> retrieve;
if (segment_->is_chunked()) {
retrieve = [this](int64_t offset) -> const milvus::ArrayView* {
auto [chunk_idx, chunk_offset] =
segment_->get_chunk_by_offset(field_id_, offset);
const auto& chunk =
segment_->template chunk_data<milvus::ArrayView>(
field_id_, chunk_idx);
return chunk.data() + chunk_offset;
};
} else {
auto size_per_chunk = segment_->size_per_chunk();
retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
segment_->template chunk_data<milvus::ArrayView>(
field_id_, chunk_idx);
return chunk.data() + chunk_offset;
};
}
// compare the array via the raw data.
auto filter = [&retrieve, &val, reverse](size_t offset) -> bool {

View File

@ -62,8 +62,7 @@ class SealedDataGetter : public DataGetter<T> {
const index::ScalarIndex<T>* field_index_;
public:
SealedDataGetter(const segcore::SegmentSealedImpl& segment,
FieldId& field_id) {
SealedDataGetter(const segcore::SegmentSealed& segment, FieldId& field_id) {
if (segment.HasFieldData(field_id)) {
if constexpr (std::is_same_v<T, std::string>) {
str_field_data_ =
@ -114,8 +113,8 @@ GetDataGetter(const segcore::SegmentInternalInterface& segment,
dynamic_cast<const segcore::SegmentGrowingImpl*>(&segment)) {
return std::make_shared<GrowingDataGetter<T>>(*growing_segment,
fieldId);
} else if (const segcore::SegmentSealedImpl* sealed_segment =
dynamic_cast<const segcore::SegmentSealedImpl*>(&segment)) {
} else if (const segcore::SegmentSealed* sealed_segment =
dynamic_cast<const segcore::SegmentSealed*>(&segment)) {
return std::make_shared<SealedDataGetter<T>>(*sealed_segment, fieldId);
} else {
PanicInfo(UnexpectedError,

View File

@ -111,29 +111,4 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
fieldChunkMetrics_[field_id].emplace(chunk_id, std::move(chunkMetrics));
}
void
SkipIndex::LoadString(milvus::FieldId field_id,
                      int64_t chunk_id,
                      const milvus::VariableColumn<std::string>& var_column) {
    // Build per-chunk min/max/null-count statistics for a string column and
    // register them under (field_id, chunk_id).
    const int num_rows = var_column.NumRows();
    auto metrics = std::make_unique<FieldChunkMetrics>();
    if (num_rows > 0) {
        auto info = ProcessStringFieldMetrics(var_column);
        metrics->min_ = Metrics(std::move(info.min_));
        metrics->max_ = Metrics(std::move(info.max_));
        metrics->null_count_ = info.null_count_;
    }
    // A chunk consisting solely of nulls carries no usable value range.
    metrics->hasValue_ = metrics->null_count_ != num_rows;

    std::unique_lock lck(mutex_);
    // operator[] default-constructs the per-field map on first use, which
    // replaces the explicit count()/insert() dance.
    fieldChunkMetrics_[field_id].emplace(chunk_id, std::move(metrics));
}
} // namespace milvus

View File

@ -16,6 +16,7 @@
#include "common/Types.h"
#include "log/Log.h"
#include "mmap/Column.h"
#include "mmap/ChunkedColumn.h"
namespace milvus {
@ -100,10 +101,32 @@ class SkipIndex {
const bool* valid_data,
int64_t count);
template <typename T>
void
LoadString(milvus::FieldId field_id,
           int64_t chunk_id,
           const T& var_column) {
    // Build per-chunk min/max/null-count statistics for a string-typed
    // column and register them under (field_id, chunk_id). T is any column
    // type accepted by ProcessStringFieldMetrics (exposes NumRows() and
    // per-row access). The diff residue that left the old declaration's
    // parameter line (`const milvus::VariableColumn<std::string>&
    // var_column);`) inside this signature is removed here.
    int num_rows = var_column.NumRows();
    auto chunkMetrics = std::make_unique<FieldChunkMetrics>();
    if (num_rows > 0) {
        auto info = ProcessStringFieldMetrics(var_column);
        chunkMetrics->min_ = Metrics(info.min_);
        chunkMetrics->max_ = Metrics(info.max_);
        chunkMetrics->null_count_ = info.null_count_;
    }
    // A chunk consisting solely of nulls carries no usable value range.
    chunkMetrics->hasValue_ =
        chunkMetrics->null_count_ == num_rows ? false : true;
    std::unique_lock lck(mutex_);
    if (fieldChunkMetrics_.count(field_id) == 0) {
        fieldChunkMetrics_.insert(std::make_pair(
            field_id,
            std::unordered_map<int64_t,
                               std::unique_ptr<FieldChunkMetrics>>()));
    }
    fieldChunkMetrics_[field_id].emplace(chunk_id, std::move(chunkMetrics));
}
private:
const FieldChunkMetrics&
@ -269,9 +292,9 @@ class SkipIndex {
return {minValue, maxValue, null_count};
}
template <typename T>
metricInfo<std::string>
ProcessStringFieldMetrics(
const milvus::VariableColumn<std::string>& var_column) {
ProcessStringFieldMetrics(const T& var_column) {
int num_rows = var_column.NumRows();
// find first not null value
int64_t start = 0;

View File

@ -0,0 +1,427 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <folly/io/IOBuf.h>
#include <sys/mman.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem>
#include <memory>
#include <queue>
#include <string>
#include <vector>
#include <math.h>
#include "common/Array.h"
#include "common/Chunk.h"
#include "common/Common.h"
#include "common/EasyAssert.h"
#include "common/File.h"
#include "common/FieldMeta.h"
#include "common/FieldData.h"
#include "common/Span.h"
#include "fmt/format.h"
#include "log/Log.h"
#include "mmap/Utils.h"
#include "common/FieldData.h"
#include "common/FieldDataInterface.h"
#include "common/Array.h"
#include "knowhere/dataset.h"
#include "monitor/prometheus_client.h"
#include "storage/MmapChunkManager.h"
#include "mmap/Column.h"
namespace milvus {
// Base of all multi-chunk column variants: owns an ordered list of chunks
// plus a running row-count prefix table used to translate global offsets.
class ChunkedColumnBase : public ColumnBase {
 public:
    ChunkedColumnBase() = default;

    // memory mode ctor
    ChunkedColumnBase(const FieldMeta& field_meta) {
        if (field_meta.is_nullable()) {
            nullable_ = true;
        }
    }

    virtual ~ChunkedColumnBase() = default;

    // Move ctor also takes over the chunk list and the row-prefix table;
    // the previous version left them behind, so the moved-to column ended
    // up with row counts but no chunks.
    ChunkedColumnBase(ChunkedColumnBase&& column) noexcept
        : nullable_(column.nullable_),
          num_rows_(column.num_rows_),
          num_rows_until_chunk_(std::move(column.num_rows_until_chunk_)),
          chunks_(std::move(column.chunks_)) {
        column.num_rows_ = 0;
        column.nullable_ = false;
    }

    virtual void
    AppendBatch(const FieldDataPtr data) override {
        PanicInfo(ErrorCode::Unsupported, "AppendBatch not supported");
    }

    // Raw data pointer of one chunk. BUGFIX: the previous version computed
    // the value but fell off the end without returning it (undefined
    // behavior for a non-void function).
    virtual const char*
    Data(int chunk_id) const override {
        return chunks_[chunk_id]->Data();
    }

    // Pointer to the value at a global row offset.
    virtual const char*
    ValueAt(int64_t offset) const {
        auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(offset);
        return chunks_[chunk_id]->ValueAt(offset_in_chunk);
    }

    // MmappedData() returns the mmaped address
    const char*
    MmappedData() const override {
        AssertInfo(chunks_.size() == 1,
                   "only support one chunk, but got {} chunk(s)",
                   chunks_.size());
        return chunks_[0]->Data();
    }

    // Validity of the row at a global offset; non-nullable columns are
    // always valid.
    bool
    IsValid(size_t offset) const {
        if (nullable_) {
            auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(offset);
            return chunks_[chunk_id]->isValid(offset_in_chunk);
        }
        return true;
    }

    bool
    IsNullable() const {
        return nullable_;
    }

    size_t
    NumRows() const {
        return num_rows_;
    }

    int64_t
    num_chunks() const {
        return chunks_.size();
    }

    virtual void
    AddChunk(std::shared_ptr<Chunk> chunk) {
        // Record the running prefix BEFORE appending so that
        // GetNumRowsUntilChunk(i) yields the rows preceding chunk i.
        num_rows_until_chunk_.push_back(num_rows_);
        num_rows_ += chunk->RowNums();
        chunks_.push_back(chunk);
    }

    virtual uint64_t
    DataByteSize() const override {
        // Accumulate in 64 bits: the previous `auto size = 0` deduced int
        // and could overflow for columns larger than 2 GiB.
        uint64_t size = 0;
        for (auto& chunk : chunks_) {
            size += chunk->Size();
        }
        return size;
    }

    int64_t
    chunk_row_nums(int64_t chunk_id) const {
        return chunks_[chunk_id]->RowNums();
    }

    virtual SpanBase
    Span(int64_t chunk_id) const = 0;

    // used for sequential access for search
    virtual BufferView
    GetBatchBuffer(int64_t start_offset, int64_t length) {
        PanicInfo(ErrorCode::Unsupported,
                  "GetBatchBuffer only supported for VariableColumn");
    }

    virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
    StringViews(int64_t chunk_id) const {
        PanicInfo(ErrorCode::Unsupported,
                  "StringViews only supported for VariableColumn");
    }

    // Translate a global row offset into (chunk id, offset within chunk).
    std::pair<size_t, size_t>
    GetChunkIDByOffset(int64_t offset) const {
        int chunk_id = 0;
        for (auto& chunk : chunks_) {
            if (offset < chunk->RowNums()) {
                break;
            }
            offset -= chunk->RowNums();
            chunk_id++;
        }
        return {chunk_id, offset};
    }

    // Number of rows contained in chunks [0, chunk_id).
    int64_t
    GetNumRowsUntilChunk(int64_t chunk_id) const {
        return num_rows_until_chunk_[chunk_id];
    }

 protected:
    bool nullable_{false};
    size_t num_rows_{0};
    // Prefix sums of chunk row counts; entry i holds the number of rows in
    // chunks preceding chunk i.
    std::vector<int64_t> num_rows_until_chunk_;

 private:
    // NOTE(review): appears unused by the visible code — presumably reserved
    // for an mmap-managed mode; confirm before removing.
    storage::MmapChunkManagerPtr mcm_ = nullptr;

 protected:
    std::vector<std::shared_ptr<Chunk>> chunks_;
};
// Fixed-width scalar/vector data split across multiple chunks.
class ChunkedColumn : public ChunkedColumnBase {
 public:
    // memory mode ctor
    ChunkedColumn(const FieldMeta& field_meta) : ChunkedColumnBase(field_meta) {
    }

    ChunkedColumn(std::vector<std::shared_ptr<Chunk>> chunks) {
        for (const auto& chunk : chunks) {
            AddChunk(chunk);
        }
    }

    ChunkedColumn(ChunkedColumn&& column) noexcept
        : ChunkedColumnBase(std::move(column)) {
    }

    ~ChunkedColumn() override = default;

    // Expose one chunk's fixed-width payload as a span.
    virtual SpanBase
    Span(int64_t chunk_id) const override {
        auto chunk =
            std::dynamic_pointer_cast<FixedWidthChunk>(chunks_[chunk_id]);
        return chunk->Span();
    }
};
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
class ChunkedSparseFloatColumn : public ChunkedColumnBase {
public:
// memory mode ctor
ChunkedSparseFloatColumn(const FieldMeta& field_meta)
: ChunkedColumnBase(field_meta) {
}
ChunkedSparseFloatColumn(ChunkedSparseFloatColumn&& column) noexcept
: ChunkedColumnBase(std::move(column)),
dim_(column.dim_),
vec_(std::move(column.vec_)) {
}
ChunkedSparseFloatColumn(std::vector<std::shared_ptr<Chunk>> chunks) {
for (auto& chunk : chunks) {
AddChunk(chunk);
}
}
~ChunkedSparseFloatColumn() override = default;
void
AddChunk(std::shared_ptr<Chunk> chunk) override {
num_rows_until_chunk_.push_back(num_rows_);
num_rows_ += chunk->RowNums();
chunks_.push_back(chunk);
dim_ = std::max(
dim_,
std::dynamic_pointer_cast<SparseFloatVectorChunk>(chunk)->Dim());
}
// This is used to advice mmap prefetch, we don't currently support mmap for
// sparse float vector thus not implemented for now.
size_t
DataByteSize() const override {
PanicInfo(ErrorCode::Unsupported,
"ByteSize not supported for sparse float column");
}
SpanBase
Span(int64_t chunk_id) const override {
PanicInfo(ErrorCode::Unsupported,
"Span not supported for sparse float column");
}
int64_t
Dim() const {
return dim_;
}
private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
};
// Variable-length data (strings, JSON) split across multiple chunks.
template <typename T>
class ChunkedVariableColumn : public ChunkedColumnBase {
 public:
    // std::string values are handed out as non-owning string_views into the
    // chunk storage; other T are returned as-is.
    using ViewType =
        std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;

    // memory mode ctor
    ChunkedVariableColumn(const FieldMeta& field_meta)
        : ChunkedColumnBase(field_meta) {
    }

    ChunkedVariableColumn(std::vector<std::shared_ptr<Chunk>> chunks) {
        for (auto& chunk : chunks) {
            AddChunk(chunk);
        }
    }

    ChunkedVariableColumn(ChunkedVariableColumn&& column) noexcept
        : ChunkedColumnBase(std::move(column)) {
    }

    ~ChunkedVariableColumn() override = default;

    SpanBase
    Span(int64_t chunk_id) const override {
        PanicInfo(ErrorCode::NotImplemented,
                  "span() interface is not implemented for variable column");
    }

    std::pair<std::vector<std::string_view>, FixedVector<bool>>
    StringViews(int64_t chunk_id) const override {
        return std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
            ->StringViews();
    }

    // Collect the buffers covering rows [start_offset, start_offset+length),
    // possibly spanning several chunks.
    BufferView
    GetBatchBuffer(int64_t start_offset, int64_t length) override {
        if (start_offset < 0 || start_offset > num_rows_ ||
            start_offset + length > num_rows_) {
            PanicInfo(ErrorCode::OutOfRange, "index out of range");
        }

        int chunk_num = chunks_.size();

        auto [start_chunk_id, start_offset_in_chunk] =
            GetChunkIDByOffset(start_offset);
        BufferView buffer_view;

        std::vector<BufferView::Element> elements;
        for (; start_chunk_id < chunk_num && length > 0; ++start_chunk_id) {
            int chunk_size = chunks_[start_chunk_id]->RowNums();
            // Take the remainder of this chunk, capped at `length`.
            int len =
                std::min(int64_t(chunk_size - start_offset_in_chunk), length);
            elements.push_back(
                {chunks_[start_chunk_id]->Data(),
                 std::dynamic_pointer_cast<StringChunk>(chunks_[start_chunk_id])
                     ->Offsets(),
                 start_offset_in_chunk,
                 start_offset_in_chunk + len});

            // Subsequent chunks are consumed from their beginning.
            start_offset_in_chunk = 0;
            length -= len;
        }

        buffer_view.data_ = elements;
        return buffer_view;
    }

    // View of the value at global row i.
    ViewType
    operator[](const int i) const {
        // BUGFIX: reject i == num_rows_ as well — the previous check
        // `i > num_rows_` let an off-the-end index through, and
        // GetChunkIDByOffset would then return a chunk id one past the last
        // chunk.
        if (i < 0 || i >= num_rows_) {
            PanicInfo(ErrorCode::OutOfRange, "index out of range");
        }
        auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
        auto data = chunks_[chunk_id]->Data();
        auto offsets = std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
                           ->Offsets();
        auto len = offsets[offset_in_chunk + 1] - offsets[offset_in_chunk];

        return ViewType(data + offsets[offset_in_chunk], len);
    }

    std::string_view
    RawAt(const int i) const {
        return std::string_view((*this)[i]);
    }
};
// Array-typed data split across multiple chunks.
class ChunkedArrayColumn : public ChunkedColumnBase {
 public:
    // memory mode ctor
    ChunkedArrayColumn(const FieldMeta& field_meta)
        : ChunkedColumnBase(field_meta) {
    }

    ChunkedArrayColumn(std::vector<std::shared_ptr<Chunk>> chunks) {
        for (const auto& chunk : chunks) {
            AddChunk(chunk);
        }
    }

    ChunkedArrayColumn(ChunkedArrayColumn&& column) noexcept
        : ChunkedColumnBase(std::move(column)) {
    }

    ~ChunkedArrayColumn() override = default;

    SpanBase
    Span(int64_t chunk_id) const override {
        return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])->Span();
    }

    // Non-owning view of the array stored at global row i.
    ArrayView
    operator[](const int i) const {
        auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
        auto chunk = std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id]);
        return chunk->View(offset_in_chunk);
    }

    // Materialized copy of the array stored at global row i.
    ScalarArray
    RawAt(const int i) const {
        return (*this)[i].output_data();
    }
};
} // namespace milvus

View File

@ -24,6 +24,7 @@
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "common/Array.h"
@ -121,13 +122,27 @@ class ColumnBase {
*
*/
public:
enum class MappingType {
virtual size_t
DataByteSize() const = 0;
virtual const char*
MmappedData() const = 0;
virtual void
AppendBatch(const FieldDataPtr data) = 0;
virtual const char*
Data(int chunk_id = 0) const = 0;
};
class SingleChunkColumnBase : public ColumnBase {
public:
enum MappingType {
MAP_WITH_ANONYMOUS = 0,
MAP_WITH_FILE = 1,
MAP_WITH_MANAGER = 2,
};
// MAP_WITH_ANONYMOUS ctor
ColumnBase(size_t reserve_rows, const FieldMeta& field_meta)
SingleChunkColumnBase(size_t reserve_rows, const FieldMeta& field_meta)
: mapping_type_(MappingType::MAP_WITH_ANONYMOUS) {
auto data_type = field_meta.get_data_type();
SetPaddingSize(data_type);
@ -161,11 +176,11 @@ class ColumnBase {
// MAP_WITH_MANAGER ctor
// reserve is number of bytes to allocate(without padding)
ColumnBase(size_t reserve,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor,
bool nullable)
SingleChunkColumnBase(size_t reserve,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor,
bool nullable)
: mcm_(mcm),
mmap_descriptor_(descriptor),
num_rows_(0),
@ -193,7 +208,9 @@ class ColumnBase {
// !!! The incoming file must have padding written at the end of the file.
// Subclasses of variable length data type, if they used this constructor,
// must set num_rows_ by themselves.
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
SingleChunkColumnBase(const File& file,
size_t size,
const FieldMeta& field_meta)
: nullable_(field_meta.is_nullable()),
mapping_type_(MappingType::MAP_WITH_FILE) {
auto data_type = field_meta.get_data_type();
@ -229,7 +246,7 @@ class ColumnBase {
UpdateMetricWhenMmap(size);
}
virtual ~ColumnBase() {
virtual ~SingleChunkColumnBase() {
if (data_ != nullptr) {
size_t mapped_size = data_cap_size_ + padding_;
if (mapping_type_ != MappingType::MAP_WITH_MANAGER) {
@ -246,17 +263,17 @@ class ColumnBase {
}
}
ColumnBase(ColumnBase&&) = delete;
SingleChunkColumnBase(ColumnBase&&) = delete;
// Data() points at an addr that contains the elements
virtual const char*
Data() const {
Data(int chunk_id = 0) const override {
return data_;
}
// MmappedData() returns the mmaped address
const char*
MmappedData() const {
MmappedData() const override {
return data_;
}
@ -481,28 +498,30 @@ class ColumnBase {
storage::MmapChunkManagerPtr mcm_ = nullptr;
};
class Column : public ColumnBase {
class SingleChunkColumn : public SingleChunkColumnBase {
public:
// MAP_WITH_ANONYMOUS ctor
Column(size_t cap, const FieldMeta& field_meta)
: ColumnBase(cap, field_meta) {
SingleChunkColumn(size_t cap, const FieldMeta& field_meta)
: SingleChunkColumnBase(cap, field_meta) {
}
// MAP_WITH_FILE ctor
Column(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
SingleChunkColumn(const File& file,
size_t size,
const FieldMeta& field_meta)
: SingleChunkColumnBase(file, size, field_meta) {
}
// MAP_WITH_MANAGER ctor
Column(size_t reserve,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor,
bool nullable)
: ColumnBase(reserve, data_type, mcm, descriptor, nullable) {
SingleChunkColumn(size_t reserve,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor,
bool nullable)
: SingleChunkColumnBase(reserve, data_type, mcm, descriptor, nullable) {
}
~Column() override = default;
~SingleChunkColumn() override = default;
SpanBase
Span() const override {
@ -511,19 +530,18 @@ class Column : public ColumnBase {
}
};
class SparseFloatColumn : public ColumnBase {
class SingleChunkSparseFloatColumn : public SingleChunkColumnBase {
public:
// MAP_WITH_ANONYMOUS ctor
SparseFloatColumn(const FieldMeta& field_meta)
: ColumnBase(/*reserve_rows= */ 0, field_meta) {
SingleChunkSparseFloatColumn(const FieldMeta& field_meta)
: SingleChunkColumnBase(0, field_meta) {
}
// MAP_WITH_FILE ctor
SparseFloatColumn(const File& file,
size_t size,
const FieldMeta& field_meta,
std::vector<uint64_t>&& indices = {})
: ColumnBase(file, size, field_meta) {
SingleChunkSparseFloatColumn(const File& file,
size_t size,
const FieldMeta& field_meta,
std::vector<uint64_t>&& indices = {})
: SingleChunkColumnBase(file, size, field_meta) {
AssertInfo(!indices.empty(),
"SparseFloatColumn indices should not be empty.");
num_rows_ = indices.size();
@ -545,22 +563,18 @@ class SparseFloatColumn : public ColumnBase {
dim_ = std::max(dim_, vec_.back().dim());
}
}
// MAP_WITH_MANAGER ctor
SparseFloatColumn(storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor)
: ColumnBase(/*reserve= */ 0,
DataType::VECTOR_SPARSE_FLOAT,
mcm,
descriptor,
false) {
SingleChunkSparseFloatColumn(storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptorPtr descriptor)
: SingleChunkColumnBase(
0, DataType::VECTOR_SPARSE_FLOAT, mcm, descriptor, false) {
}
~SparseFloatColumn() override = default;
~SingleChunkSparseFloatColumn() override = default;
// returned pointer points at a list of knowhere::sparse::SparseRow<float>
const char*
Data() const override {
Data(int chunk_id = 0) const override {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}
@ -635,27 +649,29 @@ class SparseFloatColumn : public ColumnBase {
};
template <typename T>
class VariableColumn : public ColumnBase {
class SingleChunkVariableColumn : public SingleChunkColumnBase {
public:
using ViewType =
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
// MAP_WITH_ANONYMOUS ctor
VariableColumn(size_t reserve_rows,
const FieldMeta& field_meta,
size_t block_size)
: ColumnBase(reserve_rows, field_meta), block_size_(block_size) {
SingleChunkVariableColumn(size_t reserve_rows,
const FieldMeta& field_meta,
size_t block_size)
: SingleChunkColumnBase(reserve_rows, field_meta),
block_size_(block_size) {
}
// MAP_WITH_FILE ctor
VariableColumn(const File& file,
size_t size,
const FieldMeta& field_meta,
size_t block_size)
: ColumnBase(file, size, field_meta), block_size_(block_size) {
SingleChunkVariableColumn(const File& file,
size_t size,
const FieldMeta& field_meta,
size_t block_size)
: SingleChunkColumnBase(file, size, field_meta),
block_size_(block_size) {
}
~VariableColumn() override = default;
~SingleChunkVariableColumn() override = default;
SpanBase
Span() const override {
@ -705,7 +721,9 @@ class VariableColumn : public ColumnBase {
pos += sizeof(uint32_t) + size;
}
return BufferView{pos, data_size_ - (pos - data_)};
BufferView res;
res.data_ = std::pair<char*, size_t>{pos, 0};
return res;
}
ViewType
@ -809,21 +827,23 @@ class VariableColumn : public ColumnBase {
std::vector<uint64_t> indices_{};
};
class ArrayColumn : public ColumnBase {
class SingleChunkArrayColumn : public SingleChunkColumnBase {
public:
// MAP_WITH_ANONYMOUS ctor
ArrayColumn(size_t reserve_rows, const FieldMeta& field_meta)
: ColumnBase(reserve_rows, field_meta),
SingleChunkArrayColumn(size_t reserve_rows, const FieldMeta& field_meta)
: SingleChunkColumnBase(reserve_rows, field_meta),
element_type_(field_meta.get_element_type()) {
}
// MAP_WITH_FILE ctor
ArrayColumn(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta),
SingleChunkArrayColumn(const File& file,
size_t size,
const FieldMeta& field_meta)
: SingleChunkColumnBase(file, size, field_meta),
element_type_(field_meta.get_element_type()) {
}
~ArrayColumn() override = default;
~SingleChunkArrayColumn() override = default;
SpanBase
Span() const override {
@ -853,12 +873,13 @@ class ArrayColumn : public ColumnBase {
indices_.emplace_back(data_size_);
element_indices_.emplace_back(array.get_offsets());
if (nullable_) {
return ColumnBase::Append(static_cast<const char*>(array.data()),
valid_data,
array.byte_size());
return SingleChunkColumnBase::Append(
static_cast<const char*>(array.data()),
valid_data,
array.byte_size());
}
ColumnBase::Append(static_cast<const char*>(array.data()),
array.byte_size());
SingleChunkColumnBase::Append(static_cast<const char*>(array.data()),
array.byte_size());
}
void

View File

@ -19,22 +19,30 @@
#include <memory>
#include <string>
#include <vector>
#include "arrow/record_batch.h"
#include "common/FieldData.h"
#include "storage/DataCodec.h"
namespace milvus {
struct FieldDataInfo {
FieldDataInfo() {
channel = std::make_shared<FieldDataChannel>();
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
}
FieldDataInfo(int64_t field_id,
size_t row_count,
std::string mmap_dir_path = "")
std::string mmap_dir_path = "",
bool growing = true)
: field_id(field_id),
row_count(row_count),
mmap_dir_path(std::move(mmap_dir_path)) {
channel = std::make_shared<FieldDataChannel>();
if (growing) {
channel = std::make_shared<FieldDataChannel>();
} else {
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
}
}
FieldDataInfo(int64_t field_id,
@ -66,6 +74,18 @@ struct FieldDataInfo {
channel->close();
}
// Sealed-segment ctor: pre-fills the arrow reader channel with an
// already-loaded batch and closes it immediately, so consumers observe a
// finite, fully-buffered stream.
FieldDataInfo(
    int64_t field_id,
    size_t row_count,
    const std::vector<std::shared_ptr<milvus::ArrowDataWrapper>>& batch)
    : field_id(field_id), row_count(row_count) {
    arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
    for (auto& data : batch) {
        arrow_reader_channel->push(data);
    }
    arrow_reader_channel->close();
}
FieldDataInfo(int64_t field_id,
size_t row_count,
std::string mmap_dir_path,
@ -84,5 +104,6 @@ struct FieldDataInfo {
size_t row_count;
std::string mmap_dir_path;
FieldDataChannelPtr channel;
std::shared_ptr<ArrowReaderChannel> arrow_reader_channel;
};
} // namespace milvus

View File

@ -9,11 +9,15 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <algorithm>
#include <cmath>
#include <string>
#include "bitset/detail/element_wise.h"
#include "common/BitsetView.h"
#include "common/QueryInfo.h"
#include "common/Types.h"
#include "mmap/Column.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
#include "query/helper.h"
@ -73,6 +77,95 @@ SearchOnSealedIndex(const Schema& schema,
search_result.unity_topK_ = topK;
}
// Brute-force (no index) search over a sealed segment's chunked vector
// column: runs a per-chunk search with a per-chunk slice of the exclusion
// bitset and merges the sub-results into `result`.
void
SearchOnSealed(const Schema& schema,
               std::shared_ptr<ChunkedColumnBase> column,
               const SearchInfo& search_info,
               const void* query_data,
               int64_t num_queries,
               int64_t row_count,
               const BitsetView& bitview,
               SearchResult& result) {
    auto field_id = search_info.field_id_;
    auto& field = schema[field_id];
    // TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder.
    auto dim = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT
                   ? 0
                   : field.get_dim();

    query::dataset::SearchDataset dataset{search_info.metric_type_,
                                          num_queries,
                                          search_info.topk_,
                                          search_info.round_decimal_,
                                          dim,
                                          query_data};

    auto data_type = field.get_data_type();
    CheckBruteForceSearchParam(field, search_info);

    auto num_chunk = column->num_chunks();

    SubSearchResult final_qr(num_queries,
                             search_info.topk_,
                             search_info.metric_type_,
                             search_info.round_decimal_);

    int64_t offset = 0;
    for (int i = 0; i < num_chunk; ++i) {
        auto vec_data = column->Data(i);
        auto chunk_size = column->chunk_row_nums(i);

        // Build a bitset view covering only this chunk. When the chunk
        // starts on a byte boundary we can alias the original bitset;
        // otherwise the chunk's bits are copied into a scratch buffer.
        const uint8_t* bitset_ptr = nullptr;
        char* scratch = nullptr;  // owned; non-null only on the unaligned path
        if ((offset & 0x7) == 0) {
            bitset_ptr = bitview.data() + (offset >> 3);
        } else {
            const size_t byte_count = (chunk_size + 7) / 8;
            scratch = new char[byte_count];
            // Fix: the original zeroed only sizeof(char*) bytes
            // (`bitset_data + sizeof(bitset_data)`), leaving most of the
            // buffer uninitialized before op_copy.
            std::fill(scratch, scratch + byte_count, 0);
            bitset::detail::ElementWiseBitsetPolicy<char>::op_copy(
                reinterpret_cast<const char*>(bitview.data()),
                offset,
                scratch,
                0,
                chunk_size);
            bitset_ptr = reinterpret_cast<const uint8_t*>(scratch);
        }
        offset += chunk_size;
        BitsetView bitset_view(bitset_ptr, chunk_size);

        // Fix: search only this chunk's rows. The original passed the
        // segment-wide `row_count`, reading past the end of the chunk's
        // data buffer.
        // NOTE(review): sub-results carry chunk-local offsets here —
        // confirm they are rebased to segment offsets downstream.
        if (search_info.group_by_field_id_.has_value()) {
            auto sub_qr = BruteForceSearchIterators(dataset,
                                                    vec_data,
                                                    chunk_size,
                                                    search_info,
                                                    bitset_view,
                                                    data_type);
            final_qr.merge(sub_qr);
        } else {
            auto sub_qr = BruteForceSearch(dataset,
                                           vec_data,
                                           chunk_size,
                                           search_info,
                                           bitset_view,
                                           data_type);
            final_qr.merge(sub_qr);
        }
        // Release via the original char* so delete[] matches the new[]
        // type (the original deleted through a reinterpret-cast uint8_t*).
        delete[] scratch;
    }
    if (search_info.group_by_field_id_.has_value()) {
        result.AssembleChunkVectorIterators(
            num_queries, 1, -1, final_qr.chunk_iterators());
    } else {
        result.distances_ = std::move(final_qr.mutable_distances());
        result.seg_offsets_ = std::move(final_qr.mutable_seg_offsets());
    }

    result.unity_topK_ = dataset.topk;
    result.total_nq_ = dataset.num_queries;
}
void
SearchOnSealed(const Schema& schema,
const void* vec_data,

View File

@ -27,6 +27,16 @@ SearchOnSealedIndex(const Schema& schema,
const BitsetView& view,
SearchResult& search_result);
// Brute-force search over a sealed segment's chunked vector column
// (used when the field has raw chunked data but no vector index).
void
SearchOnSealed(const Schema& schema,
               std::shared_ptr<ChunkedColumnBase> column,
               const SearchInfo& search_info,
               const void* query_data,
               int64_t num_queries,
               int64_t row_count,
               const BitsetView& bitset,
               SearchResult& result);
void
SearchOnSealed(const Schema& schema,
const void* vec_data,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,392 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_vector.h>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "ConcurrentVector.h"
#include "DeletedRecord.h"
#include "SealedIndexingRecord.h"
#include "SegmentSealed.h"
#include "TimestampIndex.h"
#include "common/EasyAssert.h"
#include "google/protobuf/message_lite.h"
#include "mmap/ChunkedColumn.h"
#include "index/ScalarIndex.h"
#include "sys/mman.h"
#include "common/Types.h"
#include "common/IndexMeta.h"
namespace milvus::segcore {
// Sealed segment backed by multi-chunk columns (ChunkedColumnBase): field
// data is kept as a sequence of chunks instead of one contiguous mapping,
// avoiding large copies at load time. Chunked counterpart of
// SegmentSealedImpl (is_chunked() returns true).
class ChunkedSegmentSealedImpl : public SegmentSealed {
 public:
    explicit ChunkedSegmentSealedImpl(SchemaPtr schema,
                                      IndexMetaPtr index_meta,
                                      const SegcoreConfig& segcore_config,
                                      int64_t segment_id,
                                      bool TEST_skip_index_for_retrieve = false,
                                      bool is_sorted_by_pk = false);
    ~ChunkedSegmentSealedImpl() override;

    // --- loading / dropping of indexes and field data ---
    void
    LoadIndex(const LoadIndexInfo& info) override;
    void
    LoadFieldData(const LoadFieldDataInfo& info) override;
    void
    LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
    void
    LoadSegmentMeta(
        const milvus::proto::segcore::LoadSegmentMeta& segment_meta) override;
    void
    DropIndex(const FieldId field_id) override;
    void
    DropFieldData(const FieldId field_id) override;
    bool
    HasIndex(FieldId field_id) const override;
    bool
    HasFieldData(FieldId field_id) const override;

    // True when the primary key exists in the insert record.
    bool
    Contain(const PkType& pk) const override {
        return insert_record_.contain(pk);
    }

    void
    LoadFieldData(FieldId field_id, FieldDataInfo& data) override;
    void
    MapFieldData(const FieldId field_id, FieldDataInfo& data) override;
    void
    AddFieldDataInfoForSealed(
        const LoadFieldDataInfo& field_data_info) override;
    int64_t
    get_segment_id() const override {
        return id_;
    }
    bool
    HasRawData(int64_t field_id) const override;
    DataType
    GetFieldDataType(FieldId fieldId) const override;
    void
    RemoveFieldFile(const FieldId field_id) override;
    void
    CreateTextIndex(FieldId field_id) override;
    void
    LoadTextIndex(FieldId field_id,
                  std::unique_ptr<index::TextMatchIndex> index) override;

 public:
    size_t
    GetMemoryUsageInBytes() const override {
        return stats_.mem_size.load() + deleted_record_.mem_size();
    }

    int64_t
    get_row_count() const override;

    int64_t
    get_deleted_count() const override;

    const Schema&
    get_schema() const override;

    std::vector<SegOffset>
    search_pk(const PkType& pk, Timestamp timestamp) const;

    std::vector<SegOffset>
    search_pk(const PkType& pk, int64_t insert_barrier) const;

    std::shared_ptr<DeletedRecord::TmpBitmap>
    get_deleted_bitmap_s(int64_t del_barrier,
                         int64_t insert_barrier,
                         DeletedRecord& delete_record,
                         Timestamp query_timestamp) const;

    std::unique_ptr<DataArray>
    get_vector(FieldId field_id,
               const int64_t* ids,
               int64_t count) const override;

    bool
    is_nullable(FieldId field_id) const override {
        auto it = fields_.find(field_id);
        AssertInfo(it != fields_.end(),
                   "Cannot find field with field_id: " +
                       std::to_string(field_id.get()));
        return it->second->IsNullable();
    };

    // Chunked layout is this implementation's defining property.
    bool
    is_chunked() const override {
        return true;
    }

 public:
    int64_t
    num_chunk_index(FieldId field_id) const override;

    // count of chunk that has raw data
    int64_t
    num_chunk_data(FieldId field_id) const override;

    int64_t
    num_chunk(FieldId field_id) const override;

    // return size_per_chunk for each chunk, renaming against confusion
    int64_t
    size_per_chunk() const override;

    int64_t
    chunk_size(FieldId field_id, int64_t chunk_id) const override;

    std::pair<int64_t, int64_t>
    get_chunk_by_offset(FieldId field_id, int64_t offset) const override;

    int64_t
    num_rows_until_chunk(FieldId field_id, int64_t chunk_id) const override;

    std::string
    debug() const override;

    SegcoreError
    Delete(int64_t reserved_offset,
           int64_t size,
           const IdArray* pks,
           const Timestamp* timestamps) override;

    std::pair<std::vector<OffsetMap::OffsetType>, bool>
    find_first(int64_t limit, const BitsetType& bitset) const override;

    // Calculate: output[i] = Vec[seg_offset[i]]
    // where Vec is determined from field_offset
    std::unique_ptr<DataArray>
    bulk_subscript(FieldId field_id,
                   const int64_t* seg_offsets,
                   int64_t count) const override;

    std::unique_ptr<DataArray>
    bulk_subscript(
        FieldId field_id,
        const int64_t* seg_offsets,
        int64_t count,
        const std::vector<std::string>& dynamic_field_names) const override;

    bool
    is_mmap_field(FieldId id) const override;

    void
    ClearData();

 protected:
    // blob and row_count
    SpanBase
    chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;

    std::pair<std::vector<std::string_view>, FixedVector<bool>>
    chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;

    std::pair<BufferView, FixedVector<bool>>
    get_chunk_buffer(FieldId field_id,
                     int64_t chunk_id,
                     int64_t start_offset,
                     int64_t length) const override;

    const index::IndexBase*
    chunk_index_impl(FieldId field_id, int64_t chunk_id) const override;

    // Calculate: output[i] = Vec[seg_offset[i]],
    // where Vec is determined from field_offset
    void
    bulk_subscript(SystemFieldType system_type,
                   const int64_t* seg_offsets,
                   int64_t count,
                   void* output) const override;

    void
    check_search(const query::Plan* plan) const override;

    int64_t
    get_active_count(Timestamp ts) const override;

    const ConcurrentVector<Timestamp>&
    get_timestamps() const override {
        return insert_record_.timestamps_;
    }

 private:
    template <typename S, typename T = S>
    static void
    bulk_subscript_impl(const void* src_raw,
                        const int64_t* seg_offsets,
                        int64_t count,
                        T* dst_raw);

    template <typename S, typename T = S>
    static void
    bulk_subscript_impl(const ChunkedColumnBase* field,
                        const int64_t* seg_offsets,
                        int64_t count,
                        T* dst_raw);

    template <typename S, typename T = S>
    static void
    bulk_subscript_impl(const ChunkedColumnBase* field,
                        const int64_t* seg_offsets,
                        int64_t count,
                        void* dst_raw);

    template <typename S, typename T = S>
    static void
    bulk_subscript_ptr_impl(const ChunkedColumnBase* field,
                            const int64_t* seg_offsets,
                            int64_t count,
                            google::protobuf::RepeatedPtrField<T>* dst_raw);

    template <typename T>
    static void
    bulk_subscript_array_impl(const ChunkedColumnBase* column,
                              const int64_t* seg_offsets,
                              int64_t count,
                              google::protobuf::RepeatedPtrField<T>* dst);

    static void
    bulk_subscript_impl(int64_t element_sizeof,
                        const ChunkedColumnBase* field,
                        const int64_t* seg_offsets,
                        int64_t count,
                        void* dst_raw);

    std::unique_ptr<DataArray>
    fill_with_empty(FieldId field_id, int64_t count) const;

    std::unique_ptr<DataArray>
    get_raw_data(FieldId field_id,
                 const FieldMeta& field_meta,
                 const int64_t* seg_offsets,
                 int64_t count) const;

    void
    update_row_count(int64_t row_count) {
        // if (row_count_opt_.has_value()) {
        //     AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
        // } else {
        num_rows_ = row_count;
        // }
    }

    void
    mask_with_timestamps(BitsetTypeView& bitset_chunk,
                         Timestamp timestamp) const override;

    void
    vector_search(SearchInfo& search_info,
                  const void* query_data,
                  int64_t query_count,
                  Timestamp timestamp,
                  const BitsetView& bitset,
                  SearchResult& output) const override;

    void
    mask_with_delete(BitsetTypeView& bitset,
                     int64_t ins_barrier,
                     Timestamp timestamp) const override;

    // presumably the two system fields (row ids + timestamps) — TODO confirm
    bool
    is_system_field_ready() const {
        return system_ready_count_ == 2;
    }

    const DeletedRecord&
    get_deleted_record() const {
        return deleted_record_;
    }

    std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
    search_ids(const IdArray& id_array, Timestamp timestamp) const override;

    std::tuple<std::string, int64_t>
    GetFieldDataPath(FieldId field_id, int64_t offset) const;

    void
    LoadVecIndex(const LoadIndexInfo& info);

    void
    LoadScalarIndex(const LoadIndexInfo& info);

    virtual void
    WarmupChunkCache(const FieldId field_id, bool mmap_enabled) override;

    bool
    generate_interim_index(const FieldId field_id);

 private:
    // mmap descriptor, used in chunk cache
    storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;

    // segment loading state
    BitsetType field_data_ready_bitset_;
    BitsetType index_ready_bitset_;
    BitsetType binlog_index_bitset_;
    std::atomic<int> system_ready_count_ = 0;

    // segment data
    // TODO: generate index for scalar
    std::optional<int64_t> num_rows_;

    // scalar field index
    std::unordered_map<FieldId, index::IndexBasePtr> scalar_indexings_;
    // vector field index
    SealedIndexingRecord vector_indexings_;

    // inserted fields data and row_ids, timestamps
    InsertRecord<true> insert_record_;

    // deleted pks
    mutable DeletedRecord deleted_record_;

    LoadFieldDataInfo field_data_info_;

    SchemaPtr schema_;

    int64_t id_;

    // per-field chunked raw data
    std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnBase>> fields_;
    std::unordered_set<FieldId> mmap_fields_;

    // only useful in binlog
    IndexMetaPtr col_index_meta_;
    SegcoreConfig segcore_config_;
    std::unordered_map<FieldId, std::unique_ptr<VecIndexConfig>>
        vec_binlog_config_;

    SegmentStats stats_{};

    // for sparse vector unit test only! Once a type of sparse index that
    // doesn't has raw data is added, this should be removed.
    bool TEST_skip_index_for_retrieve_ = false;

    // whether the segment is sorted by the pk
    bool is_sorted_by_pk_ = false;
};
} // namespace milvus::segcore

View File

@ -234,9 +234,11 @@ class ConcurrentVectorImpl : public VectorBase {
if (element_count == 0) {
return;
}
auto size =
size_per_chunk_ == MAX_ROW_COUNT ? element_count : size_per_chunk_;
chunks_ptr_->emplace_to_at_least(
upper_div(element_offset + element_count, size_per_chunk_),
elements_per_row_ * size_per_chunk_);
upper_div(element_offset + element_count, size),
elements_per_row_ * size);
set_data(
element_offset, static_cast<const Type*>(source), element_count);
}

View File

@ -27,6 +27,7 @@
#include "common/Schema.h"
#include "common/Types.h"
#include "fmt/format.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Column.h"
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
@ -487,12 +488,52 @@ struct InsertRecord {
// Builds the pk -> segment-offset map from a chunked primary-key column,
// walking chunks in order so offsets are assigned contiguously.
// (This span also contained stale pre-diff lines with the old
// ColumnBase/Column types; only the chunked version is kept.)
void
insert_pks(milvus::DataType data_type,
           const std::shared_ptr<ChunkedColumnBase>& data) {
    std::lock_guard lck(shared_mutex_);
    int64_t offset = 0;
    switch (data_type) {
        case DataType::INT64: {
            auto column = std::dynamic_pointer_cast<ChunkedColumn>(data);
            auto num_chunk = column->num_chunks();
            for (int i = 0; i < num_chunk; ++i) {
                auto pks =
                    reinterpret_cast<const int64_t*>(column->Data(i));
                // Fix: iterate only this chunk's rows. The original inner
                // loop shadowed `i` and ran to column->NumRows() (total
                // row count) for every chunk, reading past the chunk
                // buffer and inserting duplicate offsets.
                auto chunk_row_num = column->chunk_row_nums(i);
                for (int64_t j = 0; j < chunk_row_num; ++j) {
                    pk2offset_->insert(pks[j], offset++);
                }
            }
            break;
        }
        case DataType::VARCHAR: {
            auto column = std::dynamic_pointer_cast<
                ChunkedVariableColumn<std::string>>(data);
            auto num_chunk = column->num_chunks();
            for (int i = 0; i < num_chunk; ++i) {
                // StringViews() returns {views, validity}; only the views
                // are needed for the pk map.
                auto pks = column->StringViews(i).first;
                for (auto& pk : pks) {
                    pk2offset_->insert(std::string(pk), offset++);
                }
            }
            break;
        }
        default: {
            // NOTE(review): format string has no "{}" placeholder for
            // data_type — confirm intended message.
            PanicInfo(DataTypeInvalid,
                      fmt::format("unsupported primary key data type",
                                  data_type));
        }
    }
}
void
insert_pks(milvus::DataType data_type,
const std::shared_ptr<SingleChunkColumnBase>& data) {
std::lock_guard lck(shared_mutex_);
int64_t offset = 0;
switch (data_type) {
case DataType::INT64: {
auto column =
std::dynamic_pointer_cast<SingleChunkColumn>(data);
auto pks = reinterpret_cast<const int64_t*>(column->Data());
for (int i = 0; i < column->NumRows(); ++i) {
pk2offset_->insert(pks[i], offset++);
@ -500,9 +541,8 @@ struct InsertRecord {
break;
}
case DataType::VARCHAR: {
auto column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
data);
auto column = std::dynamic_pointer_cast<
SingleChunkVariableColumn<std::string>>(data);
auto pks = column->Views();
for (int i = 0; i < column->NumRows(); ++i) {

View File

@ -399,7 +399,7 @@ SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
}
// Number of chunks currently acknowledged in this growing segment.
// (The span contained both the old zero-arg and the new one-arg
// signature lines from the diff; only the new one is kept.)
// Growing segments use a uniform, config-driven chunk size, so the count
// is independent of `field_id`, which only satisfies the common interface.
int64_t
SegmentGrowingImpl::num_chunk(FieldId field_id) const {
    auto size = get_insert_record().ack_responder_.GetAck();
    return upper_div(size, segcore_config_.get_chunk_rows());
}

View File

@ -135,6 +135,22 @@ class SegmentGrowingImpl : public SegmentGrowing {
return segcore_config_.get_chunk_rows();
}
// Growing segments use one fixed, config-driven chunk size for every
// field and chunk, so both parameters are ignored.
virtual int64_t
chunk_size(FieldId field_id, int64_t chunk_id) const final {
    return segcore_config_.get_chunk_rows();
}
// Maps a segment-global row offset to {chunk id, offset inside the chunk}.
// Uniform chunk size makes this a simple div/mod decomposition.
std::pair<int64_t, int64_t>
get_chunk_by_offset(FieldId field_id, int64_t offset) const override {
    const int64_t chunk_rows = segcore_config_.get_chunk_rows();
    return std::make_pair(offset / chunk_rows, offset % chunk_rows);
}
// Total number of rows contained in chunks [0, chunk_id); with a uniform
// chunk size this is a simple multiplication.
int64_t
num_rows_until_chunk(FieldId field_id, int64_t chunk_id) const override {
    return chunk_id * segcore_config_.get_chunk_rows();
}
void
try_remove_chunks(FieldId fieldId);
@ -320,7 +336,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
protected:
int64_t
num_chunk() const override;
num_chunk(FieldId field_id) const override;
SpanBase
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;

View File

@ -392,14 +392,6 @@ SegmentInternalInterface::LoadPrimitiveSkipIndex(milvus::FieldId field_id,
field_id, chunk_id, data_type, chunk_data, valid_data, count);
}
void
SegmentInternalInterface::LoadStringSkipIndex(
milvus::FieldId field_id,
int64_t chunk_id,
const milvus::VariableColumn<std::string>& var_column) {
skip_index_.LoadString(field_id, chunk_id, var_column);
}
index::TextMatchIndex*
SegmentInternalInterface::GetTextIndex(FieldId field_id) const {
std::shared_lock lock(mutex_);

View File

@ -21,6 +21,7 @@
#include "DeletedRecord.h"
#include "FieldIndexing.h"
#include "common/Common.h"
#include "common/Schema.h"
#include "common/Span.h"
#include "common/SystemProperty.h"
@ -179,13 +180,24 @@ class SegmentInternalInterface : public SegmentInterface {
BufferView buffer = chunk_info.first;
std::vector<ViewType> res;
res.reserve(length);
char* pos = buffer.data_;
for (size_t j = 0; j < length; j++) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t);
res.emplace_back(ViewType(pos, size));
pos += size;
if (buffer.data_.index() == 1) {
char* pos = std::get<1>(buffer.data_).first;
for (size_t j = 0; j < length; j++) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t);
res.emplace_back(ViewType(pos, size));
pos += size;
}
} else {
auto elements = std::get<0>(buffer.data_);
for (auto& element : elements) {
for (int i = element.start_; i < element.end_; i++) {
res.emplace_back(ViewType(
element.data_ + element.offsets_[i],
element.offsets_[i + 1] - element.offsets_[i]));
}
}
}
return std::make_pair(res, chunk_info.second);
}
@ -246,6 +258,10 @@ class SegmentInternalInterface : public SegmentInterface {
set_field_avg_size(FieldId field_id,
int64_t num_rows,
int64_t field_size) override;
virtual bool
is_chunked() const {
return false;
}
const SkipIndex&
GetSkipIndex() const;
@ -258,10 +274,13 @@ class SegmentInternalInterface : public SegmentInterface {
const bool* valid_data,
int64_t count);
// Feeds one chunk of a variable-length (string) column into the skip
// index. Templated on the column type so both single-chunk and chunked
// variable columns can be passed.
// (The span contained the stale non-template declaration line taking
// milvus::VariableColumn<std::string>; it has been removed.)
template <typename T>
void
LoadStringSkipIndex(FieldId field_id,
                    int64_t chunk_id,
                    const T& var_column) {
    skip_index_.LoadString(field_id, chunk_id, var_column);
}
virtual DataType
GetFieldDataType(FieldId fieldId) const = 0;
@ -291,6 +310,9 @@ class SegmentInternalInterface : public SegmentInterface {
virtual int64_t
num_chunk_data(FieldId field_id) const = 0;
virtual int64_t
num_rows_until_chunk(FieldId field_id, int64_t chunk_id) const = 0;
// bitset 1 means not hit. 0 means hit.
virtual void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
@ -298,7 +320,13 @@ class SegmentInternalInterface : public SegmentInterface {
// count of chunks
virtual int64_t
num_chunk() const = 0;
num_chunk(FieldId field_id) const = 0;
virtual int64_t
chunk_size(FieldId field_id, int64_t chunk_id) const = 0;
virtual std::pair<int64_t, int64_t>
get_chunk_by_offset(FieldId field_id, int64_t offset) const = 0;
// element size in each chunk
virtual int64_t
@ -384,7 +412,13 @@ class SegmentInternalInterface : public SegmentInterface {
// internal API: return chunk_index in span, support scalar index only
virtual const index::IndexBase*
chunk_index_impl(FieldId field_id, int64_t chunk_id) const = 0;
virtual void
check_search(const query::Plan* plan) const = 0;
virtual const ConcurrentVector<Timestamp>&
get_timestamps() const = 0;
public:
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type
virtual void
bulk_subscript(SystemFieldType system_type,
@ -405,12 +439,6 @@ class SegmentInternalInterface : public SegmentInterface {
int64_t count,
const std::vector<std::string>& dynamic_field_names) const = 0;
virtual void
check_search(const query::Plan* plan) const = 0;
virtual const ConcurrentVector<Timestamp>&
get_timestamps() const = 0;
protected:
mutable std::shared_mutex mutex_;
// fieldID -> std::pair<num_rows, avg_size>

View File

@ -19,7 +19,6 @@
#include "pb/segcore.pb.h"
#include "segcore/SegmentInterface.h"
#include "segcore/Types.h"
#include "mmap/Column.h"
namespace milvus::segcore {
@ -42,6 +41,12 @@ class SegmentSealed : public SegmentInternalInterface {
AddFieldDataInfoForSealed(const LoadFieldDataInfo& field_data_info) = 0;
virtual void
WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0;
virtual void
RemoveFieldFile(const FieldId field_id) = 0;
virtual void
ClearData() = 0;
virtual std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const = 0;
virtual void
LoadTextIndex(FieldId field_id,

View File

@ -345,15 +345,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
: DEFAULT_MEM_VRCOL_BLOCK_SIZE;
};
std::shared_ptr<ColumnBase> column{};
std::shared_ptr<SingleChunkColumnBase> column{};
if (IsVariableDataType(data_type)) {
int64_t field_data_size = 0;
switch (data_type) {
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR: {
auto var_column =
std::make_shared<VariableColumn<std::string>>(
num_rows, field_meta, get_block_size());
auto var_column = std::make_shared<
SingleChunkVariableColumn<std::string>>(
num_rows, field_meta, get_block_size());
FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
var_column->Append(std::move(field_data));
@ -366,9 +366,9 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
break;
}
case milvus::DataType::JSON: {
auto var_column =
std::make_shared<VariableColumn<milvus::Json>>(
num_rows, field_meta, get_block_size());
auto var_column = std::make_shared<
SingleChunkVariableColumn<milvus::Json>>(
num_rows, field_meta, get_block_size());
FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
var_column->Append(std::move(field_data));
@ -380,8 +380,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
break;
}
case milvus::DataType::ARRAY: {
auto var_column =
std::make_shared<ArrayColumn>(num_rows, field_meta);
auto var_column = std::make_shared<SingleChunkArrayColumn>(
num_rows, field_meta);
FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
for (auto i = 0; i < field_data->get_num_rows(); i++) {
@ -407,7 +407,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
break;
}
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
auto col = std::make_shared<SparseFloatColumn>(field_meta);
auto col = std::make_shared<SingleChunkSparseFloatColumn>(
field_meta);
FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
stats_.mem_size += field_data->Size();
@ -426,7 +427,7 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
SegmentInternalInterface::set_field_avg_size(
field_id, num_rows, field_data_size);
} else {
column = std::make_shared<Column>(num_rows, field_meta);
column = std::make_shared<SingleChunkColumn>(num_rows, field_meta);
FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
column->AppendBatch(field_data);
@ -516,24 +517,25 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
valid_data);
}
WriteFieldPadding(file, data_type, total_written);
std::shared_ptr<ColumnBase> column{};
std::shared_ptr<SingleChunkColumnBase> column{};
auto num_rows = data.row_count;
if (IsVariableDataType(data_type)) {
switch (data_type) {
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR: {
auto var_column = std::make_shared<VariableColumn<std::string>>(
file,
total_written,
field_meta,
DEFAULT_MMAP_VRCOL_BLOCK_SIZE);
auto var_column =
std::make_shared<SingleChunkVariableColumn<std::string>>(
file,
total_written,
field_meta,
DEFAULT_MMAP_VRCOL_BLOCK_SIZE);
var_column->Seal(std::move(indices));
column = std::move(var_column);
break;
}
case milvus::DataType::JSON: {
auto var_column =
std::make_shared<VariableColumn<milvus::Json>>(
std::make_shared<SingleChunkVariableColumn<milvus::Json>>(
file,
total_written,
field_meta,
@ -543,7 +545,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
break;
}
case milvus::DataType::ARRAY: {
auto arr_column = std::make_shared<ArrayColumn>(
auto arr_column = std::make_shared<SingleChunkArrayColumn>(
file, total_written, field_meta);
arr_column->Seal(std::move(indices),
std::move(element_indices));
@ -551,8 +553,9 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
break;
}
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, total_written, field_meta, std::move(indices));
auto sparse_column =
std::make_shared<SingleChunkSparseFloatColumn>(
file, total_written, field_meta, std::move(indices));
column = std::move(sparse_column);
break;
}
@ -562,7 +565,8 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
}
}
} else {
column = std::make_shared<Column>(file, total_written, field_meta);
column = std::make_shared<SingleChunkColumn>(
file, total_written, field_meta);
}
column->SetValidData(std::move(valid_data));
@ -664,7 +668,7 @@ SegmentSealedImpl::num_chunk_data(FieldId field_id) const {
}
int64_t
SegmentSealedImpl::num_chunk() const {
SegmentSealedImpl::num_chunk(FieldId field_id) const {
return 1;
}
@ -790,9 +794,8 @@ SegmentSealedImpl::search_pk(const PkType& pk, Timestamp timestamp) const {
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
pk_column);
auto var_column = std::dynamic_pointer_cast<
SingleChunkVariableColumn<std::string>>(pk_column);
auto views = var_column->Views();
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
@ -843,9 +846,8 @@ SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const {
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
pk_column);
auto var_column = std::dynamic_pointer_cast<
SingleChunkVariableColumn<std::string>>(pk_column);
auto views = var_column->Views();
auto it = std::lower_bound(views.begin(), views.end(), target);
while (it != views.end() && *it == target) {
@ -1057,17 +1059,24 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const {
return {data_path, offset_in_binlog};
}
std::tuple<std::string, std::shared_ptr<ColumnBase>> static ReadFromChunkCache(
const storage::ChunkCachePtr& cc,
const std::string& data_path,
const storage::MmapChunkDescriptorPtr& descriptor) {
std::tuple<
std::string,
std::shared_ptr<
SingleChunkColumnBase>> static ReadFromChunkCache(const storage::
ChunkCachePtr& cc,
const std::string&
data_path,
const storage::
MmapChunkDescriptorPtr&
descriptor) {
// For mmap mode, field_meta is unused, so just construct a fake field meta.
auto fm =
FieldMeta(FieldName(""), FieldId(0), milvus::DataType::NONE, false);
// TODO: add Load() interface for chunk cache when support retrieve_enable, make Read() raise error if cache miss
auto column = cc->Read(data_path, descriptor, fm, true);
cc->Prefetch(data_path);
return {data_path, column};
return {data_path,
std::dynamic_pointer_cast<SingleChunkColumnBase>(column)};
}
std::unique_ptr<DataArray>
@ -1115,7 +1124,8 @@ SegmentSealedImpl::get_vector(FieldId field_id,
auto id_to_data_path =
std::unordered_map<std::int64_t, std::tuple<std::string, int64_t>>{};
auto path_to_column =
std::unordered_map<std::string, std::shared_ptr<ColumnBase>>{};
std::unordered_map<std::string,
std::shared_ptr<SingleChunkColumnBase>>{};
for (auto i = 0; i < count; i++) {
const auto& tuple = GetFieldDataPath(field_id, ids[i]);
id_to_data_path.emplace(ids[i], tuple);
@ -1124,8 +1134,8 @@ SegmentSealedImpl::get_vector(FieldId field_id,
// read and prefetch
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
std::vector<
std::future<std::tuple<std::string, std::shared_ptr<ColumnBase>>>>
std::vector<std::future<
std::tuple<std::string, std::shared_ptr<SingleChunkColumnBase>>>>
futures;
futures.reserve(path_to_column.size());
for (const auto& iter : path_to_column) {
@ -1152,7 +1162,7 @@ SegmentSealedImpl::get_vector(FieldId field_id,
column->NumRows(),
data_path);
auto sparse_column =
std::dynamic_pointer_cast<SparseFloatColumn>(column);
std::dynamic_pointer_cast<SingleChunkSparseFloatColumn>(column);
AssertInfo(sparse_column, "incorrect column created");
buf[i] = static_cast<const knowhere::sparse::SparseRow<float>*>(
static_cast<const void*>(
@ -1344,11 +1354,11 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw,
template <typename S, typename T>
void
SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column,
SegmentSealedImpl::bulk_subscript_impl(const SingleChunkColumnBase* column,
const int64_t* seg_offsets,
int64_t count,
void* dst_raw) {
auto field = reinterpret_cast<const VariableColumn<S>*>(column);
auto field = reinterpret_cast<const SingleChunkVariableColumn<S>*>(column);
auto dst = reinterpret_cast<T*>(dst_raw);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
@ -1359,11 +1369,11 @@ SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column,
template <typename S, typename T>
void
SegmentSealedImpl::bulk_subscript_ptr_impl(
const ColumnBase* column,
const SingleChunkColumnBase* column,
const int64_t* seg_offsets,
int64_t count,
google::protobuf::RepeatedPtrField<T>* dst) {
auto field = reinterpret_cast<const VariableColumn<S>*>(column);
auto field = reinterpret_cast<const SingleChunkVariableColumn<S>*>(column);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
dst->at(i) = std::move(T(field->RawAt(offset)));
@ -1373,11 +1383,11 @@ SegmentSealedImpl::bulk_subscript_ptr_impl(
template <typename T>
void
SegmentSealedImpl::bulk_subscript_array_impl(
const ColumnBase* column,
const SingleChunkColumnBase* column,
const int64_t* seg_offsets,
int64_t count,
google::protobuf::RepeatedPtrField<T>* dst) {
auto field = reinterpret_cast<const ArrayColumn*>(column);
auto field = reinterpret_cast<const SingleChunkArrayColumn*>(column);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
dst->at(i) = std::move(field->RawAt(offset));
@ -1630,7 +1640,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!IsVectorDataType(field_meta.get_data_type())) {
AssertInfo(num_chunk() == 1,
AssertInfo(num_chunk(field_id) == 1,
"num chunk not equal to 1 for sealed segment");
auto index = chunk_index_impl(field_id, 0);
if (index->HasRawData()) {
@ -1669,7 +1679,8 @@ SegmentSealedImpl::bulk_subscript(
}
}
auto dst = ret->mutable_scalars()->mutable_json_data()->mutable_data();
auto field = reinterpret_cast<const VariableColumn<Json>*>(column.get());
auto field =
reinterpret_cast<const SingleChunkVariableColumn<Json>*>(column.get());
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
dst->at(i) = ExtractSubJson(std::string(field->RawAt(offset)),
@ -1965,14 +1976,16 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
if (row_count < field_binlog_config->GetBuildThreshold()) {
return false;
}
std::shared_ptr<ColumnBase> vec_data{};
std::shared_ptr<SingleChunkColumnBase> vec_data{};
{
std::shared_lock lck(mutex_);
vec_data = fields_.at(field_id);
}
auto dim = is_sparse
? dynamic_cast<SparseFloatColumn*>(vec_data.get())->Dim()
: field_meta.get_dim();
auto dim =
is_sparse
? dynamic_cast<SingleChunkSparseFloatColumn*>(vec_data.get())
->Dim()
: field_meta.get_dim();
auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
@ -2049,9 +2062,8 @@ SegmentSealedImpl::CreateTextIndex(FieldId field_id) {
// build
auto iter = fields_.find(field_id);
if (iter != fields_.end()) {
auto column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
iter->second);
auto column = std::dynamic_pointer_cast<
SingleChunkVariableColumn<std::string>>(iter->second);
AssertInfo(
column != nullptr,
"failed to create text index, field is not of text type: {}",

View File

@ -31,6 +31,7 @@
#include "google/protobuf/message_lite.h"
#include "mmap/Column.h"
#include "index/ScalarIndex.h"
#include "segcore/ChunkedSegmentSealedImpl.h"
#include "sys/mman.h"
#include "common/Types.h"
#include "common/IndexMeta.h"
@ -127,7 +128,9 @@ class SegmentSealedImpl : public SegmentSealed {
Timestamp query_timestamp) const;
std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const;
get_vector(FieldId field_id,
const int64_t* ids,
int64_t count) const override;
bool
is_nullable(FieldId field_id) const override {
@ -147,12 +150,30 @@ class SegmentSealedImpl : public SegmentSealed {
num_chunk_data(FieldId field_id) const override;
int64_t
num_chunk() const override;
num_chunk(FieldId field_id) const override;
// return size_per_chunk for each chunk, renaming against confusion
int64_t
size_per_chunk() const override;
int64_t
chunk_size(FieldId field_id, int64_t chunk_id) const override {
PanicInfo(ErrorCode::Unsupported, "Not implemented");
}
bool
is_chunked() const override {
return false;
}
std::pair<int64_t, int64_t>
get_chunk_by_offset(FieldId field_id, int64_t offset) const override {
PanicInfo(ErrorCode::Unsupported, "Not implemented");
}
int64_t
num_rows_until_chunk(FieldId field_id, int64_t chunk_id) const override {
PanicInfo(ErrorCode::Unsupported, "Not implemented");
}
std::string
debug() const override;
@ -231,21 +252,21 @@ class SegmentSealedImpl : public SegmentSealed {
template <typename S, typename T = S>
static void
bulk_subscript_impl(const ColumnBase* field,
bulk_subscript_impl(const SingleChunkColumnBase* field,
const int64_t* seg_offsets,
int64_t count,
void* dst_raw);
template <typename S, typename T = S>
static void
bulk_subscript_ptr_impl(const ColumnBase* field,
bulk_subscript_ptr_impl(const SingleChunkColumnBase* field,
const int64_t* seg_offsets,
int64_t count,
google::protobuf::RepeatedPtrField<T>* dst_raw);
template <typename T>
static void
bulk_subscript_array_impl(const ColumnBase* column,
bulk_subscript_array_impl(const SingleChunkColumnBase* column,
const int64_t* seg_offsets,
int64_t count,
google::protobuf::RepeatedPtrField<T>* dst);
@ -348,7 +369,7 @@ class SegmentSealedImpl : public SegmentSealed {
SchemaPtr schema_;
int64_t id_;
std::unordered_map<FieldId, std::shared_ptr<ColumnBase>> fields_;
std::unordered_map<FieldId, std::shared_ptr<SingleChunkColumnBase>> fields_;
std::unordered_set<FieldId> mmap_fields_;
// only useful in binlog
@ -374,13 +395,24 @@ CreateSealedSegment(
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
bool TEST_skip_index_for_retrieve = false,
bool is_sorted_by_pk = false) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve,
is_sorted_by_pk);
bool is_sorted_by_pk = false,
bool is_multi_chunk = false) {
if (!is_multi_chunk) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve,
is_sorted_by_pk);
} else {
return std::make_unique<ChunkedSegmentSealedImpl>(
schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve,
is_sorted_by_pk);
}
}
} // namespace milvus::segcore
} // namespace milvus::segcore

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/Utils.h"
#include <arrow/record_batch.h>
#include <future>
#include <memory>
@ -22,6 +23,7 @@
#include "index/ScalarIndex.h"
#include "mmap/Utils.h"
#include "log/Log.h"
#include "storage/DataCodec.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"
@ -783,6 +785,42 @@ ReverseDataFromIndex(const index::IndexBase* index,
// init segcore storage config first, and create default remote chunk manager
// segcore use default remote chunk manager to load data from minio/s3
void
LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
std::shared_ptr<ArrowReaderChannel> channel) {
try {
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
std::vector<std::future<std::shared_ptr<milvus::ArrowDataWrapper>>>
futures;
futures.reserve(remote_files.size());
for (const auto& file : remote_files) {
auto future = pool.Submit([&]() {
auto fileSize = rcm->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
rcm->Read(file, buf.get(), fileSize);
auto result =
storage::DeserializeFileData(buf, fileSize, false);
result->SetData(buf);
return result->GetReader();
});
futures.emplace_back(std::move(future));
}
for (auto& future : futures) {
auto field_data = future.get();
channel->push(field_data);
}
channel->close();
} catch (std::exception& e) {
LOG_INFO("failed to load data from remote: {}", e.what());
channel->close(std::current_exception());
}
}
void
LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
FieldDataChannelPtr channel) {
@ -815,7 +853,6 @@ LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
channel->close(std::current_exception());
}
}
int64_t
upper_bound(const ConcurrentVector<Timestamp>& timestamps,
int64_t first,

View File

@ -184,10 +184,13 @@ ReverseDataFromIndex(const index::IndexBase* index,
int64_t count,
const FieldMeta& field_meta);
void
LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
std::shared_ptr<ArrowReaderChannel> channel);
void
LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
FieldDataChannelPtr channel);
/**
* Returns an index pointing to the first element in the range [first, last) such that `value < element` is true
* (i.e. that is strictly greater than value), or last if no such element is found.

View File

@ -26,9 +26,11 @@
#include "log/Log.h"
#include "mmap/Types.h"
#include "segcore/Collection.h"
#include "segcore/SegcoreConfig.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/Utils.h"
#include "storage/Event.h"
#include "storage/Util.h"
#include "futures/Future.h"
#include "futures/Executor.h"
@ -59,8 +61,20 @@ NewSegment(CCollection collection,
segment_id,
milvus::segcore::SegcoreConfig::default_config(),
false,
is_sorted_by_pk);
is_sorted_by_pk,
false);
break;
case ChunkedSealed:
segment = milvus::segcore::CreateSealedSegment(
col->get_schema(),
col->get_index_meta(),
segment_id,
milvus::segcore::SegcoreConfig::default_config(),
false,
is_sorted_by_pk,
true);
break;
default:
PanicInfo(milvus::UnexpectedError,
"invalid segment type: {}",
@ -82,7 +96,7 @@ DeleteSegment(CSegmentInterface c_segment) {
void
ClearSegmentData(CSegmentInterface c_segment) {
auto s = static_cast<milvus::segcore::SegmentSealedImpl*>(c_segment);
auto s = static_cast<milvus::segcore::SegmentSealed*>(c_segment);
s->ClearData();
}
@ -549,8 +563,7 @@ WarmupChunkCache(CSegmentInterface c_segment,
void
RemoveFieldFile(CSegmentInterface c_segment, int64_t field_id) {
auto segment =
reinterpret_cast<milvus::segcore::SegmentSealedImpl*>(c_segment);
auto segment = reinterpret_cast<milvus::segcore::SegmentSealed*>(c_segment);
segment->RemoveFieldFile(milvus::FieldId(field_id));
}

View File

@ -18,9 +18,97 @@
#include <memory>
#include "ChunkCache.h"
#include "common/ChunkWriter.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
#include "log/Log.h"
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
                 const MmapChunkDescriptorPtr& descriptor,
                 const FieldMeta& field_meta) {
    // Chunked-column read path: returns the cached column for `filepath`,
    // downloading and decoding it exactly once. Concurrent readers of the
    // same path block on a shared future until the owner thread publishes
    // the column. Throws SegcoreError if the download/decode fails.

    // Fast path: under the read lock, wait on an existing entry's future.
    {
        std::shared_lock lck(mutex_);
        auto it = columns_.find(filepath);
        if (it != columns_.end()) {
            // Copy the shared_future while still holding the lock: the
            // iterator may be invalidated (entry erased on a failed
            // download) once the lock is released. Never block on get()
            // while holding the lock, or the downloader would deadlock.
            auto future = it->second.second;
            lck.unlock();
            auto result = future.get();
            AssertInfo(result, "unexpected null column, file={}", filepath);
            return result;
        }
    }

    // Slow path: take the write lock for mutation.
    std::unique_lock lck(mutex_);
    // Double-check no entry/future was inserted while waiting for the lock.
    auto it = columns_.find(filepath);
    if (it != columns_.end()) {
        auto future = it->second.second;
        lck.unlock();
        auto result = future.get();
        AssertInfo(result, "unexpected null column, file={}", filepath);
        return result;
    }

    std::promise<std::shared_ptr<ColumnBase>> p;
    std::shared_future<std::shared_ptr<ColumnBase>> f = p.get_future();
    columns_.emplace(filepath, std::make_pair(std::move(p), f));
    lck.unlock();

    // Release the lock and perform download and decode; any other thread
    // requesting the same path gets the shared future above.
    ErrorCode err_code = Success;
    std::string err_msg = "";
    std::shared_ptr<ChunkedColumnBase> column;
    try {
        auto field_data =
            DownloadAndDecodeRemoteFile(cm_.get(), filepath, false);
        auto chunk = create_chunk(
            field_meta, field_meta.get_dim(), field_data->GetReader()->reader);

        auto data_type = field_meta.get_data_type();
        if (IsSparseFloatVectorDataType(data_type)) {
            auto sparse_column =
                std::make_shared<ChunkedSparseFloatColumn>(field_meta);
            sparse_column->AddChunk(chunk);
            column = std::move(sparse_column);
        } else if (IsVariableDataType(data_type)) {
            AssertInfo(false,
                       "TODO: unimplemented for variable data type: {}",
                       data_type);
        } else {
            std::vector<std::shared_ptr<Chunk>> chunks{chunk};
            column = std::make_shared<ChunkedColumn>(chunks);
        }
    } catch (const SegcoreError& e) {
        err_code = e.get_error_code();
        err_msg = fmt::format("failed to read for chunkCache, seg_core_err:{}",
                              e.what());
    } catch (const std::exception& e) {
        // Trap non-Segcore exceptions (arrow/I-O/bad_alloc, ...) too:
        // letting one escape would leave the promise unsatisfied and hang
        // every waiter on the shared future forever.
        err_code = UnexpectedError;
        err_msg =
            fmt::format("failed to read for chunkCache, err:{}", e.what());
    }

    std::unique_lock mmap_lck(mutex_);
    it = columns_.find(filepath);
    if (it != columns_.end()) {
        // Publish the result (null on failure) to all waiters.
        it->second.first.set_value(column);
        if (err_code == Success) {
            // On success the column must have been created above.
            AssertInfo(column, "unexpected null column, file={}", filepath);
        }
    } else {
        PanicInfo(UnexpectedError,
                  "Wrong code, the thread to download for cache should get the "
                  "target entry");
    }
    if (err_code != Success) {
        // Drop the failed entry so a later Read can retry the download.
        columns_.erase(filepath);
        throw SegcoreError(err_code, err_msg);
    }
    return column;
}
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
@ -98,7 +186,8 @@ ChunkCache::Read(const std::string& filepath,
}
} else {
PanicInfo(UnexpectedError,
"Wrong code, the thread to download for cache should get the "
"Wrong code, the thread to download for "
"cache should get the "
"target entry");
}
if (err_code != Success) {
@ -148,23 +237,25 @@ ChunkCache::ConvertToColumn(const FieldDataPtr& field_data,
if (IsSparseFloatVectorDataType(data_type)) {
if (mmap_enabled) {
column = std::make_shared<SparseFloatColumn>(mcm_, descriptor);
column = std::make_shared<SingleChunkSparseFloatColumn>(mcm_,
descriptor);
} else {
column = std::make_shared<SparseFloatColumn>(field_meta);
column = std::make_shared<SingleChunkSparseFloatColumn>(field_meta);
}
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
if (mmap_enabled) {
column = std::make_shared<Column>(field_data->Size(),
data_type,
mcm_,
descriptor,
field_data->IsNullable());
column =
std::make_shared<SingleChunkColumn>(field_data->Size(),
data_type,
mcm_,
descriptor,
field_data->IsNullable());
} else {
column = std::make_shared<Column>(field_data->get_num_rows(),
field_meta);
column = std::make_shared<SingleChunkColumn>(
field_data->get_num_rows(), field_meta);
}
}
column->AppendBatch(field_data);

View File

@ -17,8 +17,9 @@
#pragma once
#include <future>
#include <unordered_map>
#include "common/FieldMeta.h"
#include "storage/MmapChunkManager.h"
#include "mmap/Column.h"
#include "mmap/ChunkedColumn.h"
namespace milvus::storage {
@ -44,6 +45,11 @@ class ChunkCache {
~ChunkCache() = default;
public:
std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta);
std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
@ -58,6 +64,9 @@ class ChunkCache {
Prefetch(const std::string& filepath);
private:
std::string
CachePath(const std::string& filepath);
std::shared_ptr<ColumnBase>
ConvertToColumn(const FieldDataPtr& field_data,
const MmapChunkDescriptorPtr& descriptor,

View File

@ -27,7 +27,7 @@ namespace milvus::storage {
// deserialize remote insert and index file
std::unique_ptr<DataCodec>
DeserializeRemoteFileData(BinlogReaderPtr reader) {
DeserializeRemoteFileData(BinlogReaderPtr reader, bool is_field_data) {
DescriptorEvent descriptor_event(reader);
DataType data_type =
DataType(descriptor_event.event_data.fix_part.data_type);
@ -45,10 +45,17 @@ DeserializeRemoteFileData(BinlogReaderPtr reader) {
case EventType::InsertEvent: {
auto event_data_length =
header.event_length_ - GetEventHeaderSize(header);
auto insert_event_data =
InsertEventData(reader, event_data_length, data_type, nullable);
auto insert_data =
std::make_unique<InsertData>(insert_event_data.field_data);
auto insert_event_data = InsertEventData(
reader, event_data_length, data_type, nullable, is_field_data);
std::unique_ptr<InsertData> insert_data;
if (is_field_data) {
insert_data =
std::make_unique<InsertData>(insert_event_data.field_data);
} else {
insert_data = std::make_unique<InsertData>(
insert_event_data.payload_reader);
}
insert_data->SetFieldDataMeta(data_meta);
insert_data->SetTimestamps(insert_event_data.start_timestamp,
insert_event_data.end_timestamp);
@ -105,13 +112,14 @@ DeserializeLocalFileData(BinlogReaderPtr reader) {
std::unique_ptr<DataCodec>
DeserializeFileData(const std::shared_ptr<uint8_t[]> input_data,
int64_t length) {
int64_t length,
bool is_field_data) {
auto binlog_reader = std::make_shared<BinlogReader>(input_data, length);
auto medium_type = ReadMediumType(binlog_reader);
std::unique_ptr<DataCodec> res;
switch (medium_type) {
case StorageType::Remote: {
res = DeserializeRemoteFileData(binlog_reader);
res = DeserializeRemoteFileData(binlog_reader, is_field_data);
break;
}
case StorageType::LocalDisk: {

View File

@ -16,11 +16,14 @@
#pragma once
#include <arrow/record_batch.h>
#include <cstdint>
#include <vector>
#include <memory>
#include <utility>
#include "common/FieldData.h"
#include "storage/PayloadReader.h"
#include "storage/Types.h"
#include "storage/PayloadStream.h"
#include "storage/BinlogReader.h"
@ -33,6 +36,10 @@ class DataCodec {
: field_data_(std::move(data)), codec_type_(type) {
}
explicit DataCodec(std::shared_ptr<PayloadReader> reader, CodecType type)
: payload_reader_(reader), codec_type_(type) {
}
virtual ~DataCodec() = default;
// Serialized data can be written directly to remote or local disk
@ -69,18 +76,36 @@ class DataCodec {
return field_data_;
}
virtual std::shared_ptr<ArrowDataWrapper>
GetReader() {
auto ret = std::make_shared<ArrowDataWrapper>();
ret->reader = payload_reader_->get_reader();
ret->arrow_reader = payload_reader_->get_file_reader();
ret->file_data = data_;
return ret;
}
void
SetData(std::shared_ptr<uint8_t[]> data) {
data_ = data;
}
protected:
CodecType codec_type_;
std::pair<Timestamp, Timestamp> time_range_;
FieldDataPtr field_data_;
std::shared_ptr<PayloadReader> payload_reader_;
std::shared_ptr<uint8_t[]> data_;
};
// Deserialize the data stream of the file obtained from remote or local
std::unique_ptr<DataCodec>
DeserializeFileData(const std::shared_ptr<uint8_t[]> input, int64_t length);
DeserializeFileData(const std::shared_ptr<uint8_t[]> input,
int64_t length,
bool is_field_data = true);
std::unique_ptr<DataCodec>
DeserializeRemoteFileData(BinlogReaderPtr reader);
DeserializeRemoteFileData(BinlogReaderPtr reader, bool is_field_data);
std::unique_ptr<DataCodec>
DeserializeLocalFileData(BinlogReaderPtr reader);

View File

@ -210,7 +210,8 @@ DescriptorEventData::Serialize() {
BaseEventData::BaseEventData(BinlogReaderPtr reader,
int event_length,
DataType data_type,
bool nullable) {
bool nullable,
bool is_field_data) {
auto ast = reader->Read(sizeof(start_timestamp), &start_timestamp);
AssertInfo(ast.ok(), "read start timestamp failed");
ast = reader->Read(sizeof(end_timestamp), &end_timestamp);
@ -220,9 +221,11 @@ BaseEventData::BaseEventData(BinlogReaderPtr reader,
event_length - sizeof(start_timestamp) - sizeof(end_timestamp);
auto res = reader->Read(payload_length);
AssertInfo(res.first.ok(), "read payload failed");
auto payload_reader = std::make_shared<PayloadReader>(
res.second.get(), payload_length, data_type, nullable);
field_data = payload_reader->get_field_data();
payload_reader = std::make_shared<PayloadReader>(
res.second.get(), payload_length, data_type, nullable, is_field_data);
if (is_field_data) {
field_data = payload_reader->get_field_data();
}
}
std::vector<uint8_t>

View File

@ -24,6 +24,7 @@
#include "common/FieldData.h"
#include "common/Types.h"
#include "storage/PayloadReader.h"
#include "storage/Types.h"
#include "storage/BinlogReader.h"
@ -76,12 +77,14 @@ struct BaseEventData {
Timestamp start_timestamp;
Timestamp end_timestamp;
FieldDataPtr field_data;
std::shared_ptr<PayloadReader> payload_reader;
BaseEventData() = default;
explicit BaseEventData(BinlogReaderPtr reader,
int event_length,
DataType data_type,
bool nullable);
bool nullable,
bool is_field_data = true);
std::vector<uint8_t>
Serialize();

View File

@ -20,6 +20,7 @@
#include <memory>
#include "storage/DataCodec.h"
#include "storage/PayloadReader.h"
namespace milvus::storage {
@ -29,6 +30,10 @@ class InsertData : public DataCodec {
: DataCodec(data, CodecType::InsertDataType) {
}
explicit InsertData(std::shared_ptr<PayloadReader> payload_reader)
: DataCodec(payload_reader, CodecType::InsertDataType) {
}
std::vector<uint8_t>
Serialize(StorageType medium) override;

View File

@ -28,14 +28,16 @@ namespace milvus::storage {
PayloadReader::PayloadReader(const uint8_t* data,
int length,
DataType data_type,
bool nullable)
bool nullable,
bool is_field_data)
: column_type_(data_type), nullable_(nullable) {
auto input = std::make_shared<arrow::io::BufferReader>(data, length);
init(input);
init(input, is_field_data);
}
void
PayloadReader::init(std::shared_ptr<arrow::io::BufferReader> input) {
PayloadReader::init(std::shared_ptr<arrow::io::BufferReader> input,
bool is_field_data) {
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Configure general Parquet reader settings
@ -73,17 +75,21 @@ PayloadReader::init(std::shared_ptr<arrow::io::BufferReader> input) {
st = arrow_reader->GetRecordBatchReader(&rb_reader);
AssertInfo(st.ok(), "get record batch reader");
field_data_ =
CreateFieldData(column_type_, nullable_, dim_, total_num_rows);
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch :
*rb_reader) {
AssertInfo(maybe_batch.ok(), "get batch record success");
auto array = maybe_batch.ValueOrDie()->column(column_index);
// to read
field_data_->FillFieldData(array);
if (is_field_data) {
field_data_ =
CreateFieldData(column_type_, nullable_, dim_, total_num_rows);
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch :
*rb_reader) {
AssertInfo(maybe_batch.ok(), "get batch record success");
auto array = maybe_batch.ValueOrDie()->column(column_index);
// to read
field_data_->FillFieldData(array);
}
AssertInfo(field_data_->IsFull(), "field data hasn't been filled done");
} else {
arrow_reader_ = std::move(arrow_reader);
record_batch_reader_ = std::move(rb_reader);
}
AssertInfo(field_data_->IsFull(), "field data hasn't been filled done");
// LOG_INFO("Peak arrow memory pool size {}", pool)->max_memory();
}
} // namespace milvus::storage

View File

@ -17,6 +17,7 @@
#pragma once
#include <memory>
#include <arrow/record_batch.h>
#include <parquet/arrow/reader.h>
#include "common/FieldData.h"
@ -29,23 +30,37 @@ class PayloadReader {
explicit PayloadReader(const uint8_t* data,
int length,
DataType data_type,
bool nullable_);
bool nullable,
bool is_field_data = true);
~PayloadReader() = default;
void
init(std::shared_ptr<arrow::io::BufferReader> buffer);
init(std::shared_ptr<arrow::io::BufferReader> buffer, bool is_field_data);
const FieldDataPtr
get_field_data() const {
return field_data_;
}
std::shared_ptr<arrow::RecordBatchReader>
get_reader() {
return record_batch_reader_;
}
std::shared_ptr<parquet::arrow::FileReader>
get_file_reader() {
return arrow_reader_;
}
private:
DataType column_type_;
int dim_;
bool nullable_;
FieldDataPtr field_data_;
std::shared_ptr<parquet::arrow::FileReader> arrow_reader_;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader_;
};
} // namespace milvus::storage

View File

@ -539,12 +539,15 @@ GetSegmentRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id) {
std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager,
const std::string& file) {
const std::string& file,
bool is_field_data) {
auto fileSize = chunk_manager->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
chunk_manager->Read(file, buf.get(), fileSize);
return DeserializeFileData(buf, fileSize);
auto res = DeserializeFileData(buf, fileSize, is_field_data);
res->SetData(buf);
return res;
}
std::pair<std::string, size_t>
@ -599,7 +602,7 @@ GetObjectData(ChunkManager* remote_chunk_manager,
futures.reserve(remote_files.size());
for (auto& file : remote_files) {
futures.emplace_back(pool.Submit(
DownloadAndDecodeRemoteFile, remote_chunk_manager, file));
DownloadAndDecodeRemoteFile, remote_chunk_manager, file, true));
}
return futures;
}

View File

@ -102,7 +102,8 @@ GetSegmentRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id);
std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager,
const std::string& file);
const std::string& file,
bool is_field_data = true);
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,

View File

@ -9,17 +9,23 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <fcntl.h>
#include <gtest/gtest.h>
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <parquet/arrow/reader.h>
#include <unistd.h>
#include <memory>
#include <string>
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "common/Chunk.h"
#include "common/ChunkWriter.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/FieldMeta.h"
#include "common/File.h"
#include "common/Types.h"
#include "storage/Event.h"
#include "storage/Util.h"
@ -53,8 +59,7 @@ TEST(chunk, test_int64_field) {
FieldMeta field_meta(
FieldName("a"), milvus::FieldId(1), DataType::INT64, false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto span =
std::dynamic_pointer_cast<FixedWidthChunk<int64_t>>(chunk)->Span();
auto span = std::dynamic_pointer_cast<FixedWidthChunk>(chunk)->Span();
EXPECT_EQ(span.row_count(), data.size());
for (size_t i = 0; i < data.size(); ++i) {
auto n = *(int64_t*)((char*)span.data() + i * span.element_sizeof());
@ -92,7 +97,7 @@ TEST(chunk, test_variable_field) {
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews();
for (size_t i = 0; i < data.size(); ++i) {
EXPECT_EQ(views[i], data[i]);
EXPECT_EQ(views.first[i], data[i]);
}
}
@ -183,4 +188,68 @@ TEST(chunk, test_sparse_float) {
EXPECT_EQ(v1[j].val, v2[j].val);
}
}
}
// RAII scratch directory for mmap tests: a uniquely named directory is
// created under the system temp path on construction and recursively
// removed on destruction.
class TempDir {
 public:
    TempDir() {
        // e.g. /tmp/3fa1_9c2e — "%" expands to a random hex digit.
        const auto leaf = boost::filesystem::unique_path("%%%%_%%%%");
        dir_ = boost::filesystem::temp_directory_path() / leaf;
        boost::filesystem::create_directory(dir_);
    }

    ~TempDir() {
        // Best-effort cleanup of the directory and everything inside it.
        boost::filesystem::remove_all(dir_);
    }

    // Absolute path of the scratch directory as a string.
    std::string
    dir() {
        return dir_.string();
    }

 private:
    boost::filesystem::path dir_;
};
// Verifies that create_chunk() lays out consecutive chunks in one mmap-backed
// file: each chunk's on-disk size must be page-aligned so the next chunk can
// be mapped immediately after it at `file_offset`.
TEST(chunk, multiple_chunk_mmap) {
    TempDir temp;
    std::string temp_dir = temp.dir();
    auto file = File::Open(temp_dir + "/multi_chunk_mmap", O_CREAT | O_RDWR);

    // Build a small INT64 field and serialize it as an insert event payload.
    FixedVector<int64_t> data = {1, 2, 3, 4, 5};
    auto field_data =
        milvus::storage::CreateFieldData(storage::DataType::INT64);
    field_data->FillFieldData(data.data(), data.size());
    storage::InsertEventData event_data;
    event_data.field_data = field_data;
    auto ser_data = event_data.Serialize();

    // Skip the two leading timestamps of the event header; the rest is the
    // parquet-encoded payload.
    auto buffer = std::make_shared<arrow::io::BufferReader>(
        ser_data.data() + 2 * sizeof(milvus::Timestamp),
        ser_data.size() - 2 * sizeof(milvus::Timestamp));

    parquet::arrow::FileReaderBuilder reader_builder;
    auto s = reader_builder.Open(buffer);
    EXPECT_TRUE(s.ok());
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    s = reader_builder.Build(&arrow_reader);
    EXPECT_TRUE(s.ok());

    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
    s = arrow_reader->GetRecordBatchReader(&rb_reader);
    EXPECT_TRUE(s.ok());

    FieldMeta field_meta(
        FieldName("a"), milvus::FieldId(1), DataType::INT64, false);
    int file_offset = 0;
    auto page_size = sysconf(_SC_PAGESIZE);

    // First chunk written at offset 0 must occupy a whole number of pages.
    auto chunk = create_chunk(field_meta, 1, file, file_offset, rb_reader);
    EXPECT_TRUE(chunk->Size() % page_size == 0);
    file_offset += chunk->Size();

    // Second chunk appended right after the first; re-read the payload with a
    // fresh record batch reader.
    std::shared_ptr<::arrow::RecordBatchReader> rb_reader2;
    s = arrow_reader->GetRecordBatchReader(&rb_reader2);
    EXPECT_TRUE(s.ok());
    auto chunk2 = create_chunk(field_meta, 1, file, file_offset, rb_reader2);
    // Fixed: previously asserted on `chunk` again, leaving the second chunk's
    // page alignment unchecked.
    EXPECT_TRUE(chunk2->Size() % page_size == 0);
}

View File

@ -508,7 +508,7 @@ TEST(Sealed, LoadFieldData) {
vec_info.index_params["metric_type"] = knowhere::metric::L2;
segment->LoadIndex(vec_info);
ASSERT_EQ(segment->num_chunk(), 1);
ASSERT_EQ(segment->num_chunk(FieldId(0)), 1);
ASSERT_EQ(segment->num_chunk_index(double_id), 0);
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
@ -671,7 +671,7 @@ TEST(Sealed, ClearData) {
vec_info.index_params["metric_type"] = knowhere::metric::L2;
segment->LoadIndex(vec_info);
ASSERT_EQ(segment->num_chunk(), 1);
ASSERT_EQ(segment->num_chunk(FieldId(0)), 1);
ASSERT_EQ(segment->num_chunk_index(double_id), 0);
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
@ -775,7 +775,7 @@ TEST(Sealed, LoadFieldDataMmap) {
vec_info.index_params["metric_type"] = knowhere::metric::L2;
segment->LoadIndex(vec_info);
ASSERT_EQ(segment->num_chunk(), 1);
ASSERT_EQ(segment->num_chunk(FieldId(0)), 1);
ASSERT_EQ(segment->num_chunk_index(double_id), 0);
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);

View File

@ -46,7 +46,7 @@ TEST(Span, Naive) {
auto float_ptr = dataset.get_col<float>(float_vec_fid);
auto nullable_data_ptr = dataset.get_col<int64_t>(nullable_fid);
auto nullable_valid_data_ptr = dataset.get_col_valid(nullable_fid);
auto num_chunk = segment->num_chunk();
auto num_chunk = segment->num_chunk(FieldId(0));
ASSERT_EQ(num_chunk, upper_div(N, size_per_chunk));
auto row_count = segment->get_row_count();
ASSERT_EQ(N, row_count);

View File

@ -298,11 +298,16 @@ func NewSegment(ctx context.Context,
return nil, err
}
multipleChunkEnable := paramtable.Get().QueryNodeCfg.MultipleChunkedEnable.GetAsBool()
var cSegType C.SegmentType
var locker *state.LoadStateLock
switch segmentType {
case SegmentTypeSealed:
cSegType = C.Sealed
if multipleChunkEnable {
cSegType = C.ChunkedSealed
} else {
cSegType = C.Sealed
}
locker = state.NewLoadStateLock(state.LoadStateOnlyMeta)
case SegmentTypeGrowing:
locker = state.NewLoadStateLock(state.LoadStateDataLoaded)

View File

@ -2336,6 +2336,7 @@ type queryNodeConfig struct {
InterimIndexNProbe ParamItem `refreshable:"false"`
InterimIndexMemExpandRate ParamItem `refreshable:"false"`
InterimIndexBuildParallelRate ParamItem `refreshable:"false"`
MultipleChunkedEnable ParamItem `refreshable:"false"`
KnowhereScoreConsistency ParamItem `refreshable:"false"`
@ -2546,6 +2547,15 @@ This defaults to true, indicating that Milvus creates temporary index for growin
}
p.InterimIndexBuildParallelRate.Init(base.mgr)
p.MultipleChunkedEnable = ParamItem{
Key: "queryNode.segcore.multipleChunkedEnable",
Version: "2.0.0",
DefaultValue: "false",
Doc: "Enable multiple chunked search",
Export: true,
}
p.MultipleChunkedEnable.Init(base.mgr)
p.InterimIndexNProbe = ParamItem{
Key: "queryNode.segcore.interimIndex.nprobe",
Version: "2.0.0",