From 321105cc014a9315c9559a19f56ca67e522c7d81 Mon Sep 17 00:00:00 2001
From: Letian Jiang
Date: Wed, 23 Mar 2022 19:05:25 +0800
Subject: [PATCH] Add column-based insert interface in segcore (#16100)

Signed-off-by: Letian Jiang
---
 internal/core/src/segcore/segment_c.cpp |  42 ++++
 internal/core/src/segcore/segment_c.h   |   9 +
 internal/core/unittest/test_c_api.cpp   | 218 ++++++++++++++++++++++++
 3 files changed, 269 insertions(+)

diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index e0c0ff32c3..54e6475c54 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -150,6 +150,48 @@ Insert(CSegmentInterface c_segment,
     }
 }
 
+// Insert `size` rows supplied as column-packed data. raw_data is read as one
+// contiguous buffer holding each schema field's column back to back, where
+// field `fid` occupies sizeof_infos[fid] * size bytes. row_ids, timestamps and
+// count are forwarded to the segment unchanged. Never throws: failures are
+// reported through the returned CStatus.
+CStatus
+InsertColumnData(CSegmentInterface c_segment,
+                 int64_t reserved_offset,
+                 int64_t size,
+                 const int64_t* row_ids,
+                 const uint64_t* timestamps,
+                 void* raw_data,
+                 int64_t count) {
+    try {
+        // Guard the memcpy below: a negative size or a null buffer would be undefined behaviour.
+        if (size < 0 || (size > 0 && raw_data == nullptr)) {
+            return milvus::FailureCStatus(UnexpectedError, "InsertColumnData: invalid size or null raw_data");
+        }
+        auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
+        milvus::segcore::ColumnBasedRawData dataChunk{};
+
+        auto& schema = segment->get_schema();
+        const auto& sizeof_infos = schema.get_sizeof_infos();
+        dataChunk.columns_.resize(schema.size());
+        auto col_data = static_cast<const char*>(raw_data);
+        int64_t offset = 0;
+        for (int fid = 0; fid < schema.size(); ++fid) {
+            // reserve space for this field, then copy its column out of the packed buffer
+            auto len = sizeof_infos[fid] * size;
+            dataChunk.columns_[fid].resize(len);
+            if (len > 0) {
+                memcpy(dataChunk.columns_[fid].data(), col_data + offset, len);
+            }
+            offset += len;
+        }
+        dataChunk.count = count;
+        segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
+        return milvus::SuccessCStatus();
+    } catch (const std::exception& e) {
+        return milvus::FailureCStatus(UnexpectedError, e.what());
+    }
+}
+
 CStatus
 PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset) {
     try {
diff --git a/internal/core/src/segcore/segment_c.h
b/internal/core/src/segcore/segment_c.h
index 987fca61ce..9f45d6e6fa 100644
--- a/internal/core/src/segcore/segment_c.h
+++ b/internal/core/src/segcore/segment_c.h
@@ -71,6 +71,15 @@ Insert(CSegmentInterface c_segment,
        int sizeof_per_row,
        int64_t count);
 
+CStatus
+InsertColumnData(CSegmentInterface c_segment,
+                 int64_t reserved_offset,
+                 int64_t size,
+                 const int64_t* row_ids,
+                 const uint64_t* timestamps,
+                 void* raw_data,
+                 int64_t count);
+
 CStatus
 PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset);
 
diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp
index 64454ae9ac..17f25fdf9a 100644
--- a/internal/core/unittest/test_c_api.cpp
+++ b/internal/core/unittest/test_c_api.cpp
@@ -103,6 +103,29 @@ generate_data(int N) {
     return std::make_tuple(raw_data, timestamps, uids);
 }
 
+auto
+generate_column_data(int N) {  // builds column-packed rows: all N vectors first, then all N ages
+    std::vector<char> raw_data;
+    std::vector<uint64_t> timestamps;  // element types match the pointer types InsertColumnData takes
+    std::vector<int64_t> uids;
+    std::default_random_engine e(42);
+    std::normal_distribution<> dis(0.0, 1.0);
+    for (int i = 0; i < N; ++i) {  // first column: one float[DIM] vector per row
+        uids.push_back(10 * N + i);
+        timestamps.push_back(0);
+        float vec[DIM];
+        for (auto& x : vec) {
+            x = dis(e);
+        }
+        raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
+    }
+    for (int i = 0; i < N; ++i) {  // second column: one int age in [0, 100) per row
+        int age = e() % 100;
+        raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
+    }
+    return std::make_tuple(raw_data, timestamps, uids);
+}
+
 std::string
 generate_query_data(int nq) {
     namespace ser = milvus::proto::milvus;
@@ -216,6 +239,23 @@ TEST(CApiTest, InsertTest) {
     DeleteSegment(segment);
 }
 
+TEST(CApiTest, InsertColumnDataTest) {
+    auto collection = NewCollection(get_default_schema_config());
+    auto segment = NewSegment(collection, Growing, -1);
+
+    int N = 10000;
+    auto [raw_data, timestamps, uids] = generate_column_data(N);
+
+    int64_t offset;
+    PreInsert(segment, N, &offset);
+
+    auto res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
+    ASSERT_EQ(res.error_code, Success);  // gtest assertion: still checked when NDEBUG disables assert()
+
+    DeleteCollection(collection);
+    DeleteSegment(segment);
+}
+
 TEST(CApiTest, DeleteTest) {
     auto collection = NewCollection(get_default_schema_config());
     auto segment = NewSegment(collection, Growing, -1);
@@ -300,6 +340,73 @@ TEST(CApiTest, SearchTest) {
     DeleteSegment(segment);
 }
 
+TEST(CApiTest, SearchTest2) {
+    auto collection = NewCollection(get_default_schema_config());
+    auto segment = NewSegment(collection, Growing, -1);
+
+    int N = 10000;
+    auto [raw_data, timestamps, uids] = generate_column_data(N);
+
+    int64_t ts_offset = 1000;
+    for (int i = 0; i < N; i++) {
+        timestamps[i] = ts_offset + i;
+    }
+
+    int64_t offset;
+    PreInsert(segment, N, &offset);
+
+    auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
+    ASSERT_EQ(ins_res.error_code, Success);
+
+    const char* dsl_string = R"(
+    {
+        "bool": {
+            "vector": {
+                "fakevec": {
+                    "metric_type": "L2",
+                    "params": {
+                        "nprobe": 10
+                    },
+                    "query": "$0",
+                    "topk": 10,
+                    "round_decimal": 3
+                }
+            }
+        }
+    })";
+
+    int num_queries = 10;
+    auto blob = generate_query_data(num_queries);
+
+    void* plan = nullptr;
+    auto status = CreateSearchPlan(collection, dsl_string, &plan);
+    ASSERT_EQ(status.error_code, Success);
+
+    void* placeholderGroup = nullptr;
+    status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
+    ASSERT_EQ(status.error_code, Success);
+
+    // Rows were inserted with timestamps ts_offset .. ts_offset + N - 1.
+    // The first search passes N + ts_offset, which is past the timestamp of every
+    // inserted row; the second passes ts_offset, the timestamp of the first row.
+    // Both calls only check that Search reports success.
+
+    CSearchResult search_result;
+    auto res = Search(segment, plan, placeholderGroup, N + ts_offset, &search_result, -1);
+    ASSERT_EQ(res.error_code, Success);
+
+    CSearchResult search_result2;
+    auto res2 = Search(segment, plan, placeholderGroup, ts_offset, &search_result2, -1);
+    ASSERT_EQ(res2.error_code, Success);
+
+    DeleteSearchPlan(plan);
+    DeletePlaceholderGroup(placeholderGroup);
+    DeleteSearchResult(search_result);
+    DeleteSearchResult(search_result2);
+    DeleteCollection(collection);
+    DeleteSegment(segment);
+}
+
 TEST(CApiTest, SearchTestWithExpr) {
     auto collection = NewCollection(get_default_schema_config());
     auto segment = NewSegment(collection, Growing, -1);
@@ -352,6 +459,57 @@ TEST(CApiTest, SearchTestWithExpr) {
     DeleteSegment(segment);
 }
 
+TEST(CApiTest, SearchTestWithExpr2) {
+    auto collection = NewCollection(get_default_schema_config());
+    auto segment = NewSegment(collection, Growing, -1);
+
+    int N = 10000;
+    auto [raw_data, timestamps, uids] = generate_column_data(N);
+
+    int64_t offset;
+    PreInsert(segment, N, &offset);
+
+    auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
+    ASSERT_EQ(ins_res.error_code, Success);
+
+    const char* serialized_expr_plan = R"(vector_anns: <
+                                            field_id: 100
+                                            query_info: <
+                                                topk: 10
+                                                metric_type: "L2"
+                                                search_params: "{\"nprobe\": 10}"
+                                            >
+                                            placeholder_tag: "$0"
+                                         >)";
+
+    int num_queries = 10;
+    auto blob = generate_query_data(num_queries);
+
+    void* plan = nullptr;
+    auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan);
+    auto status = CreateSearchPlanByExpr(collection, binary_plan.data(), binary_plan.size(), &plan);
+    ASSERT_EQ(status.error_code, Success);
+
+    void* placeholderGroup = nullptr;
+    status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
+    ASSERT_EQ(status.error_code, Success);
+
+    // generate_column_data stamps every row with 0; the vector is reused here to
+    // hold the single search timestamp (1) that is passed to Search below.
+    timestamps.clear();
+    timestamps.push_back(1);
+
+    CSearchResult search_result;
+    auto res = Search(segment, plan, placeholderGroup, timestamps[0], &search_result, -1);
+    ASSERT_EQ(res.error_code, Success);
+
+    DeleteSearchPlan(plan);
+    DeletePlaceholderGroup(placeholderGroup);
+    DeleteSearchResult(search_result);
+    DeleteCollection(collection);
+    DeleteSegment(segment);
+}
+
 TEST(CApiTest, RetrieveTestWithExpr) {
auto collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(collection, Growing, -1); @@ -388,6 +546,41 @@ TEST(CApiTest, RetrieveTestWithExpr) { DeleteSegment(segment); } +TEST(CApiTest, RetrieveTestWithExpr2) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Growing, -1); + + int N = 10000; + auto [raw_data, timestamps, uids] = generate_column_data(N); + + int64_t offset; + PreInsert(segment, N, &offset); + + auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N); + ASSERT_EQ(ins_res.error_code, Success); + + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + auto plan = std::make_unique(*schema); + + // create retrieve plan "age in [0]" + std::vector values(1, 0); + auto term_expr = std::make_unique>(FieldOffset(1), DataType::INT32, values); + + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + std::vector target_offsets{FieldOffset(0), FieldOffset(1)}; + plan->field_offsets_ = target_offsets; + + CRetrieveResult retrieve_result; + auto res = Retrieve(segment, plan.release(), timestamps[0], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + + DeleteRetrievePlan(plan.release()); + DeleteRetrieveResult(&retrieve_result); + DeleteCollection(collection); + DeleteSegment(segment); +} + TEST(CApiTest, GetMemoryUsageInBytesTest) { auto collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(collection, Growing, -1); @@ -414,6 +607,31 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) { DeleteSegment(segment); } +TEST(CApiTest, GetMemoryUsageInBytesTest2) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Growing, -1); + + auto old_memory_usage_size = GetMemoryUsageInBytes(segment); + // std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl; + 
assert(old_memory_usage_size == 0); + + int N = 10000; + auto [raw_data, timestamps, uids] = generate_column_data(N); + + int64_t offset; + PreInsert(segment, N, &offset); + + auto res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N); + assert(res.error_code == Success); + + auto memory_usage_size = GetMemoryUsageInBytes(segment); + // std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl; + assert(memory_usage_size == 2785280); + + DeleteCollection(collection); + DeleteSegment(segment); +} + TEST(CApiTest, GetDeletedCountTest) { auto collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(collection, Growing, -1);