Fix bug for aggregation

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
cai.zhang 2020-09-15 16:37:43 +08:00 committed by yefu.chen
parent 99ef484273
commit 71950d44a8
49 changed files with 398 additions and 10624 deletions

View File

@@ -201,12 +201,3 @@ if ( NOT MILVUS_DB_PATH )
endif ()
set( GPU_ENABLE "false" )
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/dog_segment/
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/include
FILES_MATCHING PATTERN "*_c.h"
)
install(FILES ${CMAKE_BINARY_DIR}/src/dog_segment/libmilvus_dog_segment.so
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/lib)

View File

@@ -40,10 +40,8 @@ PreDelete(CSegmentBase c_segment, long int size);
int
Search(CSegmentBase c_segment,
const char* query_json,
void* fake_query,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);
@@ -52,9 +50,6 @@ Search(CSegmentBase c_segment,
int
Close(CSegmentBase c_segment);
int
BuildIndex(CSegmentBase c_segment);
bool
IsOpened(CSegmentBase c_segment);

View File

@@ -1,59 +0,0 @@
#pragma once
#include "AckResponder.h"
#include "SegmentDefs.h"
#include "knowhere//index/vector_index/IndexIVF.h"
#include <memory>
namespace milvus::dog_segment {
struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
faiss::ConcurrentBitsetPtr bitmap_ptr;
std::shared_ptr<TmpBitmap> clone(int64_t capacity);
};
DeletedRecord() : lru_(std::make_shared<TmpBitmap>()) {
lru_->bitmap_ptr = std::make_shared<faiss::ConcurrentBitset>(0);
}
auto get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
void insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force || new_entry->bitmap_ptr->capacity() <= lru_->bitmap_ptr->capacity()) {
// DO NOTHING
return;
}
}
lru_ = std::move(new_entry);
}
public:
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
};
auto DeletedRecord::TmpBitmap::clone(int64_t capacity) -> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
res->bitmap_ptr = std::make_shared<faiss::ConcurrentBitset>(capacity);
auto u8size = this->bitmap_ptr->u8size();
memcpy(res->bitmap_ptr->mutable_data(), this->bitmap_ptr->data(), u8size);  // copy from the source bitmap, not the freshly allocated one
return res;
}
}
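The removed header above implements a copy-on-write cache for the deletion bitmap: readers take a shared lock to grab the current entry, clone it, apply their deltas, and republish under an exclusive lock. A minimal usage sketch, assuming del_barrier and insert_barrier are computed by the caller as in get_deleted_bitmap further down:

// Sketch of the intended clone-update-publish cycle (names from the diff;
// the barrier values here are illustrative placeholders).
int64_t insert_barrier = 1024;
int64_t del_barrier = 16;
DeletedRecord deleted_record;
auto old_entry = deleted_record.get_lru_entry();   // shared lock, cheap read
auto current = old_entry->clone(insert_barrier);   // private copy sized to the insert barrier
current->del_barrier = del_barrier;
// ... set/clear bits in current->bitmap_ptr from the delete log ...
deleted_record.insert_lru_entry(current);          // exclusive lock, publish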

View File

@@ -164,9 +164,7 @@ class Schema {
const FieldMeta&
operator[](const std::string& field_name) const {
auto offset_iter = offsets_.find(field_name);
if (offset_iter == offsets_.end()) {
throw std::runtime_error("Cannot found field_name: " + field_name);
}
assert(offset_iter != offsets_.end());
auto offset = offset_iter->second;
return (*this)[offset];
}
@@ -182,6 +180,5 @@ class Schema {
};
using SchemaPtr = std::shared_ptr<Schema>;
using idx_t = int64_t;
} // namespace milvus::dog_segment

View File

@@ -17,6 +17,7 @@ TestABI() {
std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr remote_index_meta) {
if (remote_index_meta == nullptr) {
auto index_meta = std::make_shared<IndexMeta>(schema);
auto dim = schema->operator[]("fakevec").get_dim();
@@ -64,20 +65,14 @@ SegmentNaive::PreDelete(int64_t size) {
}
auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp,
int64_t insert_barrier, bool force) -> std::shared_ptr<DeletedRecord::TmpBitmap> {
int64_t insert_barrier) -> std::shared_ptr<DeletedRecord::TmpBitmap> {
auto old = deleted_record_.get_lru_entry();
if(!force || old->bitmap_ptr->capacity() == insert_barrier) {
if (old->del_barrier == del_barrier) {
return old;
}
if (old->del_barrier == del_barrier) {
return old;
}
auto current = std::make_shared<DeletedRecord::TmpBitmap>(*old);
auto &vec = current->bitmap;
auto current = old->clone(insert_barrier);
current->del_barrier = del_barrier;
auto bitmap = current->bitmap_ptr;
if (del_barrier < old->del_barrier) {
for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
// get uid in delete logs
@@ -89,7 +84,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto offset = iter->second;
if (record_.timestamps_[offset] < query_timestamp) {
assert(offset < insert_barrier);
assert(offset < vec.size());
the_offset = std::max(the_offset, offset);
}
}
@@ -98,10 +93,11 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
continue;
}
// otherwise, clear the flag
bitmap->clear(the_offset);
vec[the_offset] = false;
}
return current;
} else {
vec.resize(insert_barrier);
for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
// get uid in delete logs
auto uid = deleted_record_.uids_[del_index];
@@ -114,11 +110,11 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
if (offset >= insert_barrier) {
continue;
}
if (offset >= insert_barrier) {
if (offset >= vec.size()) {
continue;
}
if (record_.timestamps_[offset] < query_timestamp) {
assert(offset < insert_barrier);
assert(offset < vec.size());
the_offset = std::max(the_offset, offset);
}
}
@@ -129,7 +125,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
}
// otherwise, set the flag
bitmap->set(the_offset);
vec[the_offset] = true;
}
this->deleted_record_.insert_lru_entry(current);
}
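As a concrete example of the delta logic above: if the cached entry was built at del_barrier = 3 and the query needs del_barrier = 5, only delete-log entries 3 and 4 are replayed, setting the matching offsets in the bitmap; if the query needs an older barrier, the same entries are replayed in the other direction to clear flags instead. In both directions, an offset only counts when its insert timestamp precedes query_timestamp, which is what keeps the bitmap consistent for a point-in-time read.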
@@ -254,6 +250,15 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t *uids_r
// return Status::OK();
}
// TODO: remove mock
Status
SegmentNaive::QueryImpl(const query::QueryPtr &query, Timestamp timestamp, QueryResult &result) {
// assert(query);
throw std::runtime_error("unimplemnted");
}
template<typename RecordType>
int64_t get_barrier(const RecordType &record, Timestamp timestamp) {
auto &vec = record.timestamps_;
@@ -270,70 +275,11 @@ int64_t get_barrier(const RecordType &record, Timestamp timestamp) {
return beg;
}
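The hunk boundary above elides the body of get_barrier; from the shown initialization and the return beg; tail, it is presumably a binary search for the first acked entry at or beyond the query timestamp. A plausible reconstruction, assuming the ack count bounds the search as it does elsewhere in this file:

// Plausible reconstruction of the elided body (assumption, not from the diff).
template<typename RecordType>
int64_t get_barrier(const RecordType &record, Timestamp timestamp) {
    auto &vec = record.timestamps_;
    int64_t beg = 0;
    int64_t end = record.ack_responder_.GetAck();  // number of visible entries
    while (beg < end) {
        auto mid = (beg + end) / 2;
        if (vec[mid] < timestamp) {
            beg = mid + 1;  // everything up to mid is older than the query
        } else {
            end = mid;      // mid is the first candidate barrier
        }
    }
    return beg;
}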
Status
SegmentNaive::QueryImpl(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) {
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
assert(bitmap_holder);
assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);
auto field_offset = schema_->get_offset(query_info->field_name);
auto &field = schema_->operator[](query_info->field_name);
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_[0]);
auto index_entry = index_meta_->lookup_by_field(query_info->field_name);
auto conf = index_entry.config;
conf[milvus::knowhere::meta::TOPK] = query_info->topK;
{
auto count = 0;
for(int i = 0; i < bitmap->capacity(); ++i) {
if(bitmap->test(i)) {
++count;
}
}
std::cout << "fuck " << count << std::endl;
}
auto indexing = std::static_pointer_cast<knowhere::VecIndex>(indexings_[index_entry.index_name]);
indexing->SetBlacklist(bitmap);
auto ds = knowhere::GenDataset(query_info->num_queries, dim, query_info->query_raw_data.data());
auto final = indexing->Query(ds, conf);
auto ids = final->Get<idx_t*>(knowhere::meta::IDS);
auto distances = final->Get<float*>(knowhere::meta::DISTANCE);
auto total_num = num_queries * topK;
result.result_ids_.resize(total_num);
result.result_distances_.resize(total_num);
result.row_num_ = total_num;
result.num_queries_ = num_queries;
result.topK_ = topK;
std::copy_n(ids, total_num, result.result_ids_.data());
std::copy_n(distances, total_num, result.result_distances_.data());
for(auto& id: result.result_ids_) {
id = record_.uids_[id];
}
return Status::OK();
}
Status
SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) {
// TODO: enable delete
// TODO: enable index
// TODO: remove mock
if (query_info == nullptr) {
query_info = std::make_shared<query::Query>();
query_info->field_name = "fakevec";
@@ -349,22 +295,27 @@ SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult
}
}
if(index_ready_) {
return QueryImpl(query_info, timestamp, result);
}
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
assert(bitmap_holder);
auto &field = schema_->operator[](query_info->field_name);
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
auto barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, barrier);
if (!bitmap_holder) {
throw std::runtime_error("fuck");
}
auto bitmap = &bitmap_holder->bitmap;
if (topK > barrier) {
topK = barrier;
}
auto get_L2_distance = [dim](const float *a, const float *b) {
float L2_distance = 0;
for (auto i = 0; i < dim; ++i) {
@@ -374,12 +325,11 @@ SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult
return L2_distance;
};
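The accumulation statement of this lambda falls on the hunk boundary above; presumably it is the usual squared-difference update:

// Assumed elided line inside the loop above.
L2_distance += (a[i] - b[i]) * (a[i] - b[i]);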
// TODO: optimize
std::vector<std::priority_queue<std::pair<float, int>>> records(num_queries);
// TODO: optimize
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_[0]);
for (int64_t i = 0; i < ins_barrier; ++i) {
if (i < bitmap->capacity() && bitmap->test(i)) {
for (int64_t i = 0; i < barrier; ++i) {
if (i < bitmap->size() && bitmap->at(i)) {
continue;
}
auto element = vec_ptr->get_element(i);
@@ -456,16 +406,17 @@ knowhere::IndexPtr SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry &entry
std::vector<knowhere::DatasetPtr> datasets;
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
auto entities_chunk = entities->get_chunk(chunk_id).data();
auto &uids_chunk = uids.get_chunk(chunk_id);
auto &entities_chunk = entities->get_chunk(chunk_id);
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
datasets.push_back(knowhere::GenDatasetWithIds(count, dim, entities_chunk.data(), uids_chunk.data()));
}
for (auto &ds: datasets) {
indexing->Train(ds, entry.config);
}
for (auto &ds: datasets) {
indexing->AddWithoutIds(ds, entry.config);
indexing->Add(ds, entry.config);
}
return indexing;
}
@@ -484,7 +435,6 @@ SegmentNaive::BuildIndex() {
throw std::runtime_error("unimplemented");
}
}
index_ready_ = true;
return Status::OK();
}

View File

@@ -1,5 +1,4 @@
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>
@@ -13,19 +12,18 @@
// #include "knowhere/index/structured_index/StructuredIndex.h"
#include "query/GeneralQuery.h"
#include "utils/Status.h"
#include "dog_segment/DeletedRecord.h"
using idx_t = int64_t;
namespace milvus::dog_segment {
struct ColumnBasedDataChunk {
std::vector<std::vector<float>> entity_vecs;
static ColumnBasedDataChunk
from(const DogDataChunk &source, const Schema &schema) {
from(const DogDataChunk& source, const Schema& schema) {
ColumnBasedDataChunk dest;
auto count = source.count;
auto raw_data = reinterpret_cast<const char *>(source.raw_data);
auto raw_data = reinterpret_cast<const char*>(source.raw_data);
auto align = source.sizeof_per_row;
for (auto &field : schema) {
for (auto& field : schema) {
auto len = field.get_sizeof();
assert(len % sizeof(float) == 0);
std::vector<float> new_col(len * count / sizeof(float));
@@ -41,7 +39,7 @@ struct ColumnBasedDataChunk {
};
class SegmentNaive : public SegmentBase {
public:
public:
virtual ~SegmentNaive() = default;
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
@@ -51,18 +49,17 @@ public:
// TODO: originally, id should be put into data_chunk
// TODO: Is it ok to put them on the other side?
Status
Insert(int64_t reserved_offset, int64_t size, const int64_t *primary_keys, const Timestamp *timestamps,
const DogDataChunk &values) override;
Insert(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) override;
int64_t PreDelete(int64_t size) override;
// TODO: add id into delete log, possibly bitmap
Status
Delete(int64_t reserved_offset, int64_t size, const int64_t *primary_keys, const Timestamp *timestamps) override;
Delete(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) override;
// query contains metadata of
Status
Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult &results) override;
Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult& results) override;
// stop receive insert requests
// will move data to immutable vector or something
@@ -86,7 +83,7 @@ public:
}
Status
LoadRawData(std::string_view field_name, const char *blob, int64_t blob_size) override {
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override {
// TODO: NO-OP
return Status::OK();
}
@@ -96,12 +93,10 @@ public:
get_row_count() const override {
return record_.ack_responder_.GetAck();
}
SegmentState
get_state() const override {
return state_.load(std::memory_order_relaxed);
}
ssize_t
get_deleted_count() const override {
return 0;
@@ -110,17 +105,15 @@ public:
public:
friend std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);
explicit SegmentNaive(SchemaPtr schema, IndexMetaPtr index_meta)
: schema_(schema), index_meta_(index_meta), record_(*schema) {
}
private:
private:
struct MutableRecord {
ConcurrentVector<uint64_t> uids_;
tbb::concurrent_vector<Timestamp> timestamps_;
std::vector<tbb::concurrent_vector<float>> entity_vecs_;
MutableRecord(int entity_size) : entity_vecs_(entity_size) {
}
};
@@ -131,34 +124,58 @@ private:
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
std::vector<std::shared_ptr<VectorBase>> entity_vec_;
Record(const Schema &schema);
Record(const Schema& schema);
template<typename Type>
auto get_vec_entity(int offset) {
return std::static_pointer_cast<ConcurrentVector<Type>>(entity_vec_[offset]);
}
};
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);
struct DeletedRecord {
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
std::vector<bool> bitmap;
};
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
DeletedRecord(): lru_(std::make_shared<TmpBitmap>()) {}
auto get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
void insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry) {
std::lock_guard lck(shared_mutex_);
if(new_entry->del_barrier <= lru_->del_barrier) {
// DO NOTHING
return;
}
lru_ = std::move(new_entry);
}
};
std::shared_ptr<DeletedRecord::TmpBitmap> get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier);
Status
QueryImpl(query::QueryPtr query, Timestamp timestamp, QueryResult &results);
QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results);
template<typename Type>
knowhere::IndexPtr BuildVecIndexImpl(const IndexMeta::Entry &entry);
knowhere::IndexPtr BuildVecIndexImpl(const IndexMeta::Entry& entry);
private:
private:
SchemaPtr schema_;
std::atomic<SegmentState> state_ = SegmentState::Open;
Record record_;
DeletedRecord deleted_record_;
std::atomic<bool> index_ready_ = false;
IndexMetaPtr index_meta_;
std::unordered_map<std::string, knowhere::IndexPtr> indexings_; // index_name => indexing
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
};
} // namespace milvus::dog_segment

View File

@@ -91,29 +91,14 @@ PreDelete(CSegmentBase c_segment, long int size) {
int
Search(CSegmentBase c_segment,
const char* query_json,
void* fake_query,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
milvus::dog_segment::QueryResult query_result;
// parse query param json
auto query_param_json_string = std::string(query_json);
auto query_param_json = nlohmann::json::parse(query_param_json_string);
// construct QueryPtr
auto query_ptr = std::make_shared<milvus::query::Query>();
query_ptr->num_queries = query_param_json["num_queries"];
query_ptr->topK = query_param_json["topK"];
query_ptr->field_name = query_param_json["field_name"];
query_ptr->query_raw_data.resize(num_of_query_raw_data);
memcpy(query_ptr->query_raw_data.data(), query_raw_data, num_of_query_raw_data * sizeof(float));
auto res = segment->Query(query_ptr, timestamp, query_result);
auto res = segment->Query(nullptr, timestamp, query_result);
// result_ids and result_distances are allocated by the Go caller,
// so we don't need to malloc here.
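Given that convention, a minimal caller-side sketch in C++ (hypothetical; the buffer sizes assume num_queries = 1 and topK = 10 as in the tests further down, and segment is assumed to come from NewSegment):

// Hypothetical caller of the reduced Search signature above.
long result_ids[10];          // num_queries * topK slots, caller-allocated
float result_distances[10];
auto status = Search(segment, /*fake_query=*/nullptr, /*timestamp=*/1,
                     result_ids, result_distances);
assert(status == 0);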

View File

@@ -40,10 +40,8 @@ PreDelete(CSegmentBase c_segment, long int size);
int
Search(CSegmentBase c_segment,
const char* query_json,
void* fake_query,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);

View File

@@ -108,7 +108,7 @@ class VecIndex : public Index {
size_t
BlacklistSize() {
if (bitset_) {
return bitset_->u8size() * sizeof(uint8_t);
return bitset_->size() * sizeof(uint8_t);
} else {
return 0;
}

View File

@@ -197,7 +197,7 @@ ConcurrentBitset::capacity() {
}
size_t
ConcurrentBitset::u8size() {
ConcurrentBitset::size() {
return ((capacity_ + 8 - 1) >> 3);
}
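The renamed size() keeps the same byte-count formula: the bit capacity rounded up to whole bytes. For example, capacity_ = 100 gives (100 + 8 - 1) >> 3 = 107 >> 3 = 13 bytes.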

View File

@@ -63,15 +63,15 @@ class ConcurrentBitset {
size_t
capacity();
size_t
size();
const uint8_t*
data();
uint8_t*
mutable_data();
size_t
u8size();
private:
size_t capacity_;
std::vector<std::atomic<uint8_t>> bitset_;

View File

@@ -7,182 +7,178 @@
#include "dog_segment/segment_c.h"
TEST(CApiTest, CollectionTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
DeleteCollection(collection);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
DeleteCollection(collection);
}
TEST(CApiTest, PartitonTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
DeleteCollection(collection);
DeletePartition(partition);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
DeleteCollection(collection);
DeletePartition(partition);
}
TEST(CApiTest, SegmentTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, InsertTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for (int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for(int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for(auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
assert(res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, DeleteTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
auto offset = PreDelete(segment, 3);
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, SearchTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for (int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for(int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for(auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto offset = PreInsert(segment, N);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(ins_res == 0);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(ins_res == 0);
long result_ids[10];
float result_distances[10];
long result_ids[10];
float result_distances[10];
auto sea_res = Search(segment, nullptr, 1, result_ids, result_distances);
assert(sea_res == 0);
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
std::vector<float> query_raw_data(16);
for (int i = 0; i < 16; i++) {
query_raw_data[i] = e() % 2000 * 0.001 - 1.0;
}
auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), 16, result_ids, result_distances);
assert(sea_res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, IsOpenedTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto is_opened = IsOpened(segment);
assert(is_opened);
auto is_opened = IsOpened(segment);
assert(is_opened);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, CloseTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto status = Close(segment);
assert(status == 0);
auto status = Close(segment);
assert(status == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
@@ -194,17 +190,17 @@ auto generate_data(int N) {
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for (int i = 0; i < N; ++i) {
for(int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
for(auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
int age = ei() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
return std::make_tuple(raw_data, timestamps, uids);
}
@@ -221,10 +217,10 @@ TEST(CApiTest, TestQuery) {
int N = 1000 * 1000;
auto[raw_data, timestamps, uids] = generate_data(N);
auto [raw_data, timestamps, uids] = generate_data(N);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
auto row_count = GetRowCount(segment);
@@ -232,18 +228,15 @@ TEST(CApiTest, TestQuery) {
std::vector<long> result_ids(10);
std::vector<float> result_distances(10);
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
auto sea_res = Search(segment, query_json.data(), 1, (float *) raw_data.data(), 16, result_ids.data(),
result_distances.data());
auto sea_res = Search(segment, nullptr, 1, result_ids.data(), result_distances.data());
ASSERT_EQ(sea_res, 0);
ASSERT_EQ(result_ids[0], 10 * N);
ASSERT_EQ(result_distances[0], 0);
auto N_del = N / 2;
std::vector<uint64_t> del_ts(N_del, 100);
auto pre_off = PreDelete(segment, N_del);
Delete(segment, pre_off, N_del, uids.data(), del_ts.data());
std::vector<uint64_t> del_ts(N/2, 100);
auto pre_off = PreDelete(segment, N / 2);
Delete(segment, pre_off, N / 2, uids.data(), del_ts.data());
Close(segment);
BuildIndex(segment);
@@ -251,37 +244,25 @@ TEST(CApiTest, TestQuery) {
std::vector<long> result_ids2(10);
std::vector<float> result_distances2(10);
sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
sea_res = Search(segment, query_json.data(), 104, (float *) raw_data.data(), 16, result_ids2.data(),
result_distances2.data());
// sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
std::cout << "case 1" << std::endl;
for (int i = 0; i < 10; ++i) {
std::cout << result_ids[i] << "->" << result_distances[i] << std::endl;
}
std::cout << "case 2" << std::endl;
for (int i = 0; i < 10; ++i) {
std::cout << result_ids2[i] << "->" << result_distances2[i] << std::endl;
}
for (auto x: result_ids2) {
ASSERT_GE(x, 10 * N + N_del);
for(auto x: result_ids2) {
ASSERT_GE(x, 10 * N + N / 2);
ASSERT_LT(x, 10 * N + N);
}
// auto iter = 0;
// for(int i = 0; i < result_ids.size(); ++i) {
// auto uid = result_ids[i];
// auto dis = result_distances[i];
// if(uid >= 10 * N + N_del) {
// auto uid2 = result_ids2[iter];
// auto dis2 = result_distances2[iter];
// ASSERT_EQ(uid, uid2);
// ASSERT_EQ(dis, dis2);
// ++iter;
// }
// }
auto iter = 0;
for(int i = 0; i < result_ids.size(); ++i) {
auto uid = result_ids[i];
auto dis = result_distances[i];
if(uid >= 10 * N + N / 2) {
auto uid2 = result_ids2[iter];
auto dis2 = result_distances2[iter];
ASSERT_EQ(uid, uid2);
ASSERT_EQ(dis, dis2);
++iter;
}
}
DeleteCollection(collection);
@@ -290,28 +271,28 @@ TEST(CApiTest, TestQuery) {
}
TEST(CApiTest, GetDeletedCountTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
auto offset = PreDelete(segment, 3);
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
// TODO: assert(deleted_count == len(delete_primary_keys))
auto deleted_count = GetDeletedCount(segment);
assert(deleted_count == 0);
// TODO: assert(deleted_count == len(delete_primary_keys))
auto deleted_count = GetDeletedCount(segment);
assert(deleted_count == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
@@ -325,10 +306,10 @@ TEST(CApiTest, GetRowCountTest) {
int N = 10000;
auto[raw_data, timestamps, uids] = generate_data(N);
auto [raw_data, timestamps, uids] = generate_data(N);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
auto row_count = GetRowCount(segment);

View File

@@ -26,30 +26,29 @@ using std::vector;
using namespace milvus;
namespace {
template<int DIM>
auto generate_data(int N) {
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[DIM];
for (auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
template<int DIM>
auto generate_data(int N) {
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[DIM];
for (auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
// int age = ei() % 100;
// raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
return std::make_tuple(raw_data, timestamps, uids);
}
return std::make_tuple(raw_data, timestamps, uids);
}
}
TEST(TestIndex, Naive) {
constexpr int N = 100000;
@@ -70,59 +69,25 @@ TEST(TestIndex, Naive) {
{milvus::knowhere::meta::DEVICEID, 0},
};
// auto ds = knowhere::GenDataset(N, DIM, raw_data.data());
// auto ds2 = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data() + sizeof(float[DIM]) * N / 2, uids.data() + N / 2);
auto ds = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data(), uids.data());
auto ds2 = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data() + sizeof(float[DIM]) * N / 2, uids.data() + N / 2);
// NOTE: you must train first and then add
// index->Train(ds, conf);
// index->Train(ds2, conf);
// index->AddWithoutIds(ds, conf);
// index->Add(ds2, conf);
index->Train(ds, conf);
index->Train(ds2, conf);
index->Add(ds, conf);
index->Add(ds2, conf);
std::vector<knowhere::DatasetPtr> datasets;
std::vector<std::vector<float>> ftrashs;
for (int beg = 0; beg < N; beg += N) {
auto end = beg + N;
if (end > N) {
end = N;
}
std::vector<float> ft(raw_data.data() + DIM * beg, raw_data.data() + DIM * end);
auto ds = knowhere::GenDataset(end - beg, DIM, ft.data());
datasets.push_back(ds);
ftrashs.push_back(std::move(ft));
// // NOTE: you must train first and then add
// index->Train(ds, conf);
// index->Add(ds, conf);
}
for (auto &ds: datasets) {
index->Train(ds, conf);
}
for (auto &ds: datasets) {
index->AddWithoutIds(ds, conf);
}
auto bitmap = std::make_shared<faiss::ConcurrentBitset>(N);
// exclude the first
for (int i = 0; i < N / 2; ++i) {
bitmap->set(i);
}
index->SetBlacklist(bitmap);
auto query_ds = knowhere::GenDataset(1, DIM, raw_data.data());
auto final = index->Query(query_ds, conf);
auto ids = final->Get<idx_t *>(knowhere::meta::IDS);
auto distances = final->Get<float *>(knowhere::meta::DISTANCE);
for (int i = 0; i < TOPK; ++i) {
if (ids[i] < N / 2) {
cout << "WRONG: ";
}
auto mmm = final->data();
cout << endl;
for(auto [k, v]: mmm) {
cout << k << endl;
}
auto ids = final->Get<idx_t*>(knowhere::meta::IDS);
auto distances = final->Get<float*>(knowhere::meta::DISTANCE);
for(int i = 0; i < TOPK; ++i) {
cout << ids[i] << "->" << distances[i] << endl;
}
int i = 1 + 1;
int i = 1+1;
}
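Per the "train first and then add" note inside this test, a condensed sketch of the required knowhere call order (illustrative only; conf, DIM, query_ds, and the data pointers are placeholders standing in for the test's values):

// Illustrative call order for an IVF index (placeholder names).
auto ds = knowhere::GenDatasetWithIds(N, DIM, raw_data_ptr, uids_ptr);
index->Train(ds, conf);                      // learns the coarse quantizer first
index->Add(ds, conf);                        // then assigns vectors (with ids) to cells
auto result = index->Query(query_ds, conf);  // conf carries TOPK for the result size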

View File

@@ -25,15 +25,13 @@ add_subdirectory( db ) # target milvus_engine
add_subdirectory( log )
add_subdirectory( server )
add_subdirectory( message_client )
add_subdirectory( meta )
set(link_lib
milvus_engine
config
query
query
utils
log
meta
)

View File

@@ -85,11 +85,6 @@ ConfigMgr::ConfigMgr() {
"localhost", nullptr, nullptr)},
{"pulsar.port", CreateIntegerConfig("pulsar.port", false, 0, 65535, &config.pulsar.port.value,
6650, nullptr, nullptr)},
/* master */
{"master.address", CreateStringConfig("master.address", false, &config.master.address.value,
"localhost", nullptr, nullptr)},
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
6000, nullptr, nullptr)},
/* log */

View File

@@ -76,11 +76,6 @@ struct ServerConfig {
Integer port{6650};
}pulsar;
struct Master{
String address{"localhost"};
Integer port{6000};
}master;
struct Engine {
Integer build_index_threshold{4096};

View File

@@ -1,70 +0,0 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: etcd.proto
#include "etcd.pb.h"
#include "etcd.grpc.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/client_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace etcdserverpb {
static const char* Watch_method_names[] = {
"/etcdserverpb.Watch/Watch",
};
std::unique_ptr< Watch::Stub> Watch::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
(void)options;
std::unique_ptr< Watch::Stub> stub(new Watch::Stub(channel));
return stub;
}
Watch::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
: channel_(channel), rpcmethod_Watch_(Watch_method_names[0], ::grpc::internal::RpcMethod::BIDI_STREAMING, channel)
{}
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::WatchRaw(::grpc::ClientContext* context) {
return ::grpc_impl::internal::ClientReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), rpcmethod_Watch_, context);
}
void Watch::Stub::experimental_async::Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) {
::grpc_impl::internal::ClientCallbackReaderWriterFactory< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>::Create(stub_->channel_.get(), stub_->rpcmethod_Watch_, context, reactor);
}
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, true, tag);
}
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, false, nullptr);
}
Watch::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Watch_method_names[0],
::grpc::internal::RpcMethod::BIDI_STREAMING,
new ::grpc::internal::BidiStreamingHandler< Watch::Service, ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
std::mem_fn(&Watch::Service::Watch), this)));
}
Watch::Service::~Service() {
}
::grpc::Status Watch::Service::Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream) {
(void) context;
(void) stream;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
} // namespace etcdserverpb

View File

@@ -1,235 +0,0 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: etcd.proto
#ifndef GRPC_etcd_2eproto__INCLUDED
#define GRPC_etcd_2eproto__INCLUDED
#include "etcd.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/stub_options.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace grpc_impl {
class CompletionQueue;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc_impl
namespace grpc {
namespace experimental {
template <typename RequestT, typename ResponseT>
class MessageAllocator;
} // namespace experimental
} // namespace grpc
namespace etcdserverpb {
class Watch final {
public:
static constexpr char const* service_full_name() {
return "etcdserverpb.Watch";
}
class StubInterface {
public:
virtual ~StubInterface() {}
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
return std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
}
class experimental_async_interface {
public:
virtual ~experimental_async_interface() {}
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
virtual void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) = 0;
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0;
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
return std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
}
class experimental_async final :
public StubInterface::experimental_async_interface {
public:
void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
Stub* stub() { return stub_; }
Stub* stub_;
};
class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
class experimental_async async_stub_{this};
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_Watch_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
class Service : public ::grpc::Service {
public:
Service();
virtual ~Service();
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
virtual ::grpc::Status Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream);
};
template <class BaseClass>
class WithAsyncMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithAsyncMethod_Watch() {
::grpc::Service::MarkMethodAsync(0);
}
~WithAsyncMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
}
};
typedef WithAsyncMethod_Watch<Service > AsyncService;
template <class BaseClass>
class ExperimentalWithCallbackMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithCallbackMethod_Watch() {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc_impl::internal::CallbackBidiHandler< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
[this] { return this->Watch(); }));
}
~ExperimentalWithCallbackMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual ::grpc::experimental::ServerBidiReactor< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch() {
return new ::grpc_impl::internal::UnimplementedBidiReactor<
::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>;}
};
typedef ExperimentalWithCallbackMethod_Watch<Service > ExperimentalCallbackService;
template <class BaseClass>
class WithGenericMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithGenericMethod_Watch() {
::grpc::Service::MarkMethodGeneric(0);
}
~WithGenericMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
};
template <class BaseClass>
class WithRawMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithRawMethod_Watch() {
::grpc::Service::MarkMethodRaw(0);
}
~WithRawMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithRawCallbackMethod_Watch() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
new ::grpc_impl::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this] { return this->Watch(); }));
}
~ExperimentalWithRawCallbackMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* Watch() {
return new ::grpc_impl::internal::UnimplementedBidiReactor<
::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
};
typedef Service StreamedUnaryService;
typedef Service SplitStreamedService;
typedef Service StreamedService;
};
} // namespace etcdserverpb
#endif // GRPC_etcd_2eproto__INCLUDED

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -1,83 +0,0 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: master.proto
#include "master.pb.h"
#include "master.grpc.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/client_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace masterpb {
static const char* Master_method_names[] = {
"/masterpb.Master/CreateCollection",
};
std::unique_ptr< Master::Stub> Master::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
(void)options;
std::unique_ptr< Master::Stub> stub(new Master::Stub(channel));
return stub;
}
Master::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
: channel_(channel), rpcmethod_CreateCollection_(Master_method_names[0], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
{}
::grpc::Status Master::Stub::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_CreateCollection_, context, request, response);
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, true);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, false);
}
Master::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Master_method_names[0],
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< Master::Service, ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
std::mem_fn(&Master::Service::CreateCollection), this)));
}
Master::Service::~Service() {
}
::grpc::Status Master::Service::CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response) {
(void) context;
(void) request;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
} // namespace masterpb

View File

@ -1,252 +0,0 @@
// Generated by the gRPC C++ plugin.
// If you make any local changes, they will be lost.
// source: master.proto
#ifndef GRPC_master_2eproto__INCLUDED
#define GRPC_master_2eproto__INCLUDED
#include "master.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/stub_options.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace grpc_impl {
class CompletionQueue;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc_impl
namespace grpc {
namespace experimental {
template <typename RequestT, typename ResponseT>
class MessageAllocator;
} // namespace experimental
} // namespace grpc
namespace masterpb {
class Master final {
public:
static constexpr char const* service_full_name() {
return "masterpb.Master";
}
class StubInterface {
public:
virtual ~StubInterface() {}
virtual ::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) = 0;
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
}
class experimental_async_interface {
public:
virtual ~experimental_async_interface() {}
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) override;
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
}
class experimental_async final :
public StubInterface::experimental_async_interface {
public:
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
Stub* stub() { return stub_; }
Stub* stub_;
};
class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
class experimental_async async_stub_{this};
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_CreateCollection_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
class Service : public ::grpc::Service {
public:
Service();
virtual ~Service();
virtual ::grpc::Status CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response);
};
template <class BaseClass>
class WithAsyncMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithAsyncMethod_CreateCollection() {
::grpc::Service::MarkMethodAsync(0);
}
~WithAsyncMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestCreateCollection(::grpc::ServerContext* context, ::milvus::grpc::Mapping* request, ::grpc::ServerAsyncResponseWriter< ::milvus::grpc::Status>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
typedef WithAsyncMethod_CreateCollection<Service > AsyncService;
template <class BaseClass>
class ExperimentalWithCallbackMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithCallbackMethod_CreateCollection() {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
[this](::grpc::ServerContext* context,
const ::milvus::grpc::Mapping* request,
::milvus::grpc::Status* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->CreateCollection(context, request, response, controller);
}));
}
void SetMessageAllocatorFor_CreateCollection(
::grpc::experimental::MessageAllocator< ::milvus::grpc::Mapping, ::milvus::grpc::Status>* allocator) {
static_cast<::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>*>(
::grpc::Service::experimental().GetHandler(0))
->SetMessageAllocator(allocator);
}
~ExperimentalWithCallbackMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
typedef ExperimentalWithCallbackMethod_CreateCollection<Service > ExperimentalCallbackService;
template <class BaseClass>
class WithGenericMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithGenericMethod_CreateCollection() {
::grpc::Service::MarkMethodGeneric(0);
}
~WithGenericMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
};
template <class BaseClass>
class WithRawMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithRawMethod_CreateCollection() {
::grpc::Service::MarkMethodRaw(0);
}
~WithRawMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestCreateCollection(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncResponseWriter< ::grpc::ByteBuffer>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithRawCallbackMethod_CreateCollection() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this](::grpc::ServerContext* context,
const ::grpc::ByteBuffer* request,
::grpc::ByteBuffer* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
this->CreateCollection(context, request, response, controller);
}));
}
~ExperimentalWithRawCallbackMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::grpc::ByteBuffer* /*request*/, ::grpc::ByteBuffer* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
template <class BaseClass>
class WithStreamedUnaryMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithStreamedUnaryMethod_CreateCollection() {
::grpc::Service::MarkMethodStreamed(0,
new ::grpc::internal::StreamedUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(std::bind(&WithStreamedUnaryMethod_CreateCollection<BaseClass>::StreamedCreateCollection, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable regular version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
// replace default version of method with streamed unary
virtual ::grpc::Status StreamedCreateCollection(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::milvus::grpc::Mapping,::milvus::grpc::Status>* server_unary_streamer) = 0;
};
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedUnaryService;
typedef Service SplitStreamedService;
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedService;
};
} // namespace masterpb
#endif // GRPC_master_2eproto__INCLUDED
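The generated header above only declares the Master service; a server still needs a concrete subclass. A minimal sketch, assuming the synchronous API, where MasterServiceImpl and the listen address are illustrative and not from this repo:

#include "grpc/master.grpc.pb.h"
#include <grpcpp/grpcpp.h>

// Hypothetical service implementation: override the synchronous
// CreateCollection declared by the generated Master::Service.
class MasterServiceImpl final : public masterpb::Master::Service {
    ::grpc::Status CreateCollection(::grpc::ServerContext* context,
                                    const ::milvus::grpc::Mapping* request,
                                    ::milvus::grpc::Status* response) override {
        // Report success through the milvus status proto, as the fake
        // results elsewhere in this commit do.
        response->set_error_code(::milvus::grpc::ErrorCode::SUCCESS);
        return ::grpc::Status::OK;
    }
};

int main() {
    MasterServiceImpl service;
    ::grpc::ServerBuilder builder;
    // The port is an assumption for illustration.
    builder.AddListeningPort("0.0.0.0:53100", ::grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    auto server = builder.BuildAndStart();
    server->Wait();
    return 0;
}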

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -1,164 +0,0 @@
syntax = "proto3";
package etcdserverpb;
service Watch {
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
}
}
message WatchRequest {
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}
message WatchCreateRequest {
// key is the key to register for watching.
bytes key = 1;
// range_end is the end of the range [key, range_end) to watch. If range_end is not given,
// only the key argument is watched. If range_end is equal to '\0', all keys greater than
// or equal to the key argument are watched.
// If the range_end is one bit larger than the given key,
// then all keys with the prefix (the given key) will be watched.
bytes range_end = 2;
// start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
int64 start_revision = 3;
// progress_notify is set so that the etcd server will periodically send a WatchResponse with
// no events to the new watcher if there are no recent events. It is useful when clients
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
enum FilterType {
// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
}
// filters filter the events at server side before it sends back to the watcher.
repeated FilterType filters = 5;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used to ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
// fragment enables splitting large revisions into multiple watch responses.
bool fragment = 8;
}
message WatchCancelRequest {
// watch_id is the watcher id to cancel so that no more events are transmitted.
int64 watch_id = 1;
}
// Requests that a watch stream progress status be sent in the watch response stream as soon as
// possible.
message WatchProgressRequest {
}
message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 2;
// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive events for
// the created watcher from the same stream.
// All events sent to the created watcher will attach with the same watch_id.
bool created = 3;
// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 4;
// compact_revision is set to the minimum index if a watcher tries to watch
// at a compacted index.
//
// This happens when creating a watcher at a compacted revision or the watcher cannot
// catch up with the progress of the key-value store.
//
// The client should treat the watcher as canceled and should not try to create any
// watcher with the same start_revision again.
int64 compact_revision = 5;
// cancel_reason indicates the reason for canceling the watcher.
string cancel_reason = 6;
// fragment is true if a large watch response was split over multiple responses.
bool fragment = 7;
repeated Event events = 11;
}
message ResponseHeader {
// cluster_id is the ID of the cluster which sent the response.
uint64 cluster_id = 1;
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// received in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
}
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// create_revision is the revision of last creation on this key.
int64 create_revision = 2;
// mod_revision is the revision of last modification on this key.
int64 mod_revision = 3;
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
int64 version = 4;
// value is the value held by the key, in bytes.
bytes value = 5;
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
int64 lease = 6;
}
message Event {
enum EventType {
PUT = 0;
DELETE = 1;
}
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
EventType type = 1;
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}
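Per the range_end comment on WatchCreateRequest, a prefix watch is expressed by pairing the key with the same key whose last byte is incremented; the Watcher implementation later in this diff computes range_end the same way. A minimal sketch with the generated types, assuming a non-empty key:

#include "grpc/etcd.grpc.pb.h"
#include <string>

// Build a WatchRequest that watches every key carrying the given prefix.
etcdserverpb::WatchRequest MakePrefixWatchRequest(const std::string& key) {
    etcdserverpb::WatchRequest req;
    auto* create = req.mutable_create_request();
    create->set_key(key);
    // Incrementing the last byte yields the smallest key greater than
    // every key that starts with this prefix (assumes key is non-empty).
    std::string range_end(key);
    range_end.back() = static_cast<char>(range_end.back() + 1);
    create->set_range_end(range_end);
    return req;
}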

View File

@ -3,6 +3,8 @@
#include "PartitionPolicy.h"
#include "utils/CommonUtil.h"
#include <omp.h>
#include <numeric>
#include <algorithm>
namespace milvus::message_client {
@ -48,64 +50,81 @@ int64_t GetQueryNodeNum() {
return 1;
}
milvus::grpc::QueryResult Aggregation(std::vector<std::shared_ptr<grpc::QueryResult>> &results) {
//TODO: there is currently only one QueryNode
int64_t length = results.size();
Status
Aggregation(std::vector<std::shared_ptr<grpc::QueryResult>> results, milvus::grpc::QueryResult* result) {
if (results.empty()) {
return Status(DB_ERROR, "The result is null!");
}
std::vector<float> all_scores;
std::vector<float> all_distance;
std::vector<int64_t> all_entities_ids;
std::vector<bool> all_valid_row;
std::vector<grpc::RowData> all_row_data;
std::vector<grpc::KeyValuePair> all_kv_pairs;
std::vector<int> index(length * results[0]->distances_size());
for (int n = 0; n < length * results[0]->distances_size(); ++n) {
index[n] = n;
}
grpc::Status status;
int row_num = 0;
for (int i = 0; i < length; i++) {
for (int j = 0; j < results[i]->distances_size(); j++) {
// all_scores.push_back(results[i]->scores()[j]);
all_distance.push_back(results[i]->distances()[j]);
// all_kv_pairs.push_back(results[i]->extra_params()[j]);
}
}
for (int k = 0; k < all_distance.size() - 1; ++k) {
for (int l = k + 1; l < all_distance.size(); ++l) {
if (all_distance[l] > all_distance[k]) {
float distance_temp = all_distance[k];
all_distance[k] = all_distance[l];
all_distance[l] = distance_temp;
int index_temp = index[k];
index[k] = index[l];
index[l] = index_temp;
for (auto & result_per_node : results) {
if (result_per_node->status().error_code() != grpc::ErrorCode::SUCCESS){
// if (one_node_res->status().error_code() != grpc::ErrorCode::SUCCESS ||
// one_node_res->entities().status().error_code() != grpc::ErrorCode::SUCCESS) {
return Status(DB_ERROR, "QueryNode return wrong status!");
}
}
for (int j = 0; j < result_per_node->distances_size(); j++) {
all_scores.push_back(result_per_node->scores()[j]);
all_distance.push_back(result_per_node->distances()[j]);
// all_kv_pairs.push_back(result_per_node->extra_params()[j]);
}
for (int k = 0; k < result_per_node->entities().ids_size(); ++k) {
all_entities_ids.push_back(result_per_node->entities().ids(k));
// all_valid_row.push_back(result_per_node->entities().valid_row(k));
// all_row_data.push_back(result_per_node->entities().rows_data(k));
}
if (result_per_node->row_num() > row_num){
row_num = result_per_node->row_num();
}
status = result_per_node->status();
}
grpc::QueryResult result;
std::vector<int> index(all_distance.size());
result.mutable_status()->CopyFrom(results[0]->status());
result.mutable_entities()->CopyFrom(results[0]->entities());
result.set_row_num(results[0]->row_num());
iota(index.begin(), index.end(), 0);
for (int m = 0; m < results[0]->distances_size(); ++m) {
// result.add_scores(all_scores[index[m]]);
result.add_distances(all_distance[m]);
// result.add_extra_params();
// result.mutable_extra_params(m)->CopyFrom(all_kv_pairs[index[m]]);
std::stable_sort(index.begin(), index.end(),
[&all_distance](size_t i1, size_t i2) {return all_distance[i1] > all_distance[i2];});
grpc::Entities result_entities;
for (int m = 0; m < result->row_num(); ++m) {
result->add_scores(all_scores[index[m]]);
result->add_distances(all_distance[index[m]]);
// result->add_extra_params();
// result->mutable_extra_params(m)->CopyFrom(all_kv_pairs[index[m]]);
result_entities.add_ids(all_entities_ids[index[m]]);
// result_entities.add_valid_row(all_valid_row[index[m]]);
// result_entities.add_rows_data();
// result_entities.mutable_rows_data(m)->CopyFrom(all_row_data[index[m]]);
}
// result.set_query_id(results[0]->query_id());
// result.set_client_id(results[0]->client_id());
result_entities.mutable_status()->CopyFrom(status);
return result;
result->set_row_num(row_num);
result->mutable_entities()->CopyFrom(result_entities);
result->set_query_id(results[0]->query_id());
// result->set_client_id(results[0]->client_id());
return Status::OK();
}
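The rewritten Aggregation replaces the old pairwise swap sort with an argsort: iota fills an index vector, stable_sort orders the indices by descending distance, and the top row_num hits are copied out through the indices. A standalone sketch of that merge with illustrative names, not the engine's API:

#include <algorithm>
#include <numeric>
#include <vector>

// Merge the per-node hits into a single list of the globally best row_num ids.
std::vector<int64_t> MergeTopK(const std::vector<std::vector<float>>& per_node_distances,
                               const std::vector<std::vector<int64_t>>& per_node_ids,
                               int row_num) {
    // Flatten the distances and ids from every query node.
    std::vector<float> all_distance;
    std::vector<int64_t> all_ids;
    for (size_t n = 0; n < per_node_distances.size(); ++n) {
        all_distance.insert(all_distance.end(), per_node_distances[n].begin(), per_node_distances[n].end());
        all_ids.insert(all_ids.end(), per_node_ids[n].begin(), per_node_ids[n].end());
    }
    // Argsort: sort indices by descending distance instead of moving the data.
    std::vector<int> index(all_distance.size());
    std::iota(index.begin(), index.end(), 0);
    std::stable_sort(index.begin(), index.end(),
                     [&all_distance](size_t i1, size_t i2) { return all_distance[i1] > all_distance[i2]; });
    // Emit the global top row_num hits through the sorted indices.
    std::vector<int64_t> top_ids;
    for (int m = 0; m < row_num && m < static_cast<int>(index.size()); ++m) {
        top_ids.push_back(all_ids[index[m]]);
    }
    return top_ids;
}

Using stable_sort keeps equal distances in their arrival order, so ties do not reshuffle results between query nodes.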
Status MsgClientV2::GetQueryResult(int64_t query_id, milvus::grpc::QueryResult* result) {
int64_t query_node_num = GetQueryNodeNum();
auto t1 = std::chrono::high_resolution_clock::now();
while (true) {
auto received_result = total_results[query_id];
if (received_result.size() == query_node_num) {
@ -124,8 +143,9 @@ Status MsgClientV2::GetQueryResult(int64_t query_id, milvus::grpc::QueryResult*
return Status(DB_ERROR, "can't parse message which from pulsar!");
}
}
*result = Aggregation(total_results[query_id]);
return Status::OK();
auto status = Aggregation(total_results[query_id], result);
return status;
}
Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp) {
@ -155,7 +175,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uin
}
}
for (auto &stat : stats) {
if (stat != pulsar::ResultOk) {
if (stat == pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
}
}
@ -181,7 +201,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
}
}
for (auto &stat : stats) {
if (stat != pulsar::ResultOk) {
if (stat == pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
}
}
@ -199,18 +219,19 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u
search_msg.set_timestamp(timestamp);
search_msg.set_dsl(request.dsl());
//TODO: get data type from master
milvus::grpc::VectorRowRecord vector_row_recode;
std::vector<float> vectors_records;
std::string binary_data;
for (int i = 0; i < request.vector_param_size(); ++i) {
search_msg.add_json(request.vector_param(i).json());
for (int j = 0; j < request.vector_param(i).row_record().records_size(); ++j) {
for (int k = 0; k < request.vector_param(i).row_record().records(j).float_data_size(); ++k) {
vector_row_recode.add_float_data(request.vector_param(i).row_record().records(j).float_data(k));
}
vector_row_recode.set_binary_data(request.vector_param(i).row_record().records(j).binary_data());
binary_data.append(request.vector_param(i).row_record().records(j).binary_data());
}
}
vector_row_recode.set_binary_data(binary_data);
search_msg.mutable_records()->CopyFrom(vector_row_recode);
@ -222,80 +243,12 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u
search_msg.mutable_extra_params(l)->CopyFrom(request.extra_params(l));
}
std::cout << search_msg.collection_name() << std::endl;
auto result = search_producer_->send(search_msg);
if (result != pulsar::Result::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(result));
}
return Status::OK();
// milvus::grpc::QueryResult fake_message;
// milvus::grpc::QueryResult fake_message2;
//
// milvus::grpc::Status fake_status;
// fake_status.set_error_code(milvus::grpc::ErrorCode::SUCCESS);
// std::string aaa = "hahaha";
// fake_status.set_reason(aaa);
//
// milvus::grpc::RowData fake_row_data;
// fake_row_data.set_blob("fake_row_data");
//
// milvus::grpc::Entities fake_entities;
//// fake_entities.set_allocated_status(&fake_status);
// fake_entities.mutable_status()->CopyFrom(fake_status);
// for (int i = 0; i < 10; i++){
// fake_entities.add_ids(i);
// fake_entities.add_valid_row(true);
// fake_entities.add_rows_data()->CopyFrom(fake_row_data);
// }
//
// int64_t fake_row_num = 10;
//
// float fake_scores[10] = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 0.0};
// float fake_distance[10] = {9.7, 9.6, 9.5, 9.8, 8.7, 8.8, 9.9, 8.8, 9.7, 8.9};
//
// std::vector<milvus::grpc::KeyValuePair> fake_extra_params;
// milvus::grpc::KeyValuePair keyValuePair;
// for (int i = 0; i < 10; ++i) {
// keyValuePair.set_key(std::to_string(i));
// keyValuePair.set_value(std::to_string(i + 10));
// fake_extra_params.push_back(keyValuePair);
// }
//
// int64_t fake_query_id = 10;
// int64_t fake_client_id = 1;
//
// fake_message.mutable_status()->CopyFrom(fake_status);
// fake_message.mutable_entities()->CopyFrom(fake_entities);
// fake_message.set_row_num(fake_row_num);
// for (int i = 0; i < 10; i++) {
// fake_message.add_scores(fake_scores[i]);
// fake_message.add_distances(fake_distance[i]);
// fake_message.add_extra_params()->CopyFrom(fake_extra_params[i]);
// }
//
// fake_message.set_query_id(fake_query_id);
// fake_message.set_client_id(fake_client_id);
//
// float fake_scores2[10] = {2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 0.0, 1.1};
// float fake_distance2[10] = {9.8, 8.6, 9.6, 9.7, 8.9, 8.8, 9.0, 9.8, 9.7, 8.9};
//
// fake_message2.mutable_status()->CopyFrom(fake_status);
// fake_message2.mutable_entities()->CopyFrom(fake_entities);
// fake_message2.set_row_num(fake_row_num);
// for (int j = 0; j < 10; ++j) {
// fake_message2.add_scores(fake_scores2[j]);
// fake_message2.add_distances(fake_distance2[j]);
// fake_message2.add_extra_params()->CopyFrom(fake_extra_params[j]);
// }
//
// fake_message2.set_query_id(fake_query_id);
// fake_message2.set_client_id(fake_client_id);
//
// search_by_id_producer_->send(fake_message.SerializeAsString());
// search_by_id_producer_->send(fake_message2.SerializeAsString());
// return Status::OK();
}
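SendQueryMessage ends by handing the serialized message to a Pulsar producer and mapping anything other than pulsar::ResultOk to DB_ERROR. A minimal sketch of that send-and-check pattern against the raw Pulsar C++ client, where the broker URL and topic name are assumptions:

#include <pulsar/Client.h>

int main() {
    pulsar::Client client("pulsar://localhost:6650");  // broker URL is an assumption
    pulsar::Producer producer;
    if (client.createProducer("search", producer) != pulsar::ResultOk) {  // topic is illustrative
        return 1;
    }
    pulsar::Message msg = pulsar::MessageBuilder().setContent("payload").build();
    pulsar::Result res = producer.send(msg);
    client.close();
    // Anything other than ResultOk is an error, as in SendQueryMessage,
    // where the enum is rendered with pulsar::strResult.
    return res == pulsar::ResultOk ? 0 : 1;
}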
MsgClientV2::~MsgClientV2() {

View File

@ -31,7 +31,7 @@ class MsgClientV2 {
//
Status SendQueryMessage(const milvus::grpc::SearchParam &request, uint64_t timestamp, int64_t &query_id);
Status GetQueryResult(int64_t query_id, milvus::grpc::QueryResult* result);
Status GetQueryResult(int64_t query_id, milvus::grpc::QueryResult *result);
private:
int64_t GetUniqueQId() {

View File

@ -1,12 +0,0 @@
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/third_party/protobuf/src)
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/include)
add_subdirectory( etcd_watcher )
aux_source_directory( ./master master_src)
add_library(meta ${master_src}
./etcd_watcher/Watcher.cpp
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/master.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/master.grpc.pb.cc
)

View File

@ -1,14 +0,0 @@
AUX_SOURCE_DIRECTORY(. watcher_src)
add_executable(test_watcher
${watcher_src}
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
)
target_link_libraries(
test_watcher
PRIVATE
libprotobuf
grpc++_reflection
grpc++
)

View File

@ -1,90 +0,0 @@
#include "Watcher.h"
#include <memory>
#include <utility>
#include "grpc/etcd.grpc.pb.h"
namespace milvus {
namespace master {
Watcher::Watcher(const std::string &address,
const std::string &key,
std::function<void(etcdserverpb::WatchResponse)> callback,
bool with_prefix) {
auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
stub_ = etcdserverpb::Watch::NewStub(channel);
call_ = std::make_unique<AsyncWatchAction>(key, with_prefix, stub_.get());
work_thread_ = std::thread([&]() {
call_->WaitForResponse(callback);
});
}
void Watcher::Cancel() {
call_->CancelWatch();
}
Watcher::~Watcher() {
Cancel();
work_thread_.join();
}
AsyncWatchAction::AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub *stub) {
// tag `1` means to wire an rpc
stream_ = stub->AsyncWatch(&context_, &cq_, (void *) 1);
etcdserverpb::WatchRequest req;
req.mutable_create_request()->set_key(key);
if (with_prefix) {
std::string range_end(key);
int ascii = (int) range_end[range_end.length() - 1];
range_end.back() = ascii + 1;
req.mutable_create_request()->set_range_end(range_end);
}
void *got_tag;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 1) {
// tag `2` means write watch request to stream
stream_->Write(req, (void *) 2);
} else {
throw std::runtime_error("failed to create a watch connection");
}
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 2) {
stream_->Read(&reply_, (void *) this);
} else {
throw std::runtime_error("failed to write WatchCreateRequest to server");
}
}
void AsyncWatchAction::WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback) {
void *got_tag;
bool ok = false;
while (cq_.Next(&got_tag, &ok)) {
if (!ok) {
break;
}
if (got_tag == (void *) 3) {
cancled_.store(true);
cq_.Shutdown();
break;
} else if (got_tag == (void *) this) // read tag
{
if (reply_.events_size()) {
callback(reply_);
}
stream_->Read(&reply_, (void *) this);
}
}
}
void AsyncWatchAction::CancelWatch() {
if (!cancled_.load()) {
// tag `3` means write done
stream_->WritesDone((void *) 3);
cancled_.store(true);
}
}
}
}

View File

@ -1,40 +0,0 @@
#pragma once
#include "grpc/etcd.grpc.pb.h"
#include <grpc++/grpc++.h>
#include <thread>
namespace milvus {
namespace master {
class AsyncWatchAction;
class Watcher {
public:
Watcher(std::string const &address,
std::string const &key,
std::function<void(etcdserverpb::WatchResponse)> callback,
bool with_prefix = true);
void Cancel();
~Watcher();
private:
std::unique_ptr<etcdserverpb::Watch::Stub> stub_;
std::unique_ptr<AsyncWatchAction> call_;
std::thread work_thread_;
};
class AsyncWatchAction {
public:
AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub* stub);
void WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback);
void CancelWatch();
private:
// Status status;
grpc::ClientContext context_;
grpc::CompletionQueue cq_;
etcdserverpb::WatchResponse reply_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>> stream_;
std::atomic<bool> cancled_ = false;
};
}
}

View File

@ -1,31 +0,0 @@
// Steps to test this file:
// 1. start an etcdv3 server
// 2. run this test
// 3. modify the test key using etcdctlv3 or etcd-clientv3 (must use the v3 api)
// TODO: move this test to unittest
#include "Watcher.h"
using namespace milvus::master;
int main() {
try {
Watcher watcher("127.0.0.1:2379", "SomeKey", [](etcdserverpb::WatchResponse res) {
std::cerr << "Key1 changed!" << std::endl;
std::cout << "Event size: " << res.events_size() << std::endl;
for (auto &event: res.events()) {
std::cout <<
event.kv().key() << ":" <<
event.kv().value() << std::endl;
}
}, false);
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(60000));
watcher.Cancel();
break;
}
}
catch (const std::exception &e) {
std::cout << e.what();
}
}

View File

@ -1,35 +0,0 @@
#include "GrpcClient.h"
#include "grpc++/grpc++.h"
using grpc::ClientContext;
namespace milvus {
namespace master {
GrpcClient::GrpcClient(const std::string &addr) {
auto channel = ::grpc::CreateChannel(addr, ::grpc::InsecureChannelCredentials());
stub_ = masterpb::Master::NewStub(channel);
}
GrpcClient::GrpcClient(std::shared_ptr<::grpc::Channel> &channel)
: stub_(masterpb::Master::NewStub(channel)) {
}
Status GrpcClient::CreateCollection(const milvus::grpc::Mapping &mapping) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->CreateCollection(&context, mapping, &response);
if (!grpc_status.ok()) {
std::cerr << "CreateHybridCollection gRPC failed!" << std::endl;
return Status(grpc_status.error_code(), grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
// TODO: LOG
return Status(response.error_code(), response.reason());
}
return Status::OK();
}
}
}
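A hypothetical caller of the wrapper above; the master address and the collection_name field are assumptions made for illustration:

#include "meta/master/GrpcClient.h"

int main() {
    milvus::master::GrpcClient client("127.0.0.1:53100");  // address is an assumption
    milvus::grpc::Mapping mapping;
    mapping.set_collection_name("collection1");  // assumes Mapping carries a collection_name field
    auto status = client.CreateCollection(mapping);
    return status.ok() ? 0 : 1;
}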

View File

@ -1,27 +0,0 @@
#pragma once
#include "grpc/master.grpc.pb.h"
#include "grpc/message.pb.h"
#include "grpc++/grpc++.h"
#include "utils/Status.h"
namespace milvus {
namespace master {
class GrpcClient {
public:
explicit GrpcClient(const std::string& addr);
explicit GrpcClient(std::shared_ptr<::grpc::Channel>& channel);
~GrpcClient() = default;
public:
Status
CreateCollection(const milvus::grpc::Mapping& mapping);
private:
std::unique_ptr<masterpb::Master::Stub> stub_;
};
}
}

View File

@ -43,7 +43,7 @@ set( GRPC_SERVER_FILES ${GRPC_IMPL_FILES}
aux_source_directory( ${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES )
add_library( server STATIC)
add_library( server STATIC MessageWrapper.cpp MessageWrapper.h)
target_sources( server
PRIVATE ${GRPC_SERVER_FILES}
${GRPC_SERVICE_FILES}

View File

@ -1,21 +0,0 @@
#include "MetaWrapper.h"
#include "config/ServerConfig.h"
namespace milvus{
namespace server {
MetaWrapper& MetaWrapper::GetInstance() {
static MetaWrapper wrapper;
return wrapper;
}
Status MetaWrapper::Init() {
auto addr = config.master.address() + ":" + std::to_string(config.master.port());
client_ = std::make_shared<milvus::master::GrpcClient>(addr);
// Init() declares a Status return type, so return one explicitly.
return Status::OK();
}
std::shared_ptr<milvus::master::GrpcClient> MetaWrapper::MetaClient() {
return client_;
}
}
}

View File

@ -1,24 +0,0 @@
#include "utils/Status.h"
#include "meta/master/GrpcClient.h"
namespace milvus{
namespace server{
class MetaWrapper {
public:
static MetaWrapper&
GetInstance();
Status
Init();
std::shared_ptr<milvus::master::GrpcClient>
MetaClient();
private:
std::shared_ptr<milvus::master::GrpcClient> client_;
};
}
}

View File

@ -34,7 +34,6 @@
#include "utils/SignalHandler.h"
#include "utils/TimeRecorder.h"
#include "MessageWrapper.h"
#include "MetaWrapper.h"
namespace milvus {
namespace server {
@ -241,15 +240,12 @@ Server::StartService() {
grpc::GrpcServer::GetInstance().Start();
// Init pulsar message client
stat = MessageWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message();
goto FAIL;
}
MetaWrapper::GetInstance().Init();
return Status::OK();
FAIL:
std::cerr << "Milvus initializes fail: " << stat.message() << std::endl;

View File

@ -46,7 +46,6 @@ SearchReq::Create(const ContextPtr& context, const ::milvus::grpc::SearchParam *
Status
SearchReq::OnExecute() {
auto message_wrapper = milvus::server::MessageWrapper::GetInstance();
message_wrapper.Init();
auto client = message_wrapper.MessageClient();
int64_t query_id;

View File

@ -26,7 +26,6 @@
#include "tracing/TextMapCarrier.h"
#include "tracing/TracerUtil.h"
#include "utils/Log.h"
#include "server/MetaWrapper.h"
namespace milvus {
namespace server {
@ -341,10 +340,6 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext *context, const ::mil
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status = MetaWrapper::GetInstance().MetaClient()->CreateCollection(*request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context)
return ::grpc::Status::OK;
}
@ -857,8 +852,10 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, const ::milvus::grpc:
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
//TODO: check if the request is legal
BaseReqPtr req_ptr = SearchReq::Create(GetContext(context), request, response);
ReqScheduler::ExecReq(req_ptr);
BaseReqPtr req_ptr = SearchReq::Create(GetContext(context), request, response);
ReqScheduler::ExecReq(req_ptr);
return ::grpc::Status::OK;
}

View File

@ -66,10 +66,6 @@ add_custom_command(TARGET generate_suvlim_pb_grpc
COMMAND echo "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}"
COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"
"${PROTO_PATH}/proto/etcd.proto"
DEPENDS "${PROTO_PATH}/proto/etcd.proto"
)
set_property( GLOBAL PROPERTY PROTOC_EXCUTABLE ${PROTOC_EXCUTABLE})

View File

@ -87,12 +87,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
}
}
@ -145,9 +145,9 @@ func (node *QueryNode) PrepareBatchMsg() []int {
return msgLen
}
func (node *QueryNode) StartMessageClient(pulsarURL string) {
func (node *QueryNode) StartMessageClient() {
// TODO: add consumerMsgSchema
node.messageClient.InitClient(pulsarURL)
node.messageClient.InitClient("pulsar://192.168.2.28:6650")
go node.messageClient.ReceiveMessage()
}
@ -181,7 +181,7 @@ func (node *QueryNode) RunInsertDelete() {
if count == 0 {
start = time.Now()
}
count += msgLen[0]
count+=msgLen[0]
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
@ -191,7 +191,7 @@ func (node *QueryNode) RunInsertDelete() {
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
//fmt.Print("UpdateSearchTimeSync Done\n\n\n")
if count == 100000-1 {
if count == 100000 - 1 {
elapsed := time.Since(start)
fmt.Println("Query node insert 10 × 10000 time:", elapsed)
}
@ -200,23 +200,19 @@ func (node *QueryNode) RunInsertDelete() {
func (node *QueryNode) RunSearch() {
for {
time.Sleep(0.2 * 1000 * time.Millisecond)
//time.Sleep(2 * 1000 * time.Millisecond)
start := time.Now()
if len(node.messageClient.GetSearchChan()) <= 0 {
fmt.Println("null Search")
//fmt.Println("null Search")
continue
}
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
msg := <-node.messageClient.GetSearchChan()
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
var status = node.Search(node.messageClient.SearchMsg)
if status.ErrorCode != 0 {
fmt.Println("Search Failed")
node.PublishFailedSearchResult()
}
node.Search(node.messageClient.SearchMsg)
elapsed := time.Since(start)
fmt.Println("Query node search time:", elapsed)
@ -431,8 +427,6 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
var timestamp = msg.Timestamp
var vector = msg.Records
// For now, only the first Json is valid.
var queryJson = msg.Json[0]
// 1. Timestamp check
// TODO: return or wait? Or adding graceful time
@ -443,7 +437,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// 2. Do search in all segments
for _, partition := range targetCollection.Partitions {
for _, openSegment := range partition.OpenedSegments {
var res, err = openSegment.SegmentSearch(queryJson, timestamp, vector)
var res, err = openSegment.SegmentSearch("", timestamp, vector)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
@ -454,7 +448,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
}
}
for _, closedSegment := range partition.ClosedSegments {
var res, err = closedSegment.SegmentSearch(queryJson, timestamp, vector)
var res, err = closedSegment.SegmentSearch("", timestamp, vector)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
@ -474,9 +468,6 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
Ids: make([]int64, 0),
}
var results = msgPb.QueryResult{
Status: &msgPb.Status{
ErrorCode: 0,
},
Entities: &entities,
Distances: make([]float32, 0),
QueryId: msg.Uid,

View File

@ -1,10 +1,10 @@
package reader
func startQueryNode(pulsarURL string) {
func startQueryNode() {
qn := NewQueryNode(0, 0)
qn.InitQueryNodeCollection()
//go qn.SegmentService()
qn.StartMessageClient(pulsarURL)
qn.StartMessageClient()
go qn.RunSearch()
qn.RunInsertDelete()

View File

@ -5,6 +5,5 @@ import (
)
func TestReader_startQueryNode(t *testing.T) {
pulsarURL := "pulsar://192.168.2.28:6650"
startQueryNode(pulsarURL)
startQueryNode()
}

View File

@ -10,8 +10,8 @@ import (
type ResultEntityIds []int64
type SearchResult struct {
ResultIds []int64
ResultDistances []float32
ResultIds []int64
ResultDistances []float32
}
func getResultTopicByClientId(clientId int64) string {
@ -28,20 +28,6 @@ func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult, clientId
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) PublishFailedSearchResult() msgPb.Status {
var results = msgPb.QueryResult{
Status: &msgPb.Status{
ErrorCode: 1,
Reason: "Search Failed",
},
}
var ctx = context.Background()
node.messageClient.Send(ctx, results)
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) PublicStatistic(statisticTopic string) msgPb.Status {
// TODO: get statistic info
// getStatisticInfo()

View File

@ -164,14 +164,12 @@ func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]
return nil
}
func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
/*C.Search
int
Search(CSegmentBase c_segment,
const char* query_json,
void* fake_query,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);
*/
@ -181,23 +179,12 @@ func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord
resultIds := make([]int64, TopK)
resultDistances := make([]float32, TopK)
var cQueryJson = C.CString(queryJson)
var cQueryPtr = unsafe.Pointer(nil)
var cTimestamp = C.ulong(timestamp)
var cResultIds = (*C.long)(&resultIds[0])
var cResultDistances = (*C.float)(&resultDistances[0])
var cQueryRawData *C.float
var cQueryRawDataLength C.int
if vectorRecord.BinaryData != nil {
return nil, errors.New("Data of binary type is not supported yet")
} else if len(vectorRecord.FloatData) <= 0 {
return nil, errors.New("Null query vector data")
} else {
cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
}
var status = C.Search(s.SegmentPtr, cQueryJson, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances)
if status != 0 {
return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))

View File

@ -3,7 +3,6 @@ package reader
import (
"encoding/binary"
"fmt"
schema "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"math"
"testing"
@ -132,15 +131,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
assert.NoError(t, err)
// 6. Do search
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
var queryRawData = make([]float32, 0)
for i := 0; i < 16; i ++ {
queryRawData = append(queryRawData, float32(i))
}
var vectorRecord = schema.VectorRowRecord {
FloatData: queryRawData,
}
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[0], &vectorRecord)
var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
assert.NoError(t, searchErr)
fmt.Println(searchRes)

View File

@ -49,7 +49,7 @@ GRPC_INCLUDE=.:.
rm -rf proto-cpp && mkdir -p proto-cpp
PB_FILES=()
GRPC_FILES=("message.proto" "master.proto")
GRPC_FILES=("message.proto")
ALL_FILES=("${PB_FILES[@]}")
ALL_FILES+=("${GRPC_FILES[@]}")

View File

@ -14,6 +14,7 @@
#include "include/MilvusApi.h"
#include "grpc/ClientProxy.h"
#include "interface/ConnectionImpl.h"
#include "utils/TimeRecorder.h"
int main(int argc , char**argv) {
auto client = milvus::ConnectionImpl();
@ -50,25 +51,9 @@ int main(int argc , char**argv) {
milvus::TopKQueryResult result;
auto t1 = std::chrono::high_resolution_clock::now();
// for (int k = 0; k < 1000; ++k) {
milvus_sdk::TimeRecorder test_search("search");
auto status = client.Search("collection1", partition_list, "dsl", vectorParam, result);
// }
// std::cout << "hahaha" << std::endl;
// if (result.size() > 0){
// std::cout << result[0].ids[0] << std::endl;
// std::cout << result[0].distances[0] << std::endl;
// } else {
// std::cout << "sheep is a shadiao";
// }
auto t2 = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
std::cout << "Query run time: " << duration/1000.0 << "ms" << std::endl;
return 0;
}