mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Optimize query/search on growing segment while output vector field (#26542)
Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
parent
c3f5856fbc
commit
ba882b49b6
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <numeric>
|
||||
#include <queue>
|
||||
@ -334,14 +335,14 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id,
|
||||
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
bulk_subscript_impl<FloatVector>(field_id,
|
||||
field_meta.get_sizeof(),
|
||||
*vec_ptr,
|
||||
vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
|
||||
bulk_subscript_impl<BinaryVector>(field_id,
|
||||
field_meta.get_sizeof(),
|
||||
*vec_ptr,
|
||||
vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
@ -357,55 +358,55 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id,
|
||||
case DataType::BOOL: {
|
||||
FixedVector<bool> output(count);
|
||||
bulk_subscript_impl<bool>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT8: {
|
||||
FixedVector<int8_t> output(count);
|
||||
bulk_subscript_impl<int8_t>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT16: {
|
||||
FixedVector<int16_t> output(count);
|
||||
bulk_subscript_impl<int16_t>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT32: {
|
||||
FixedVector<int32_t> output(count);
|
||||
bulk_subscript_impl<int32_t>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT64: {
|
||||
FixedVector<int64_t> output(count);
|
||||
bulk_subscript_impl<int64_t>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::FLOAT: {
|
||||
FixedVector<float> output(count);
|
||||
bulk_subscript_impl<float>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::DOUBLE: {
|
||||
FixedVector<double> output(count);
|
||||
bulk_subscript_impl<double>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
FixedVector<std::string> output(count);
|
||||
bulk_subscript_impl<std::string>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::JSON: {
|
||||
FixedVector<std::string> output(count);
|
||||
bulk_subscript_impl<Json, std::string>(
|
||||
*vec_ptr, seg_offsets, count, output.data());
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
default: {
|
||||
@ -418,26 +419,26 @@ template <typename T>
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,
|
||||
int64_t element_sizeof,
|
||||
const VectorBase& vec_raw,
|
||||
const VectorBase* vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const {
|
||||
static_assert(IsVector<T>);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(vec_raw);
|
||||
AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
|
||||
auto& vec = *vec_ptr;
|
||||
std::vector<uint8_t> empty(element_sizeof, 0);
|
||||
|
||||
auto copy_from_chunk = [&]() {
|
||||
auto output_base = reinterpret_cast<char*>(output_raw);
|
||||
for (int i = 0; i < count; ++i) {
|
||||
auto dst = output_base + i * element_sizeof;
|
||||
auto offset = seg_offsets[i];
|
||||
const uint8_t* src =
|
||||
(offset == INVALID_SEG_OFFSET
|
||||
? empty.data()
|
||||
: (const uint8_t*)vec.get_element(offset));
|
||||
memcpy(dst, src, element_sizeof);
|
||||
if (offset == INVALID_SEG_OFFSET) {
|
||||
memset(dst, 0, element_sizeof);
|
||||
} else {
|
||||
auto src = (const uint8_t*)vec.get_element(offset);
|
||||
memcpy(dst, src, element_sizeof);
|
||||
}
|
||||
}
|
||||
};
|
||||
//HasRawData interface guarantees that data can be fetched from growing segment
|
||||
@ -457,12 +458,12 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,
|
||||
|
||||
template <typename S, typename T>
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript_impl(const VectorBase& vec_raw,
|
||||
SegmentGrowingImpl::bulk_subscript_impl(const VectorBase* vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const {
|
||||
static_assert(IsScalar<S>);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<S>*>(&vec_raw);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<S>*>(vec_raw);
|
||||
AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
|
||||
auto& vec = *vec_ptr;
|
||||
auto output = reinterpret_cast<T*>(output_raw);
|
||||
@ -482,11 +483,11 @@ SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type,
|
||||
switch (system_type) {
|
||||
case SystemFieldType::Timestamp:
|
||||
bulk_subscript_impl<Timestamp>(
|
||||
this->insert_record_.timestamps_, seg_offsets, count, output);
|
||||
&this->insert_record_.timestamps_, seg_offsets, count, output);
|
||||
break;
|
||||
case SystemFieldType::RowId:
|
||||
bulk_subscript_impl<int64_t>(
|
||||
this->insert_record_.row_ids_, seg_offsets, count, output);
|
||||
&this->insert_record_.row_ids_, seg_offsets, count, output);
|
||||
break;
|
||||
default:
|
||||
PanicInfo("unknown subscript fields");
|
||||
|
||||
@ -149,7 +149,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
// for scalar vectors
|
||||
template <typename S, typename T = S>
|
||||
void
|
||||
bulk_subscript_impl(const VectorBase& vec_raw,
|
||||
bulk_subscript_impl(const VectorBase* vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const;
|
||||
@ -158,7 +158,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
void
|
||||
bulk_subscript_impl(FieldId field_id,
|
||||
int64_t element_sizeof,
|
||||
const VectorBase& vec_raw,
|
||||
const VectorBase* vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const;
|
||||
|
||||
@ -11,6 +11,8 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "knowhere/comp/index_param.h"
|
||||
#include "segcore/SegmentGrowing.h"
|
||||
#include "segcore/SegmentGrowingImpl.h"
|
||||
#include "pb/schema.pb.h"
|
||||
@ -86,3 +88,92 @@ TEST(Growing, RealCount) {
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(0, segment->get_real_count());
|
||||
}
|
||||
|
||||
TEST(Growing, FillData) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto int8_field = schema->AddDebugField("int8", DataType::INT8);
|
||||
auto int16_field = schema->AddDebugField("int16", DataType::INT16);
|
||||
auto int32_field = schema->AddDebugField("int32", DataType::INT32);
|
||||
auto int64_field = schema->AddDebugField("int64", DataType::INT64);
|
||||
auto float_field = schema->AddDebugField("float", DataType::FLOAT);
|
||||
auto double_field = schema->AddDebugField("double", DataType::DOUBLE);
|
||||
auto varchar_field = schema->AddDebugField("varchar", DataType::VARCHAR);
|
||||
auto json_field = schema->AddDebugField("json", DataType::JSON);
|
||||
auto vec = schema->AddDebugField(
|
||||
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
|
||||
schema->set_primary_field_id(int64_field);
|
||||
|
||||
std::map<std::string, std::string> index_params = {
|
||||
{"index_type", "IVF_FLAT"},
|
||||
{"metric_type", metric_type},
|
||||
{"nlist", "128"}};
|
||||
std::map<std::string, std::string> type_params = {{"dim", "128"}};
|
||||
FieldIndexMeta fieldIndexMeta(
|
||||
vec, std::move(index_params), std::move(type_params));
|
||||
auto& config = SegcoreConfig::default_config();
|
||||
config.set_chunk_rows(1024);
|
||||
config.set_enable_growing_segment_index(true);
|
||||
std::map<FieldId, FieldIndexMeta> filedMap = {{vec, fieldIndexMeta}};
|
||||
IndexMetaPtr metaPtr =
|
||||
std::make_shared<CollectionIndexMeta>(100000, std::move(filedMap));
|
||||
auto segment_growing = CreateGrowingSegment(schema, metaPtr);
|
||||
auto segment = dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
|
||||
|
||||
int64_t per_batch = 1000;
|
||||
int64_t n_batch = 3;
|
||||
int64_t dim = 128;
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema, per_batch);
|
||||
auto int8_values = dataset.get_col<int8_t>(int8_field);
|
||||
auto int16_values = dataset.get_col<int16_t>(int16_field);
|
||||
auto int32_values = dataset.get_col<int32_t>(int32_field);
|
||||
auto int64_values = dataset.get_col<int64_t>(int64_field);
|
||||
auto float_values = dataset.get_col<float>(float_field);
|
||||
auto double_values = dataset.get_col<double>(double_field);
|
||||
auto varchar_values = dataset.get_col<std::string>(varchar_field);
|
||||
auto json_values = dataset.get_col<std::string>(json_field);
|
||||
auto vector_values = dataset.get_col<float>(vec);
|
||||
|
||||
auto offset = segment->PreInsert(per_batch);
|
||||
segment->Insert(offset,
|
||||
per_batch,
|
||||
dataset.row_ids_.data(),
|
||||
dataset.timestamps_.data(),
|
||||
dataset.raw_);
|
||||
auto num_inserted = (i + 1) * per_batch;
|
||||
auto ids_ds = GenRandomIds(num_inserted);
|
||||
auto int8_result =
|
||||
segment->bulk_subscript(int8_field, ids_ds->GetIds(), num_inserted);
|
||||
auto int16_result = segment->bulk_subscript(
|
||||
int16_field, ids_ds->GetIds(), num_inserted);
|
||||
auto int32_result = segment->bulk_subscript(
|
||||
int32_field, ids_ds->GetIds(), num_inserted);
|
||||
auto int64_result = segment->bulk_subscript(
|
||||
int64_field, ids_ds->GetIds(), num_inserted);
|
||||
auto float_result = segment->bulk_subscript(
|
||||
float_field, ids_ds->GetIds(), num_inserted);
|
||||
auto double_result = segment->bulk_subscript(
|
||||
double_field, ids_ds->GetIds(), num_inserted);
|
||||
auto varchar_result = segment->bulk_subscript(
|
||||
varchar_field, ids_ds->GetIds(), num_inserted);
|
||||
auto json_result =
|
||||
segment->bulk_subscript(json_field, ids_ds->GetIds(), num_inserted);
|
||||
auto vec_result =
|
||||
segment->bulk_subscript(vec, ids_ds->GetIds(), num_inserted);
|
||||
|
||||
EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int32_result->scalars().int_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int64_result->scalars().long_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(float_result->scalars().float_data().data_size(),
|
||||
num_inserted);
|
||||
EXPECT_EQ(double_result->scalars().double_data().data_size(),
|
||||
num_inserted);
|
||||
EXPECT_EQ(varchar_result->scalars().string_data().data_size(),
|
||||
num_inserted);
|
||||
EXPECT_EQ(json_result->scalars().json_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(vec_result->vectors().float_vector().data_size(),
|
||||
num_inserted * dim);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user