diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 0a1fb55d1b..c49153d6ee 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -2,7 +2,8 @@ enable_testing() find_package(GTest REQUIRED) set(MILVUS_TEST_FILES test_naive.cpp - # test_dog_segment.cpp + test_dog_segment.cpp + test_concurrent_vector.cpp test_c_api.cpp ) add_executable(all_tests diff --git a/core/unittest/test_c_api.cpp b/core/unittest/test_c_api.cpp index 658dbc7a24..1c7faffae5 100644 --- a/core/unittest/test_c_api.cpp +++ b/core/unittest/test_c_api.cpp @@ -137,7 +137,7 @@ TEST(CApiTest, SearchTest) { long result_ids[10]; float result_distances[10]; - auto sea_res = Search(segment, nullptr, 0, result_ids, result_distances); + auto sea_res = Search(segment, nullptr, 1, result_ids, result_distances); assert(sea_res == 0); assert(result_ids[0] == 100911); diff --git a/core/unittest/test_concurrent_vector.cpp b/core/unittest/test_concurrent_vector.cpp new file mode 100644 index 0000000000..023e310072 --- /dev/null +++ b/core/unittest/test_concurrent_vector.cpp @@ -0,0 +1,129 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "dog_segment/ConcurrentVector.h" +#include "dog_segment/SegmentBase.h" +// #include "knowhere/index/vector_index/helpers/IndexParameter.h" + +#include "dog_segment/SegmentBase.h" +#include "dog_segment/AckResponder.h" + +using std::cin; +using std::cout; +using std::endl; +using namespace milvus::engine; +using namespace milvus::dog_segment; +using std::vector; + +TEST(ConcurrentVector, TestABI) { + ASSERT_EQ(TestABI(), 42); + assert(true); +} + +TEST(ConcurrentVector, TestSingle) { + auto dim = 8; + ConcurrentVector c_vec(dim); + std::default_random_engine e(42); + int data = 0; + auto total_count = 0; + for (int i = 0; i < 10000; ++i) { + int insert_size = e() % 150; + vector vec(insert_size * dim); + for (auto& x : vec) { + x = data++; + } + c_vec.grow_to_at_least(total_count + insert_size); + c_vec.set_data(total_count, vec.data(), insert_size); + total_count += insert_size; + } + ASSERT_EQ(c_vec.chunk_size(), (total_count + 31) / 32); + for (int i = 0; i < total_count; ++i) { + for (int d = 0; d < dim; ++d) { + auto std_data = d + i * dim; + ASSERT_EQ(c_vec.get_element(i)[d], std_data); + } + } +} + +TEST(ConcurrentVector, TestMultithreads) { + auto dim = 8; + constexpr int threads = 16; + std::vector total_counts(threads); + + ConcurrentVector c_vec(dim); + std::atomic ack_counter = 0; + // std::mutex mutex; + + auto executor = [&](int thread_id) { + std::default_random_engine e(42 + thread_id); + int64_t data = 0; + int64_t total_count = 0; + for (int i = 0; i < 10000; ++i) { + // std::lock_guard lck(mutex); + int insert_size = e() % 150; + vector vec(insert_size * dim); + for (auto& x : vec) { + x = data++ * threads + thread_id; + } + auto offset = ack_counter.fetch_add(insert_size); + c_vec.grow_to_at_least(offset + insert_size); + c_vec.set_data(offset, vec.data(), insert_size); + total_count += insert_size; + } + assert(data == total_count * dim); + total_counts[thread_id] = total_count; + }; + std::vector pool; + for (int i = 0; i < threads; ++i) { + pool.emplace_back(executor, i); + } + for (auto& thread : pool) { + thread.join(); + } + + std::vector counts(threads); + auto N = ack_counter.load(); + for (int64_t i = 0; i < N; ++i) { + for (int d = 0; d < dim; ++d) { + auto data = c_vec.get_element(i)[d]; + auto thread_id = data % threads; + auto raw_data = data / threads; + auto std_data = counts[thread_id]++; + ASSERT_EQ(raw_data, std_data) << data; + } + } +} +TEST(ConcurrentVector, TestAckSingle) { + std::vector> raw_data; + std::default_random_engine e(42); + AckResponder ack; + int N = 10000; + for(int i = 0; i < 10000; ++i) { + auto weight = i + e() % 100; + raw_data.emplace_back(weight, i, (i + 1)); + } + std::sort(raw_data.begin(), raw_data.end()); + for(auto [_, b, e]: raw_data) { + EXPECT_LE(ack.GetAck(), b); + ack.AddSegment(b, e); + auto seg = ack.GetAck(); + EXPECT_GE(seg + 100, b); + } + EXPECT_EQ(ack.GetAck(), N); +} diff --git a/core/unittest/test_dog_segment.cpp b/core/unittest/test_dog_segment.cpp index e8e9a957b5..b50addd8b3 100644 --- a/core/unittest/test_dog_segment.cpp +++ b/core/unittest/test_dog_segment.cpp @@ -9,71 +9,21 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -// #include -// #include -// #include +#include #include #include -// #include "db/SnapshotVisitor.h" -// #include "db/Types.h" -// #include "db/snapshot/IterateHandler.h" -// #include "db/snapshot/Resources.h" -// #include "db/utils.h" // #include "knowhere/index/vector_index/helpers/IndexParameter.h" // #include "segment/SegmentReader.h" // #include "segment/SegmentWriter.h" -// #include "src/dog_segment/SegmentBase.h" +#include "dog_segment/SegmentBase.h" // #include "utils/Json.h" #include -#include -#include "dog_segment/SegmentBase.h" using std::cin; using std::cout; using std::endl; -// using SegmentVisitor = milvus::engine::SegmentVisitor; - -// namespace { -// milvus::Status -// CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { -// CreateCollectionContext context; -// context.lsn = lsn; -// auto collection_schema = std::make_shared(collection_name); -// context.collection = collection_schema; - -// int64_t collection_id = 0; -// int64_t field_id = 0; -// /* field uid */ -// auto uid_field = std::make_shared(milvus::engine::FIELD_UID, 0, milvus::engine::DataType::INT64, -// milvus::engine::snapshot::JEmpty, field_id); -// auto uid_field_element_blt = -// std::make_shared(collection_id, field_id, milvus::engine::ELEMENT_BLOOM_FILTER, -// milvus::engine::FieldElementType::FET_BLOOM_FILTER); -// auto uid_field_element_del = -// std::make_shared(collection_id, field_id, milvus::engine::ELEMENT_DELETED_DOCS, -// milvus::engine::FieldElementType::FET_DELETED_DOCS); - -// field_id++; -// /* field vector */ -// milvus::json vector_param = {{milvus::knowhere::meta::DIM, 4}}; -// auto vector_field = -// std::make_shared("vector", 0, milvus::engine::DataType::VECTOR_FLOAT, vector_param, field_id); -// auto vector_field_element_index = -// std::make_shared(collection_id, field_id, milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, -// milvus::engine::FieldElementType::FET_INDEX); -// /* another field*/ -// auto int_field = std::make_shared("int", 0, milvus::engine::DataType::INT32, -// milvus::engine::snapshot::JEmpty, field_id++); - -// context.fields_schema[uid_field] = {uid_field_element_blt, uid_field_element_del}; -// context.fields_schema[vector_field] = {vector_field_element_index}; -// context.fields_schema[int_field] = {}; - -// return db->CreateCollection(context); -// } -// } // namespace TEST(DogSegmentTest, TestABI) { using namespace milvus::engine; @@ -82,60 +32,6 @@ TEST(DogSegmentTest, TestABI) { assert(true); } -// TEST_F(DogSegmentTest, TestCreateAndSchema) { -// using namespace milvus::engine; -// using namespace milvus::dog_segment; -// // step1: create segment from current snapshot. - -// LSN_TYPE lsn = 0; -// auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; }; - -// // step 1.1: create collection -// std::string db_root = "/tmp/milvus_test/db/table"; -// std::string collection_name = "c1"; -// auto status = CreateCollection(db_, collection_name, next_lsn()); -// ASSERT_TRUE(status.ok()); - -// // step 1.2: get snapshot -// ScopedSnapshotT snapshot; -// status = Snapshots::GetInstance().GetSnapshot(snapshot, collection_name); -// ASSERT_TRUE(status.ok()); -// ASSERT_TRUE(snapshot); -// ASSERT_EQ(snapshot->GetName(), collection_name); - -// // step 1.3: get partition_id -// cout << endl; -// cout << endl; -// ID_TYPE partition_id = snapshot->GetResources().begin()->first; -// cout << partition_id; - -// // step 1.5 create schema from ids -// auto collection = snapshot->GetCollection(); - -// auto field_names = snapshot->GetFieldNames(); -// auto schema = std::make_shared(); -// for (const auto& field_name : field_names) { -// auto the_field = snapshot->GetField(field_name); -// auto param = the_field->GetParams(); -// auto type = the_field->GetFtype(); -// cout << field_name // -// << " " << (int)type // -// << " " << param // -// << endl; -// FieldMeta field(field_name, type); -// int dim = 1; -// if(field.is_vector()) { -// field.set_dim(dim); -// } -// schema->AddField(field); - -// } -// // step 1.6 create a segment from ids -// auto segment = CreateSegment(schema); -// std::vector primary_ids; -// } - - TEST(DogSegmentTest, MockTest) { using namespace milvus::dog_segment; @@ -145,7 +41,7 @@ TEST(DogSegmentTest, MockTest) { schema->AddField("age", DataType::INT32); std::vector raw_data; std::vector timestamps; - std::vector uids; + std::vector uids; int N = 10000; std::default_random_engine e(67); for(int i = 0; i < N; ++i) { @@ -163,108 +59,18 @@ TEST(DogSegmentTest, MockTest) { auto line_sizeof = (sizeof(int) + sizeof(float) * 16); assert(raw_data.size() == line_sizeof * N); - auto segment = CreateSegment(schema).release(); + + // auto index_meta = std::make_shared(schema); + auto segment = CreateSegment(schema, nullptr); + DogDataChunk data_chunk{raw_data.data(), (int)line_sizeof, N}; - segment->Insert(N, uids.data(), timestamps.data(), data_chunk); + auto offset = segment->PreInsert(N); + segment->Insert(offset, N, uids.data(), timestamps.data(), data_chunk); QueryResult query_result; - segment->Query(nullptr, 0, query_result); - delete segment; +// segment->Query(nullptr, 0, query_result); + segment->Close(); +// segment->BuildIndex(); int i = 0; i++; } -//TEST_F(DogSegmentTest, DogSegmentTest) { -// LSN_TYPE lsn = 0; -// auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; }; -// -// std::string db_root = "/tmp/milvus_test/db/table"; -// std::string c1 = "c1"; -// auto status = CreateCollection(db_, c1, next_lsn()); -// ASSERT_TRUE(status.ok()); -// -// ScopedSnapshotT snapshot; -// status = Snapshots::GetInstance().GetSnapshot(snapshot, c1); -// ASSERT_TRUE(status.ok()); -// ASSERT_TRUE(snapshot); -// ASSERT_EQ(snapshot->GetName(), c1); -// { -// SegmentFileContext sf_context; -// SFContextBuilder(sf_context, snapshot); -// } -// std::vector segfile_ctxs; -// SFContextsBuilder(segfile_ctxs, snapshot); -// -// std::cout << snapshot->ToString() << std::endl; -// -// ID_TYPE partition_id; -// { -// auto& partitions = snapshot->GetResources(); -// partition_id = partitions.begin()->first; -// } -// -// [&next_lsn, // -// &segfile_ctxs, // -// &partition_id, // -// &snapshot, // -// &db_root] { -// /* commit new segment */ -// OperationContext op_ctx; -// op_ctx.lsn = next_lsn(); -// op_ctx.prev_partition = snapshot->GetResource(partition_id); -// -// auto new_seg_op = std::make_shared(op_ctx, snapshot); -// SegmentPtr new_seg; -// auto status = new_seg_op->CommitNewSegment(new_seg); -// ASSERT_TRUE(status.ok()); -// -// /* commit new segment file */ -// for (auto& cctx : segfile_ctxs) { -// SegmentFilePtr seg_file; -// auto nsf_context = cctx; -// nsf_context.segment_id = new_seg->GetID(); -// nsf_context.partition_id = new_seg->GetPartitionId(); -// status = new_seg_op->CommitNewSegmentFile(nsf_context, seg_file); -// } -// -// /* build segment visitor */ -// auto ctx = new_seg_op->GetContext(); -// ASSERT_TRUE(ctx.new_segment); -// auto visitor = SegmentVisitor::Build(snapshot, ctx.new_segment, ctx.new_segment_files); -// ASSERT_TRUE(visitor); -// ASSERT_EQ(visitor->GetSegment(), new_seg); -// ASSERT_FALSE(visitor->GetSegment()->IsActive()); -// // std::cout << visitor->ToString() << std::endl; -// // std::cout << snapshot->ToString() << std::endl; -// -// /* write data */ -// milvus::segment::SegmentWriter segment_writer(db_root, visitor); -// -// // std::vector raw_uids = {123}; -// // std::vector raw_vectors = {1, 2, 3, 4}; -// // status = segment_writer.AddChunk("test", raw_vectors, raw_uids); -// // ASSERT_TRUE(status.ok()) -// // -// // status = segment_writer.Serialize(); -// // ASSERT_TRUE(status.ok()); -// -// /* read data */ -// // milvus::segment::SSSegmentReader segment_reader(db_root, visitor); -// // -// // status = segment_reader.Load(); -// // ASSERT_TRUE(status.ok()); -// // -// // milvus::segment::SegmentPtr segment_ptr; -// // status = segment_reader.GetSegment(segment_ptr); -// // ASSERT_TRUE(status.ok()); -// // -// // auto& out_uids = segment_ptr->vectors_ptr_->GetUids(); -// // ASSERT_EQ(raw_uids.size(), out_uids.size()); -// // ASSERT_EQ(raw_uids[0], out_uids[0]); -// // auto& out_vectors = segment_ptr->vectors_ptr_->GetData(); -// // ASSERT_EQ(raw_vectors.size(), out_vectors.size()); -// // ASSERT_EQ(raw_vectors[0], out_vectors[0]); -// }(); -// -// status = db_->DropCollection(c1); -// ASSERT_TRUE(status.ok()); -//}