Enable segment sealed, add support for loadXXX

Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
FluorineDog 2021-01-20 17:33:31 +08:00 committed by yefu.chen
parent 527c0c49df
commit 5e06dc1732
13 changed files with 268 additions and 92 deletions

View File

@ -26,6 +26,6 @@ struct LoadIndexInfo {
// NOTE: Refer to common/SystemProperty.cpp for details
struct LoadFieldDataInfo {
int64_t field_id;
void* blob = nullptr;
const void* blob = nullptr;
int64_t row_count = -1;
};

View File

@ -78,6 +78,11 @@ class Span<T, typename std::enable_if_t<std::is_fundamental_v<T>>> {
return data_;
}
const T&
operator[](int64_t offset) const {
return data_[offset];
}
int64_t
row_count() const {
return row_count_;

View File

@ -17,10 +17,10 @@ class SystemPropertyImpl : public SystemProperty {
public:
[[nodiscard]] bool
SystemFieldVerify(const FieldName& field_name, FieldId field_id) const override {
if (!name_to_types_.count(field_name)) {
if (!IsSystem(field_name)) {
return false;
}
if (!id_to_types_.count(field_id)) {
if (!IsSystem(field_id)) {
return false;
}
auto left_id = name_to_types_.at(field_name);
@ -30,16 +30,26 @@ class SystemPropertyImpl : public SystemProperty {
SystemFieldType
GetSystemFieldType(FieldName field_name) const override {
Assert(name_to_types_.count(field_name));
Assert(IsSystem(field_name));
return name_to_types_.at(field_name);
}
SystemFieldType
GetSystemFieldType(FieldId field_id) const override {
Assert(id_to_types_.count(field_id));
Assert(IsSystem(field_id));
return id_to_types_.at(field_id);
}
bool
IsSystem(FieldId field_id) const override {
return id_to_types_.count(field_id);
}
bool
IsSystem(FieldName field_name) const override {
return name_to_types_.count(field_name);
}
friend const SystemProperty&
SystemProperty::Instance();

View File

@ -34,6 +34,12 @@ class SystemProperty {
virtual SystemFieldType
GetSystemFieldType(FieldName field_name) const = 0;
virtual bool
IsSystem(FieldId field_id) const = 0;
virtual bool
IsSystem(FieldName field_name) const = 0;
};
} // namespace milvus

View File

@ -13,6 +13,7 @@ set(SEGCORE_FILES
reduce_c.cpp
load_index_c.cpp
SealedIndexingRecord.cpp
SegmentInterface.cpp
)
add_library(milvus_segcore SHARED
${SEGCORE_FILES}

View File

@ -248,42 +248,6 @@ SegmentGrowingImpl::Search(const query::Plan* plan,
return results;
}
void
SegmentGrowingImpl::FillTargetEntry(const query::Plan* plan, QueryResult& results) const {
AssertInfo(plan, "empty plan");
auto size = results.result_distances_.size();
Assert(results.internal_seg_offsets_.size() == size);
Assert(results.result_offsets_.size() == size);
Assert(results.row_data_.size() == 0);
if (plan->schema_.get_is_auto_id()) {
auto& uids = record_.uids_;
for (int64_t i = 0; i < size; ++i) {
auto seg_offset = results.internal_seg_offsets_[i];
auto row_id = seg_offset == -1 ? -1 : uids[seg_offset];
std::vector<char> blob(sizeof(row_id));
memcpy(blob.data(), &row_id, sizeof(row_id));
results.row_data_.emplace_back(std::move(blob));
}
} else {
auto key_offset_opt = schema_->get_primary_key_offset();
Assert(key_offset_opt.has_value());
auto key_offset = key_offset_opt.value();
auto& field_meta = schema_->operator[](key_offset);
Assert(field_meta.get_data_type() == DataType::INT64);
auto uids = record_.get_entity<int64_t>(key_offset);
for (int64_t i = 0; i < size; ++i) {
auto seg_offset = results.internal_seg_offsets_[i];
auto row_id = seg_offset == -1 ? -1 : uids->operator[](seg_offset);
std::vector<char> blob(sizeof(row_id));
memcpy(blob.data(), &row_id, sizeof(row_id));
results.row_data_.emplace_back(std::move(blob));
}
}
}
Status
SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) {
auto field_offset = schema_->get_offset(FieldName(info.field_name));

View File

@ -125,6 +125,45 @@ class SegmentGrowingImpl : public SegmentGrowing {
return 0;
}
template <typename T>
void
bulk_subscript_impl(const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, void* output_raw) const {
static_assert(IsScalar<T>);
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
Assert(vec_ptr);
auto& vec = *vec_ptr;
auto output = reinterpret_cast<T*>(output_raw);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
output[i] = offset == -1 ? -1 : vec[offset];
}
}
void
bulk_subscript(SystemFieldType system_type,
const int64_t* seg_offsets,
int64_t count,
void* output) const override {
switch (system_type) {
case SystemFieldType::Timestamp:
PanicInfo("timestamp unsupported");
case SystemFieldType::RowId:
bulk_subscript_impl<int64_t>(this->record_.uids_, seg_offsets, count, output);
break;
default:
PanicInfo("unknown subscript fields");
}
}
void
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override {
// TODO: support more types
auto vec_ptr = record_.get_base_entity(field_offset);
auto data_type = schema_->operator[](field_offset).get_data_type();
Assert(data_type == DataType::INT64);
bulk_subscript_impl<int64_t>(*vec_ptr, seg_offsets, count, output);
}
int64_t
num_chunk_data() const override;
@ -146,9 +185,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);
void
FillTargetEntry(const query::Plan* Plan, QueryResult& results) const override;
protected:
SpanBase
chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const override;

View File

@ -0,0 +1,42 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/SegmentInterface.h"
namespace milvus::segcore {
class Naive;
void
SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const {
AssertInfo(plan, "empty plan");
auto size = results.result_distances_.size();
Assert(results.internal_seg_offsets_.size() == size);
Assert(results.result_offsets_.size() == size);
Assert(results.row_data_.size() == 0);
std::vector<int64_t> target(size);
if (plan->schema_.get_is_auto_id()) {
// use row_id
bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, target.data());
} else {
auto key_offset_opt = get_schema().get_primary_key_offset();
Assert(key_offset_opt.has_value());
auto key_offset = key_offset_opt.value();
bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, target.data());
}
for (int64_t i = 0; i < size; ++i) {
auto row_id = target[i];
std::vector<char> blob(sizeof(row_id));
memcpy(blob.data(), &row_id, sizeof(row_id));
results.row_data_.emplace_back(std::move(blob));
}
}
} // namespace milvus::segcore

View File

@ -16,13 +16,14 @@
#include "common/Span.h"
#include "IndexingEntry.h"
#include <knowhere/index/vector_index/VecIndex.h>
#include "common/SystemProperty.h"
namespace milvus::segcore {
class SegmentInterface {
public:
virtual void
FillTargetEntry(const query::Plan* Plan, QueryResult& results) const = 0;
void
FillTargetEntry(const query::Plan* plan, QueryResult& results) const;
virtual QueryResult
Search(const query::Plan* Plan,
@ -40,6 +41,13 @@ class SegmentInterface {
get_schema() const = 0;
virtual ~SegmentInterface() = default;
protected:
virtual void
bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
virtual void
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
};
// internal API for DSL calculation

View File

@ -37,22 +37,42 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
// TODO
Assert(info.row_count > 0);
auto field_id = FieldId(info.field_id);
auto field_offset = schema_->get_offset(field_id);
auto& field_meta = schema_->operator[](field_offset);
Assert(!field_meta.is_vector());
auto element_sizeof = field_meta.get_sizeof();
auto length_in_bytes = element_sizeof * info.row_count;
aligned_vector<char> vecdata(length_in_bytes);
memcpy(vecdata.data(), info.blob, length_in_bytes);
std::unique_lock lck(mutex_);
if (row_count_opt_.has_value()) {
AssertInfo(row_count_opt_.value() == info.row_count, "load data has different row count from other columns");
Assert(info.blob);
Assert(info.row_count > 0);
if (SystemProperty::Instance().IsSystem(field_id)) {
auto system_field_type = SystemProperty::Instance().GetSystemFieldType(field_id);
Assert(system_field_type == SystemFieldType::RowId);
auto src_ptr = reinterpret_cast<const idx_t*>(info.blob);
// prepare data
aligned_vector<idx_t> vec_data(info.row_count);
std::copy_n(src_ptr, info.row_count, vec_data.data());
// write data under lock
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
AssertInfo(row_ids_.empty(), "already exists");
row_ids_ = std::move(vec_data);
++ready_count_;
} else {
row_count_opt_ = info.row_count;
// prepare data
auto field_offset = schema_->get_offset(field_id);
auto& field_meta = schema_->operator[](field_offset);
Assert(!field_meta.is_vector());
auto element_sizeof = field_meta.get_sizeof();
auto length_in_bytes = element_sizeof * info.row_count;
aligned_vector<char> vec_data(length_in_bytes);
memcpy(vec_data.data(), info.blob, length_in_bytes);
// write data under lock
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
AssertInfo(columns_data_[field_offset.get()].empty(), "already exists");
columns_data_[field_offset.get()] = std::move(vec_data);
++ready_count_;
}
AssertInfo(columns_data_[field_offset.get()].empty(), "already exists");
columns_data_[field_offset.get()] = std::move(vecdata);
++ready_count_;
}
int64_t
@ -63,27 +83,28 @@ SegmentSealedImpl::num_chunk_index_safe(FieldOffset field_offset) const {
int64_t
SegmentSealedImpl::num_chunk_data() const {
PanicInfo("unimplemented");
return 1;
}
int64_t
SegmentSealedImpl::size_per_chunk() const {
PanicInfo("unimplemented");
return get_row_count();
}
SpanBase
SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const {
PanicInfo("unimplemented");
std::shared_lock lck(mutex_);
auto& field_meta = schema_->operator[](field_offset);
auto element_sizeof = field_meta.get_sizeof();
Assert(is_all_ready());
SpanBase base(columns_data_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof);
return base;
}
const knowhere::Index*
SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const {
PanicInfo("unimplemented");
}
void
SegmentSealedImpl::FillTargetEntry(const query::Plan* Plan, QueryResult& results) const {
PanicInfo("unimplemented");
// TODO: support scalar index
return nullptr;
}
QueryResult
@ -96,7 +117,9 @@ SegmentSealedImpl::Search(const query::Plan* Plan,
int64_t
SegmentSealedImpl::GetMemoryUsageInBytes() const {
PanicInfo("unimplemented");
// TODO: add estimate for index
auto row_count = row_count_opt_.value_or(0);
return schema_->get_total_sizeof() * row_count;
}
int64_t

View File

@ -26,9 +26,6 @@ class SegmentSealedImpl : public SegmentSealed {
LoadFieldData(const LoadFieldDataInfo& info) override;
public:
void
FillTargetEntry(const query::Plan* Plan, QueryResult& results) const override;
QueryResult
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
@ -63,10 +60,55 @@ class SegmentSealedImpl : public SegmentSealed {
const knowhere::Index*
chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const override;
// Calculate: output[i] = Vec[seg_offset[i]],
// where Vec is determined from field_offset
void
bulk_subscript(SystemFieldType system_type,
const int64_t* seg_offsets,
int64_t count,
void* output) const override {
Assert(is_all_ready());
Assert(system_type == SystemFieldType::RowId);
bulk_subscript_impl<int64_t>(row_ids_.data(), seg_offsets, count, output);
}
// Calculate: output[i] = Vec[seg_offset[i]]
// where Vec is determined from field_offset
void
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override {
Assert(is_all_ready());
auto& field_meta = schema_->operator[](field_offset);
Assert(field_meta.get_data_type() == DataType::INT64);
bulk_subscript_impl<int64_t>(columns_data_[field_offset.get()].data(), seg_offsets, count, output);
}
private:
template <typename T>
static void
bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) {
static_assert(IsScalar<T>);
auto src = reinterpret_cast<const T*>(src_raw);
auto dst = reinterpret_cast<T*>(dst_raw);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
dst[i] = offset == -1 ? -1 : src[offset];
}
}
void
update_row_count(int64_t row_count) {
if (row_count_opt_.has_value()) {
AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
} else {
row_count_opt_ = row_count;
}
}
bool
is_all_ready() const {
return ready_count_ == schema_->size();
// TODO: optimize here
// NOTE: including row_ids
return ready_count_ == schema_->size() + 1;
}
mutable std::shared_mutex mutex_;

View File

@ -216,6 +216,32 @@ TEST(Sealed, with_predicate) {
}
}
void
SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) {
// TODO
auto row_count = dataset.row_ids_.size();
{
LoadFieldDataInfo info;
info.blob = dataset.row_ids_.data();
info.row_count = dataset.row_ids_.size();
info.field_id = 0; // field id for RowId
seg.LoadFieldData(info);
}
int field_offset = 0;
for (auto& meta : seg.get_schema().get_fields()) {
if (meta.is_vector()) {
++field_offset;
continue;
}
LoadFieldDataInfo info;
info.field_id = meta.get_id().get();
info.row_count = row_count;
info.blob = dataset.cols_[field_offset].data();
seg.LoadFieldData(info);
++field_offset;
}
}
TEST(Sealed, LoadFieldData) {
auto dim = 16;
auto topK = 5;
@ -223,14 +249,13 @@ TEST(Sealed, LoadFieldData) {
auto metric_type = MetricType::METRIC_L2;
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
auto counter_id = schema->AddDebugField("counter", DataType::INT64);
schema->AddDebugField("counter", DataType::INT64);
schema->AddDebugField("double", DataType::DOUBLE);
auto dataset = DataGen(schema, N);
auto fakevec = dataset.get_col<float>(0);
auto counter = dataset.get_col<int64_t>(1);
auto indexing = std::make_shared<knowhere::IVF>();
auto conf = knowhere::Config{{knowhere::meta::DIM, dim},
{knowhere::meta::TOPK, topK},
{knowhere::IndexParams::nlist, 100},
@ -238,21 +263,27 @@ TEST(Sealed, LoadFieldData) {
{knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
{knowhere::meta::DEVICEID, 0}};
auto database = knowhere::GenDataset(N, dim, fakevec.data());
auto indexing = std::make_shared<knowhere::IVF>();
indexing->Train(database, conf);
indexing->AddWithoutIds(database, conf);
auto segment = CreateSealedSegment(schema);
LoadFieldDataInfo field_info;
field_info.field_id = counter_id.get();
field_info.row_count = N;
field_info.blob = counter.data();
segment->LoadFieldData(field_info);
LoadIndexInfo vec_info;
vec_info.field_id = fakevec_id.get();
vec_info.field_name = "fakevec";
vec_info.index = indexing;
vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2;
segment->LoadIndex(vec_info);
int i = 1 + 1;
SealedLoader(dataset, *segment);
{
LoadIndexInfo vec_info;
vec_info.field_id = fakevec_id.get();
vec_info.field_name = "fakevec";
vec_info.index = indexing;
vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2;
segment->LoadIndex(vec_info);
}
ASSERT_EQ(segment->num_chunk_data(), 1);
auto chunk_span1 = segment->chunk_data<int64_t>(FieldOffset(1), 0);
auto chunk_span2 = segment->chunk_data<double>(FieldOffset(2), 0);
auto ref1 = dataset.get_col<int64_t>(1);
auto ref2 = dataset.get_col<double>(2);
for (int i = 0; i < N; ++i) {
ASSERT_EQ(chunk_span1[i], ref1[i]);
ASSERT_EQ(chunk_span2[i], ref2[i]);
}
}

View File

@ -29,7 +29,7 @@ struct GeneratedData {
RowBasedRawData raw_;
template <typename T>
auto
get_col(int index) {
get_col(int index) const {
auto& target = cols_.at(index);
std::vector<T> ret(target.size() / sizeof(T));
memcpy(ret.data(), target.data(), target.size());
@ -158,6 +158,14 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) {
insert_cols(data);
break;
}
case engine::DataType::DOUBLE: {
vector<double> data(N);
for (auto& x : data) {
x = distr(er);
}
insert_cols(data);
break;
}
default: {
throw std::runtime_error("unimplemented");
}