Fix gtest, support AckResponder, update segment interface

Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
FluorineDog 2020-09-09 12:02:52 +08:00 committed by yefu.chen
parent f1b08a98c5
commit 3f10cddf3e
19 changed files with 965 additions and 551 deletions

View File

@ -1,45 +1,58 @@
########################### GTEST
# Enable ExternalProject CMake module
INCLUDE(ExternalProject)
find_package(Threads REQUIRED)
# Set default ExternalProject root directory
SET_DIRECTORY_PROPERTIES(PROPERTIES EP_PREFIX ${CMAKE_BINARY_DIR}/third_party)
# Add gtest
# http://stackoverflow.com/questions/9689183/cmake-googletest
include(ExternalProject)
ExternalProject_Add(
googletest
URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip
# TIMEOUT 10
# # Force separate output paths for debug and release builds to allow easy
# # identification of correct lib in subsequent TARGET_LINK_LIBRARIES commands
# CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs
# -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_RELEASE:PATH=ReleaseLibs
# -Dgtest_force_shared_crt=ON
# Disable install step
INSTALL_COMMAND ""
# Wrap download, configure and build steps in a script to log output
LOG_DOWNLOAD ON
LOG_CONFIGURE ON
LOG_BUILD ON)
googletest
URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip
UPDATE_COMMAND ""
INSTALL_COMMAND ""
LOG_DOWNLOAD ON
LOG_CONFIGURE ON
LOG_BUILD ON)
# Specify include dir
ExternalProject_Get_Property(googletest source_dir)
set(GTEST_INCLUDE_DIR ${source_dir}/include)
set(GTEST_INCLUDE_DIRS ${source_dir}/googletest/include)
set(GMOCK_INCLUDE_DIRS ${source_dir}/googlemock/include)
# The cloning of the above repo doesn't happen until make, however if the dir doesn't
# exist, INTERFACE_INCLUDE_DIRECTORIES will throw an error.
# To make it work, we just create the directory now during config.
file(MAKE_DIRECTORY ${GTEST_INCLUDE_DIRS})
file(MAKE_DIRECTORY ${GMOCK_INCLUDE_DIRS})
# Library
ExternalProject_Get_Property(googletest binary_dir)
set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a)
set(GTEST_LIBRARY gtest)
add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED)
set_target_properties(${GTEST_LIBRARY} PROPERTIES
"IMPORTED_LOCATION" "${GTEST_LIBRARY_PATH}"
"IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
"INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}")
add_dependencies(${GTEST_LIBRARY} googletest)
# set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a)
# set(GTEST_LIBRARY gtest)
# add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED)
# set_property(TARGET ${GTEST_LIBRARY} PROPERTY IMPORTED_LOCATION
# ${GTEST_LIBRARY_PATH} )
# add_dependencies(${GTEST_LIBRARY} googletest)
set(GTEST_LIBRARY_PATH ${binary_dir}/lib)
add_library(gtest UNKNOWN IMPORTED)
add_library(gtest_main UNKNOWN IMPORTED)
set_property(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest.a)
set_property(TARGET gtest_main PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest_main.a)
add_dependencies(gtest googletest)
add_dependencies(gtest_main googletest)
set(GTEST_MAIN_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest_main.a)
set(GTEST_MAIN_LIBRARY gtest_main)
add_library(${GTEST_MAIN_LIBRARY} UNKNOWN IMPORTED)
set_target_properties(${GTEST_MAIN_LIBRARY} PROPERTIES
"IMPORTED_LOCATION" "${GTEST_MAIN_LIBRARY_PATH}"
"IMPORTED_LINK_INTERFACE_LIBRARImS" "${CMAKE_THREAD_LIBS_INIT}"
"INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}")
add_dependencies(${GTEST_MAIN_LIBRARY} googletest)
# set(GMOCK_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock.a)
# set(GMOCK_LIBRARY gmock)
# add_library(${GMOCK_LIBRARY} UNKNOWN IMPORTED)
# set_target_properties(${GMOCK_LIBRARY} PROPERTIES
# "IMPORTED_LOCATION" "${GMOCK_LIBRARY_PATH}"
# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}")
# add_dependencies(${GMOCK_LIBRARY} googletest)
# set(GMOCK_MAIN_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock_main.a)
# set(GMOCK_MAIN_LIBRARY gmock_main)
# add_library(${GMOCK_MAIN_LIBRARY} UNKNOWN IMPORTED)
# set_target_properties(${GMOCK_MAIN_LIBRARY} PROPERTIES
# "IMPORTED_LOCATION" "${GMOCK_MAIN_LIBRARY_PATH}"
# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}")
# add_dependencies(${GMOCK_MAIN_LIBRARY} ${GTEST_LIBRARY})

View File

@ -0,0 +1,41 @@
#pragma once
#include <shared_mutex>
#include <mutex>
#include <set>
#include <atomic>
namespace milvus::dog_segment {
class AckResponder {
public:
void
AddSegment(int64_t seg_begin, int64_t seg_end) {
std::lock_guard lck(mutex_);
fetch_and_flip(seg_end);
auto old_begin = fetch_and_flip(seg_begin);
if(old_begin) {
minimal = *acks_.begin();
}
}
int64_t
GetAck() const{
return minimal;
}
private:
bool
fetch_and_flip(int64_t endpoint) {
if (acks_.count(endpoint)) {
acks_.erase(endpoint);
return true;
} else {
acks_.insert(endpoint);
return false;
}
}
private:
std::shared_mutex mutex_;
std::set<int64_t> acks_ = {0};
std::atomic<int64_t> minimal = 0;
};
}

View File

@ -1,16 +1,17 @@
set(DOG_SEGMENT_FILES
SegmentNaive.cpp
Collection.cpp
Partition.cpp
collection_c.cpp
partition_c.cpp
segment_c.cpp
IndexMeta.cpp
ConcurrentVector.cpp
# Collection.cpp
# Partition.cpp
# collection_c.cpp
# partition_c.cpp
# segment_c.cpp
)
# Third Party dablooms file
#aux_source_directory( ${MILVUS_THIRDPARTY_SRC}/dablooms THIRDPARTY_DABLOOMS_FILES )
add_library(milvus_dog_segment SHARED
${DOG_SEGMENT_FILES}
)
#add_dependencies( segment sqlite mysqlpp )
target_link_libraries(milvus_dog_segment tbb milvus_utils pthread)

View File

@ -1,37 +1,51 @@
#pragma once
#include "dog_segment/Partition.h"
#include "SegmentDefs.h"
namespace milvus::dog_segment {
//////////////////////////////////////////////////////////////////
class Collection {
class Partition {
public:
explicit Collection(std::string &collection_name, std::string &schema);
// TODO: set index
void set_index();
// TODO: config to schema
void parse();
public:
SchemaPtr& get_schema() {
return schema_;
}
std::string& get_collection_name() {
return collection_name_;
const std::deque<SegmentBasePtr>& segments() const {
return segments_;
}
private:
// TODO: add Index ptr
// IndexPtr index_ = nullptr;
std::string collection_name_;
std::string schema_json_;
SchemaPtr schema_;
std::string name_;
std::deque<SegmentBasePtr> segments_;
};
using CollectionPtr = std::unique_ptr<Collection>;
using PartitionPtr = std::shard_ptr<Partition>;
}
//////////////////////////////////////////////////////////////////
class Collection {
public:
explicit Collection(std::string name): name_(name){}
// TODO: set index
set_index() {}
set_schema(std::string config) {
// TODO: config to schema
schema_ = null;
}
public:
// std::vector<int64_t> Insert() {
// for (auto partition: partitions_) {
// for (auto segment: partition.segments()) {
// if (segment.Status == Status.open) {
// segment.Insert()
// }
// }
// }
// }
private:
// TODO: Index ptr
IndexPtr index_ = nullptr;
std::string name_;
SchemaPtr schema_;
std::vector<PartitionPtr> partitions_;
};

View File

@ -0,0 +1,8 @@
#include <iostream>
#include "dog_segment/ConcurrentVector.h"
namespace milvus::dog_segment {
}

View File

@ -0,0 +1,204 @@
#pragma once
#include <tbb/concurrent_vector.h>
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>
#include <shared_mutex>
#include <vector>
namespace milvus::dog_segment {
// we don't use std::array because capacity of concurrent_vector wastes too much memory
// template <typename Type>
// class FixedVector : public std::vector<Type> {
// public:
// // This is a stupid workaround for tbb API to avoid memory copy
// explicit FixedVector(int64_t size) : placeholder_size_(size) {
// }
// FixedVector(const FixedVector<Type>& placeholder_vec)
// : std::vector<Type>(placeholder_vec.placeholder_size_), is_placeholder_(false) {
// // assert(placeholder_vec.is_placeholder_);
// }
// FixedVector(FixedVector<Type>&&) = delete;
//
// FixedVector&
// operator=(FixedVector<Type>&&) = delete;
//
// FixedVector&
// operator=(const FixedVector<Type>&) = delete;
//
// bool is_placeholder() {
// return is_placeholder_;
// }
// private:
// bool is_placeholder_ = true;
// int placeholder_size_ = 0;
//};
template <typename Type>
using FixedVector = std::vector<Type>;
template <typename Type>
class ThreadSafeVector {
public:
template <typename... Args>
void
emplace_to_at_least(int64_t size, Args... args) {
if (size <= size_) {
return;
}
// TODO: use multithread to speedup
std::lock_guard lck(mutex_);
while (vec_.size() < size) {
vec_.emplace_back(std::forward<Args...>(args...));
++size_;
}
}
const Type&
operator[](int64_t index) const {
assert(index < size_);
std::shared_lock lck(mutex_);
return vec_[index];
}
Type&
operator[](int64_t index) {
assert(index < size_);
std::shared_lock lck(mutex_);
return vec_[index];
}
int64_t
size() const {
return size_;
}
private:
std::atomic<int64_t> size_ = 0;
std::deque<Type> vec_;
mutable std::shared_mutex mutex_;
};
class VectorBase {
public:
VectorBase() = default;
virtual ~VectorBase() = default;
virtual void
grow_to_at_least(int64_t element_count) = 0;
virtual void set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) = 0;
};
template <typename Type, bool is_scalar = false, ssize_t ElementsPerChunk = 32 * 1024>
class ConcurrentVector : public VectorBase {
public:
// constants
using Chunk = FixedVector<Type>;
ConcurrentVector(ConcurrentVector&&) = delete;
ConcurrentVector(const ConcurrentVector&) = delete;
ConcurrentVector& operator=(ConcurrentVector&&) = delete;
ConcurrentVector& operator=(const ConcurrentVector&) = delete;
public:
explicit ConcurrentVector(ssize_t dim = 1) : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) {
assert(is_scalar ? dim == 1 : dim != 1);
}
void
grow_to_at_least(int64_t element_count) override {
auto chunk_count = (element_count + ElementsPerChunk - 1) / ElementsPerChunk;
chunks_.emplace_to_at_least(chunk_count, SizePerChunk);
}
void
set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) override {
set_data(element_count, static_cast<const Type*>(source), element_count);
}
void
set_data(ssize_t element_offset, const Type* source, ssize_t element_count) {
if (element_count == 0) {
return;
}
this->grow_to_at_least(element_offset + element_count);
auto chunk_id = element_offset / ElementsPerChunk;
auto chunk_offset = element_offset % ElementsPerChunk;
ssize_t source_offset = 0;
// first partition:
if (chunk_offset + element_count <= ElementsPerChunk) {
// only first
fill_chunk(chunk_id, chunk_offset, element_count, source, source_offset);
return;
}
auto first_size = ElementsPerChunk - chunk_offset;
fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset);
source_offset += ElementsPerChunk - chunk_offset;
element_count -= first_size;
++chunk_id;
// the middle
while (element_count >= ElementsPerChunk) {
fill_chunk(chunk_id, 0, ElementsPerChunk, source, source_offset);
source_offset += ElementsPerChunk;
element_count -= ElementsPerChunk;
++chunk_id;
}
// the final
if (element_count > 0) {
fill_chunk(chunk_id, 0, element_count, source, source_offset);
}
}
const Chunk&
get_chunk(ssize_t chunk_index) const {
return chunks_[chunk_index];
}
// just for fun, don't use it directly
const Type*
get_element(ssize_t element_index) const {
auto chunk_id = element_index / ElementsPerChunk;
auto chunk_offset = element_index % ElementsPerChunk;
return get_chunk(chunk_id).data() + chunk_offset * Dim;
}
const Type&
operator[](ssize_t element_index) const {
assert(Dim == 1);
auto chunk_id = element_index / ElementsPerChunk;
auto chunk_offset = element_index % ElementsPerChunk;
return get_chunk(chunk_id)[chunk_offset];
}
ssize_t
chunk_size() const {
return chunks_.size();
}
private:
void
fill_chunk(ssize_t chunk_id, ssize_t chunk_offset, ssize_t element_count, const Type* source,
ssize_t source_offset) {
if (element_count <= 0) {
return;
}
auto chunk_max_size = chunks_.size();
assert(chunk_id < chunk_max_size);
Chunk& chunk = chunks_[chunk_id];
auto ptr = chunk.data();
std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim);
}
const ssize_t Dim;
const ssize_t SizePerChunk;
private:
ThreadSafeVector<Chunk> chunks_;
};
} // namespace milvus::dog_segment

View File

@ -0,0 +1,56 @@
// #include "IndexMeta.h"
// #include <mutex>
// #include <cassert>
// namespace milvus::dog_segment {
//
// Status
// IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name, IndexType type, IndexMode mode,
// IndexConfig config) {
// Entry entry{
// index_name,
// field_name,
// type,
// mode,
// std::move(config)
// };
// VerifyEntry(entry);
//
// if (entries_.count(index_name)) {
// throw std::invalid_argument("duplicate index_name");
// }
// // TODO: support multiple indexes for single field
// assert(!lookups_.count(field_name));
// lookups_[field_name] = index_name;
// entries_[index_name] = std::move(entry);
//
// return Status::OK();
// }
//
// Status
// IndexMeta::DropEntry(const std::string& index_name) {
// assert(entries_.count(index_name));
// auto entry = std::move(entries_[index_name]);
// if(lookups_[entry.field_name] == index_name) {
// lookups_.erase(entry.field_name);
// }
// return Status::OK();
// }
//
// void IndexMeta::VerifyEntry(const Entry &entry) {
// auto is_mode_valid = std::set{IndexMode::MODE_CPU, IndexMode::MODE_GPU}.count(entry.mode);
// if(!is_mode_valid) {
// throw std::invalid_argument("invalid mode");
// }
//
// auto& schema = *schema_;
// auto& field_meta = schema[entry.index_name];
// // TODO checking
// if(field_meta.is_vector()) {
// assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
// } else {
// assert(false);
// }
// }
//
// } // namespace milvus::dog_segment
//

View File

@ -0,0 +1,57 @@
#pragma once
//
//#include <shared_mutex>
//
//#include "SegmentDefs.h"
//#include "knowhere/index/IndexType.h"
//
#include <memory>
class IndexMeta;
namespace milvus::dog_segment {
//// TODO: this is
//class IndexMeta {
// public:
// IndexMeta(SchemaPtr schema) : schema_(schema) {
// }
// using IndexType = knowhere::IndexType;
// using IndexMode = knowhere::IndexMode;
// using IndexConfig = knowhere::Config;
//
// struct Entry {
// std::string index_name;
// std::string field_name;
// IndexType type;
// IndexMode mode;
// IndexConfig config;
// };
//
// Status
// AddEntry(const std::string& index_name, const std::string& field_name, IndexType type, IndexMode mode,
// IndexConfig config);
//
// Status
// DropEntry(const std::string& index_name);
//
// const std::map<std::string, Entry>&
// get_entries() {
// return entries_;
// }
//
// const Entry& lookup_by_field(const std::string& field_name) {
// auto index_name = lookups_.at(field_name);
// return entries_.at(index_name);
// }
// private:
// void
// VerifyEntry(const Entry& entry);
//
// private:
// SchemaPtr schema_;
// std::map<std::string, Entry> entries_; // index_name => Entry
// std::map<std::string, std::string> lookups_; // field_name => index_name
//};
//
using IndexMetaPtr = std::shared_ptr<IndexMeta>;
//
} // namespace milvus::dog_segment
//

View File

@ -1,19 +1,20 @@
#pragma once
#include <vector>
// #include "db/Types.h"
#include "IndexMeta.h"
#include "utils/Types.h"
#include "dog_segment/SegmentDefs.h"
// #include "knowhere/index/Index.h"
// #include "knowhere/index/IndexType.h"
#include "query/GeneralQuery.h"
using idx_t = int64_t;
namespace milvus {
namespace dog_segment {
using engine::DataChunk;
using engine::DataChunkPtr;
// using engine::DataChunk;
// using engine::DataChunkPtr;
using engine::QueryResult;
using DogDataChunkPtr = std::shared_ptr<DataChunk>;
// using DogDataChunkPtr = std::shared_ptr<DataChunk>;
int
TestABI();
@ -87,5 +88,5 @@ using SegmentBasePtr = std::unique_ptr<SegmentBase>;
SegmentBasePtr
CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);
} // namespace engine
} // namespace dog_segment
} // namespace milvus

View File

@ -1,16 +1,13 @@
#pragma once
#include <vector>
#include <unordered_map>
// #include "db/Types.h"
#include "utils/Types.h"
// #include "knowhere/index/Index.h"
#include "utils/Status.h"
#include "utils/Types.h"
#include <cassert>
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
namespace milvus::dog_segment {
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
using engine::DataType;
using engine::FieldElementType;
@ -20,11 +17,6 @@ struct DogDataChunk {
int64_t count;
};
struct IndexConfig {
// TODO
// std::unordered_map<std::string, knowhere::Config> configs;
};
inline int
field_sizeof(DataType data_type, int dim = 1) {
switch (data_type) {
@ -49,12 +41,17 @@ field_sizeof(DataType data_type, int dim = 1) {
return dim / 8;
}
default: {
assert(false);
throw std::invalid_argument("unsupported data type");
return 0;
}
}
}
inline bool
field_is_vector(DataType datatype) {
return datatype == DataType::VECTOR_BINARY || datatype == DataType::VECTOR_FLOAT;
}
struct FieldMeta {
public:
FieldMeta(std::string_view name, DataType type, int dim = 1) : name_(name), type_(type), dim_(dim) {
@ -107,10 +104,12 @@ class Schema {
void
AddField(FieldMeta field_meta) {
auto index = fields_.size();
auto offset = fields_.size();
fields_.emplace_back(field_meta);
indexes_.emplace(field_meta.get_name(), index);
total_sizeof_ = field_meta.get_sizeof();
offsets_.emplace(field_meta.get_name(), offset);
auto field_sizeof = field_meta.get_sizeof();
sizeof_infos_.push_back(field_sizeof);
total_sizeof_ += field_sizeof;
}
auto
@ -132,7 +131,8 @@ class Schema {
return fields_.end();
}
int size() const {
int
size() const {
return fields_.size();
}
@ -141,12 +141,22 @@ class Schema {
return fields_[field_index];
}
auto
get_total_sizeof() const {
return total_sizeof_;
}
const std::vector<int>& get_sizeof_infos() {
return sizeof_infos_;
}
const FieldMeta&
operator[](const std::string& field_name) const {
auto index_iter = indexes_.find(field_name);
assert(index_iter != indexes_.end());
auto index = index_iter->second;
return (*this)[index];
auto offset_iter = offsets_.find(field_name);
assert(offset_iter != offsets_.end());
auto offset = offset_iter->second;
return (*this)[offset];
}
private:
@ -155,19 +165,11 @@ class Schema {
private:
// a mapping for random access
std::unordered_map<std::string, int> indexes_;
int total_sizeof_;
std::unordered_map<std::string, int> offsets_;
std::vector<int> sizeof_infos_;
int total_sizeof_ = 0;
};
using SchemaPtr = std::shared_ptr<Schema>;
class IndexData {
public:
virtual std::vector<char>
serilize() = 0;
static std::shared_ptr<IndexData>
deserialize(int64_t size, const char* blob);
};
} // namespace milvus::dog_segment

View File

@ -1,246 +1,255 @@
#include <shared_mutex>
#include <dog_segment/SegmentNaive.h>
#include "dog_segment/SegmentBase.h"
#include "utils/Status.h"
#include <tbb/concurrent_vector.h>
#include <tbb/concurrent_unordered_map.h>
#include <atomic>
#include <algorithm>
#include <numeric>
#include <thread>
namespace milvus::dog_segment {
int
TestABI() {
return 42;
}
struct ColumnBasedDataChunk {
std::vector<std::vector<float>> entity_vecs;
static ColumnBasedDataChunk from(const DogDataChunk& source, const Schema& schema){
ColumnBasedDataChunk dest;
auto count = source.count;
auto raw_data = reinterpret_cast<const char*>(source.raw_data);
auto align = source.sizeof_per_row;
for(auto& field: schema) {
auto len = field.get_sizeof();
assert(len % sizeof(float) == 0);
std::vector<float> new_col(len * count / sizeof(float));
for(int64_t i = 0; i < count; ++i) {
memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align , len);
}
dest.entity_vecs.push_back(std::move(new_col));
// offset the raw_data
raw_data += len / sizeof(float);
}
return dest;
}
};
class SegmentNaive : public SegmentBase {
public:
virtual ~SegmentNaive() = default;
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
// TODO: originally, id should be put into data_chunk
// TODO: Is it ok to put them the other side?
Status
Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps,
const DogDataChunk& values) override;
// TODO: add id into delete log, possibly bitmap
Status
Delete(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) override;
// query contains metadata of
Status
Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) override;
// // THIS FUNCTION IS REMOVED
// virtual Status
// GetEntityByIds(Timestamp timestamp, const std::vector<Id>& ids, DataChunkPtr& results) = 0;
// stop receive insert requests
Status
Close() override {
std::lock_guard<std::shared_mutex> lck(mutex_);
assert(state_ == SegmentState::Open);
state_ = SegmentState::Closed;
return Status::OK();
}
// // to make all data inserted visible
// // maybe a no-op?
// virtual Status
// Flush(Timestamp timestamp) = 0;
// BuildIndex With Paramaters, must with Frozen State
// This function is atomic
// NOTE: index_params contains serveral policies for several index
Status
BuildIndex(std::shared_ptr<IndexConfig> index_params) override {
throw std::runtime_error("not implemented");
}
// Remove Index
Status
DropIndex(std::string_view field_name) override {
throw std::runtime_error("not implemented");
}
Status
DropRawData(std::string_view field_name) override {
// TODO: NO-OP
return Status::OK();
}
Status
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override {
// TODO: NO-OP
return Status::OK();
}
public:
ssize_t
get_row_count() const override {
return ack_count_.load(std::memory_order_relaxed);
}
// const FieldsInfo&
// get_fields_info() const override {
//
// }
//
// // check is_indexed here
// virtual const IndexConfig&
// get_index_param() const = 0;
//
SegmentState
get_state() const override {
return state_.load(std::memory_order_relaxed);
}
//
// std::shared_ptr<IndexData>
// get_index_data();
ssize_t
get_deleted_count() const override {
return 0;
}
public:
friend SegmentBasePtr
CreateSegment(SchemaPtr& schema);
private:
SchemaPtr schema_;
std::shared_mutex mutex_;
std::atomic<SegmentState> state_ = SegmentState::Open;
std::atomic<int64_t> ack_count_ = 0;
tbb::concurrent_vector<uint64_t> uids_;
tbb::concurrent_vector<Timestamp> timestamps_;
std::vector<tbb::concurrent_vector<float>> entity_vecs_;
tbb::concurrent_unordered_map<uint64_t, int> internal_indexes_;
tbb::concurrent_unordered_multimap<int, Timestamp> delete_logs_;
};
SegmentBasePtr
CreateSegment(SchemaPtr& schema) {
// TODO: remove hard code
auto schema_tmp = std::make_shared<Schema>();
schema_tmp->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema_tmp->AddField("age", DataType::INT32);
auto segment = std::make_unique<SegmentNaive>();
segment->schema_ = schema_tmp;
segment->entity_vecs_.resize(schema_tmp->size());
std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr remote_index_meta) {
auto segment = std::make_unique<SegmentNaive>(schema, remote_index_meta);
return segment;
}
Status
SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps,
const DogDataChunk& row_values) {
const auto& schema = *schema_;
auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
// insert datas
// TODO: use shared_lock
std::lock_guard lck(mutex_);
assert(state_ == SegmentState::Open);
auto ack_id = ack_count_.load();
uids_.grow_by(primary_keys, primary_keys + size);
for(int64_t i = 0; i < size; ++i) {
auto key = primary_keys[i];
auto internal_index = i + ack_id;
internal_indexes_[key] = internal_index;
SegmentNaive::Record::Record(const Schema& schema) : uids_(1), timestamps_(1) {
for (auto& field : schema) {
if (field.is_vector()) {
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
entity_vec_.emplace_back(std::make_shared<ConcurrentVector<float>>(field.get_dim()));
} else {
assert(field.get_data_type() == DataType::INT32);
entity_vec_.emplace_back(std::make_shared<ConcurrentVector<int32_t, false>>());
}
}
timestamps_.grow_by(timestamps, timestamps + size);
for(int fid = 0; fid < schema.size(); ++fid) {
auto field = schema[fid];
auto total_len = field.get_sizeof() * size / sizeof(float);
auto source_vec = data_chunk.entity_vecs[fid];
entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len);
}
// finish insert
ack_count_ += size;
return Status::OK();
}
Status SegmentNaive::Delete(int64_t size, const uint64_t *primary_keys, const Timestamp *timestamps) {
for(int i = 0; i < size; ++i) {
auto key = primary_keys[i];
auto time = timestamps[i];
delete_logs_.insert(std::make_pair(key, time));
int64_t
SegmentNaive::PreInsert(int64_t size) {
auto reserved_begin = record_.reserved.fetch_add(size);
return reserved_begin;
}
int64_t
SegmentNaive::PreDelete(int64_t size) {
throw std::runtime_error("unimplemented");
}
Status
SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t* uids_raw, const Timestamp* timestamps_raw,
const DogDataChunk& entities_raw) {
assert(entities_raw.count == size);
assert(entities_raw.sizeof_per_row == schema_->get_total_sizeof());
auto raw_data = reinterpret_cast<const char*>(entities_raw.raw_data);
// std::vector<char> entities(raw_data, raw_data + size * len_per_row);
auto len_per_row = entities_raw.sizeof_per_row;
std::vector<std::tuple<Timestamp, idx_t, int64_t>> ordering;
ordering.resize(size);
// #pragma omp parallel for
for (int i = 0; i < size; ++i) {
ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i], i);
}
std::sort(ordering.begin(), ordering.end());
auto sizeof_infos = schema_->get_sizeof_infos();
std::vector<int> offset_infos(schema_->size() + 1, 0);
std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1);
std::vector<std::vector<char>> entities(schema_->size());
for (int fid = 0; fid < schema_->size(); ++fid) {
auto len = sizeof_infos[fid];
entities[fid].resize(len * size);
}
std::vector<idx_t> uids(size);
std::vector<Timestamp> timestamps(size);
// #pragma omp parallel for
for (int index = 0; index < size; ++index) {
auto [t, uid, order_index] = ordering[index];
timestamps[index] = t;
uids[index] = uid;
for (int fid = 0; fid < schema_->size(); ++fid) {
auto len = sizeof_infos[fid];
auto offset = offset_infos[fid];
auto src = raw_data + offset + order_index * len_per_row;
auto dst = entities[fid].data() + index * len;
memcpy(dst, src, len);
}
}
record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
record_.uids_.set_data(reserved_begin, uids.data(), size);
for (int fid = 0; fid < schema_->size(); ++fid) {
record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size);
}
record_.ack_responder_.AddSegment(reserved_begin, size);
return Status::OK();
// std::thread go(executor, std::move(uids), std::move(timestamps), std::move(entities));
// go.detach();
// const auto& schema = *schema_;
// auto record_ptr = GetMutableRecord();
// assert(record_ptr);
// auto& record = *record_ptr;
// auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
//
// // TODO: use shared_lock for better concurrency
// std::lock_guard lck(mutex_);
// assert(state_ == SegmentState::Open);
// auto ack_id = ack_count_.load();
// record.uids_.grow_by(primary_keys, primary_keys + size);
// for (int64_t i = 0; i < size; ++i) {
// auto key = primary_keys[i];
// auto internal_index = i + ack_id;
// internal_indexes_[key] = internal_index;
// }
// record.timestamps_.grow_by(timestamps, timestamps + size);
// for (int fid = 0; fid < schema.size(); ++fid) {
// auto field = schema[fid];
// auto total_len = field.get_sizeof() * size / sizeof(float);
// auto source_vec = data_chunk.entity_vecs[fid];
// record.entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len);
// }
//
// // finish insert
// ack_count_ += size;
// return Status::OK();
}
Status
SegmentNaive::Delete(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) {
throw std::runtime_error("unimplemented");
// for (int i = 0; i < size; ++i) {
// auto key = primary_keys[i];
// auto time = timestamps[i];
// delete_logs_.insert(std::make_pair(key, time));
// }
// return Status::OK();
}
// TODO: remove mock
Status
SegmentNaive::Query(const query::QueryPtr &query, Timestamp timestamp, QueryResult &result) {
std::shared_lock lck(mutex_);
auto ack_count = ack_count_.load();
assert(query == nullptr);
assert(schema_->size() >= 1);
const auto& field = schema_->operator[](0);
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
assert(field.get_name() == "fakevec");
auto dim = field.get_dim();
// assume query vector is [0, 0, ..., 0]
std::vector<float> query_vector(dim, 0);
auto& target_vec = entity_vecs_[0];
int current_index = -1;
float min_diff = std::numeric_limits<float>::max();
for(int index = 0; index < ack_count; ++index) {
float diff = 0;
int offset = index * dim;
for(auto d = 0; d < dim; ++d) {
auto v = target_vec[offset + d] - query_vector[d];
diff += v * v;
}
if(diff < min_diff) {
min_diff = diff;
current_index = index;
}
}
QueryResult query_result;
query_result.row_num_ = 1;
query_result.result_distances_.push_back(min_diff);
query_result.result_ids_.push_back(uids_[current_index]);
// query_result.data_chunk_ = nullptr;
result = std::move(query_result);
return Status::OK();
SegmentNaive::QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& result) {
throw std::runtime_error("unimplemented");
// auto ack_count = ack_count_.load();
// assert(query == nullptr);
// assert(schema_->size() >= 1);
// const auto& field = schema_->operator[](0);
// assert(field.get_data_type() == DataType::VECTOR_FLOAT);
// assert(field.get_name() == "fakevec");
// auto dim = field.get_dim();
// // assume query vector is [0, 0, ..., 0]
// std::vector<float> query_vector(dim, 0);
// auto& target_vec = record.entity_vecs_[0];
// int current_index = -1;
// float min_diff = std::numeric_limits<float>::max();
// for (int index = 0; index < ack_count; ++index) {
// float diff = 0;
// int offset = index * dim;
// for (auto d = 0; d < dim; ++d) {
// auto v = target_vec[offset + d] - query_vector[d];
// diff += v * v;
// }
// if (diff < min_diff) {
// min_diff = diff;
// current_index = index;
// }
// }
// QueryResult query_result;
// query_result.row_num_ = 1;
// query_result.result_distances_.push_back(min_diff);
// query_result.result_ids_.push_back(record.uids_[current_index]);
// query_result.data_chunk_ = nullptr;
// result = std::move(query_result);
// return Status::OK();
}
} // namespace milvus::engine
Status
SegmentNaive::Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& result) {
// TODO: enable delete
// TODO: enable index
auto& field = schema_->operator[](0);
assert(field.get_name() == "fakevec");
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
assert(query == nullptr);
int64_t barrier = [&]
{
auto& vec = record_.timestamps_;
int64_t beg = 0;
int64_t end = record_.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
end = mid + 1;
} else {
beg = mid;
}
}
return beg;
}();
// search until barriers
// TODO: optimize
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_[0]);
for(int64_t i = 0; i < barrier; ++i) {
// auto element =
throw std::runtime_error("unimplemented");
}
return Status::OK();
// find end of binary
// throw std::runtime_error("unimplemented");
// auto record_ptr = GetMutableRecord();
// if (record_ptr) {
// return QueryImpl(*record_ptr, query, timestamp, result);
// } else {
// assert(ready_immutable_);
// return QueryImpl(*record_immutable_, query, timestamp, result);
// }
}
Status
SegmentNaive::Close() {
state_ = SegmentState::Closed;
return Status::OK();
// auto src_record = GetMutableRecord();
// assert(src_record);
//
// auto dst_record = std::make_shared<ImmutableRecord>(schema_->size());
//
// auto data_move = [](auto& dst_vec, const auto& src_vec) {
// assert(dst_vec.size() == 0);
// dst_vec.insert(dst_vec.begin(), src_vec.begin(), src_vec.end());
// };
// data_move(dst_record->uids_, src_record->uids_);
// data_move(dst_record->timestamps_, src_record->uids_);
//
// assert(src_record->entity_vecs_.size() == schema_->size());
// assert(dst_record->entity_vecs_.size() == schema_->size());
// for (int i = 0; i < schema_->size(); ++i) {
// data_move(dst_record->entity_vecs_[i], src_record->entity_vecs_[i]);
// }
// bool ready_old = false;
// record_immutable_ = dst_record;
// ready_immutable_.compare_exchange_strong(ready_old, true);
// if (ready_old) {
// throw std::logic_error("Close may be called twice, with potential race condition");
// }
// return Status::OK();
}
Status
SegmentNaive::BuildIndex() {
throw std::runtime_error("unimplemented");
// assert(ready_immutable_);
// throw std::runtime_error("unimplemented");
}
} // namespace milvus::dog_segment

View File

@ -0,0 +1,147 @@
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>
#include <shared_mutex>
#include "AckResponder.h"
#include "ConcurrentVector.h"
#include "dog_segment/SegmentBase.h"
// #include "knowhere/index/structured_index/StructuredIndex.h"
#include "query/GeneralQuery.h"
#include "utils/Status.h"
using idx_t = int64_t;
namespace milvus::dog_segment {
struct ColumnBasedDataChunk {
std::vector<std::vector<float>> entity_vecs;
static ColumnBasedDataChunk
from(const DogDataChunk& source, const Schema& schema) {
ColumnBasedDataChunk dest;
auto count = source.count;
auto raw_data = reinterpret_cast<const char*>(source.raw_data);
auto align = source.sizeof_per_row;
for (auto& field : schema) {
auto len = field.get_sizeof();
assert(len % sizeof(float) == 0);
std::vector<float> new_col(len * count / sizeof(float));
for (int64_t i = 0; i < count; ++i) {
memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align, len);
}
dest.entity_vecs.push_back(std::move(new_col));
// offset the raw_data
raw_data += len / sizeof(float);
}
return dest;
}
};
class SegmentNaive : public SegmentBase {
public:
virtual ~SegmentNaive() = default;
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
int64_t PreInsert(int64_t size) override;
// TODO: originally, id should be put into data_chunk
// TODO: Is it ok to put them the other side?
Status
Insert(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) override;
int64_t PreDelete(int64_t size) override;
// TODO: add id into delete log, possibly bitmap
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) override;
// query contains metadata of
Status
Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) override;
// stop receive insert requests
// will move data to immutable vector or something
Status
Close() override;
// using IndexType = knowhere::IndexType;
// using IndexMode = knowhere::IndexMode;
// using IndexConfig = knowhere::Config;
// BuildIndex With Paramaters, must with Frozen State
// NOTE: index_params contains serveral policies for several index
// TODO: currently, index has to be set at startup, and can't be modified
// AddIndex and DropIndex will be added later
Status
BuildIndex() override;
Status
DropRawData(std::string_view field_name) override {
// TODO: NO-OP
return Status::OK();
}
Status
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override {
// TODO: NO-OP
return Status::OK();
}
private:
struct MutableRecord {
ConcurrentVector<uint64_t> uids_;
tbb::concurrent_vector<Timestamp> timestamps_;
std::vector<tbb::concurrent_vector<float>> entity_vecs_;
MutableRecord(int entity_size) : entity_vecs_(entity_size) {
}
};
struct Record {
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
std::vector<std::shared_ptr<VectorBase>> entity_vec_;
Record(const Schema& schema);
};
Status
QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results);
public:
ssize_t
get_row_count() const override {
return record_.ack_responder_.GetAck();
}
SegmentState
get_state() const override {
return state_.load(std::memory_order_relaxed);
}
ssize_t
get_deleted_count() const override {
return 0;
}
public:
friend std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);
explicit SegmentNaive(SchemaPtr schema, IndexMetaPtr index_meta)
: schema_(schema), index_meta_(index_meta), record_(*schema) {
}
private:
SchemaPtr schema_;
IndexMetaPtr index_meta_;
std::atomic<SegmentState> state_ = SegmentState::Open;
Record record_;
// tbb::concurrent_unordered_map<uint64_t, int> internal_indexes_;
// std::shared_ptr<MutableRecord> record_mutable_;
// // to determined that if immutable data if available
// std::shared_ptr<ImmutableRecord> record_immutable_ = nullptr;
// std::unordered_map<int, knowhere::VecIndexPtr> vec_indexings_;
// // TODO: scalar indexing
// // std::unordered_map<int, knowhere::IndexPtr> scalar_indexings_;
// tbb::concurrent_unordered_multimap<int, Timestamp> delete_logs_;
};
} // namespace milvus::dog_segment

View File

@ -30,13 +30,15 @@ DeleteSegment(CSegmentBase segment) {
int
Insert(CSegmentBase c_segment,
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count) {
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count,
unsigned long timestamp_min,
unsigned long timestamp_max) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
milvus::dog_segment::DogDataChunk dataChunk{};
@ -61,10 +63,12 @@ PreInsert(CSegmentBase c_segment, long int size) {
int
Delete(CSegmentBase c_segment,
long int reserved_offset,
long size,
const long* primary_keys,
const unsigned long* timestamps) {
long int reserved_offset,
long size,
const long* primary_keys,
const unsigned long* timestamps,
unsigned long timestamp_min,
unsigned long timestamp_max) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
auto res = segment->Delete(reserved_offset, size, primary_keys, timestamps);

View File

@ -17,23 +17,27 @@ DeleteSegment(CSegmentBase segment);
int
Insert(CSegmentBase c_segment,
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count);
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count,
unsigned long timestamp_min,
unsigned long timestamp_max);
long int
PreInsert(CSegmentBase c_segment, long int size);
int
Delete(CSegmentBase c_segment,
long int reserved_offset,
long size,
const long* primary_keys,
const unsigned long* timestamps);
long int reserved_offset,
long size,
const long* primary_keys,
const unsigned long* timestamps,
unsigned long timestamp_min,
unsigned long timestamp_max);
long int
PreDelete(CSegmentBase c_segment, long int size);

View File

@ -1,8 +1,9 @@
enable_testing()
find_package(GTest REQUIRED)
set(MILVUS_TEST_FILES
test_dog_segment.cpp
test_c_api.cpp
test_naive.cpp
# test_dog_segment.cpp
# test_c_api.cpp
)
add_executable(all_tests
${MILVUS_TEST_FILES}

View File

@ -0,0 +1,7 @@
#include <gtest/gtest.h>
TEST(TestNaive, Naive) {
EXPECT_TRUE(true);
}

View File

@ -236,7 +236,6 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
var numOfRecords = len(node.insertData.insertRecords[segmentID])
var offset = targetSegment.SegmentPreInsert(numOfRecords)
node.insertData.insertOffset[segmentID] = offset
@ -255,7 +254,6 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
var numOfRecords = len(node.deleteData.deleteIDs[segmentID])
var offset = targetSegment.SegmentPreDelete(numOfRecords)
node.deleteData.deleteOffset[segmentID] = offset
@ -289,12 +287,9 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
ids := node.insertData.insertIDs[segmentID]
timestamps := node.insertData.insertTimestamps[segmentID]
offsets := node.insertData.insertOffset[segmentID]
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
err = targetSegment.SegmentInsert(&ids, &timestamps, records)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
@ -310,10 +305,7 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
offset := node.deleteData.deleteOffset[segmentID]
err = segment.SegmentDelete(offset, deleteIDs, deleteTimestamps)
err = segment.SegmentDelete(deleteIDs, deleteTimestamps)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}

View File

@ -27,8 +27,8 @@ const (
)
type Segment struct {
SegmentPtr C.CSegmentBase
SegmentId int64
SegmentPtr C.CSegmentBase
SegmentId int64
SegmentCloseTime uint64
}
@ -77,58 +77,54 @@ func (s *Segment) Close() error {
////////////////////////////////////////////////////////////////////////////
func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
/*C.PreInsert
long int
PreInsert(CSegmentBase c_segment, long int size);
*/
var offset = C.PreInsert(C.long(int64(numOfRecords)))
var offset = C.PreInsert(numOfRecords)
return offset
}
func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
/*C.PreDelete
long int
PreDelete(CSegmentBase c_segment, long int size);
*/
var offset = C.PreDelete(C.long(int64(numOfRecords)))
var offset = C.PreDelete(numOfRecords)
return offset
}
func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error {
func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte) error {
/*C.Insert
int
Insert(CSegmentBase c_segment,
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count);
signed long int count,
unsigned long timestamp_min,
unsigned long timestamp_max);
*/
// Blobs to one big blob
var rawData []byte
var rowData []byte
for i := 0; i < len(*records); i++ {
copy(rawData, (*records)[i])
copy(rowData, (*records)[i])
}
var cOffset = C.long(offset)
var cNumOfRows = C.long(len(*entityIDs))
var cEntityIdsPtr = (*C.ulong)(&(*entityIDs)[0])
var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
var cSizeofPerRow = C.int(len((*records)[0]))
var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
// TODO: remove hard code schema
// auto schema_tmp = std::make_shared<Schema>();
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
// TODO: remove hard code & fake dataChunk
const DIM = 4
const N = 3
var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
var rawData []int8
for i := 0; i <= N; i++ {
for _, ele := range vec {
rawData=append(rawData, int8(ele))
}
rawData=append(rawData, int8(i))
}
const sizeofPerRow = 4 + DIM * 4
var status = C.Insert(s.SegmentPtr,
cOffset,
cNumOfRows,
cEntityIdsPtr,
cTimestampsPtr,
cRawDataVoidPtr,
cSizeofPerRow,
cNumOfRows)
var status = C.Insert(s.SegmentPtr, C.long(N), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N), C.ulong(timestampMin), C.ulong(timestampMax))
if status != 0 {
return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
@ -137,21 +133,19 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
return nil
}
func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error {
func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error {
/*C.Delete
int
Delete(CSegmentBase c_segment,
long int reserved_offset,
long size,
const long* primary_keys,
const unsigned long* timestamps);
const unsigned long* primary_keys,
const unsigned long* timestamps,
unsigned long timestamp_min,
unsigned long timestamp_max);
*/
var cOffset = C.long(offset)
var cSize = C.long(len(*entityIDs))
var cEntityIdsPtr = (*C.ulong)(&(*entityIDs)[0])
var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
size := len(*entityIds)
var status = C.Delete(s.SegmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
var status = C.Delete(s.SegmentPtr, C.long(size), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), C.ulong(timestampMin), C.ulong(timestampMax))
if status != 0 {
return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
@ -175,13 +169,7 @@ func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorReco
resultIds := make([]int64, TopK)
resultDistances := make([]float32, TopK)
var cQueryPtr = unsafe.Pointer(nil)
var cTimestamp = C.ulong(timestamp)
var cResultIds = (*C.long)(&resultIds[0])
var cResultDistances = (*C.float)(&resultDistances[0])
var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances)
var status = C.Search(s.SegmentPtr, unsafe.Pointer(nil), C.ulong(timestamp), (*C.long)(&resultIds[0]), (*C.float)(&resultDistances[0]))
if status != 0 {
return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
}

View File

@ -7,270 +7,135 @@ import (
)
func TestConstructorAndDestructor(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_SegmentInsert(t *testing.T) {
// 1. Construct node, collection, and segment
func TestSegmentInsert(t *testing.T) {
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids :=[] int64{1, 2, 3}
timestamps :=[] uint64 {0, 0, 0}
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 4
const N = 3
var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
var rawData []byte
for _, ele := range vec {
rawData=append(rawData, byte(ele))
}
rawData=append(rawData, byte(1))
var records [][]byte
for i:= 0; i < N; i++ {
records = append(records, rawData)
}
// 4. Do PreInsert
var offset = segment.SegmentPreInsert(N)
assert.Greater(t, offset, 0)
// 5. Do Insert
var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
var err = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, err)
// 6. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_SegmentDelete(t *testing.T) {
// 1. Construct node, collection, and segment
func TestSegmentDelete(t *testing.T) {
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids :=[] int64{1, 2, 3}
timestamps :=[] uint64 {0, 0, 0}
// 3. Do PreDelete
var offset = segment.SegmentPreDelete(10)
assert.Greater(t, offset, 0)
// 4. Do Delete
var err = segment.SegmentDelete(offset, &ids, &timestamps)
var err = segment.SegmentDelete(&ids, &timestamps)
assert.NoError(t, err)
// 5. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_SegmentSearch(t *testing.T) {
// 1. Construct node, collection, and segment
func TestSegmentSearch(t *testing.T) {
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids :=[] int64{1, 2, 3}
timestamps :=[] uint64 {0, 0, 0}
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 4
const N = 3
var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
var rawData []byte
for _, ele := range vec {
rawData=append(rawData, byte(ele))
}
rawData=append(rawData, byte(1))
var records [][]byte
for i:= 0; i < N; i++ {
records = append(records, rawData)
}
var insertErr = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, insertErr)
// 4. Do PreInsert
var offset = segment.SegmentPreInsert(N)
assert.Greater(t, offset, 0)
// 5. Do Insert
var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
assert.NoError(t, err)
// 6. Do search
var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
assert.NoError(t, searchErr)
fmt.Println(searchRes)
// 7. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_SegmentPreInsert(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Do PreInsert
var offset = segment.SegmentPreInsert(10)
assert.Greater(t, offset, 0)
// 3. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_SegmentPreDelete(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Do PreDelete
var offset = segment.SegmentPreDelete(10)
assert.Greater(t, offset, 0)
// 3. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
// Segment util functions test
////////////////////////////////////////////////////////////////////////////
func TestSegment_GetStatus(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Get segment status
var status = segment.GetStatus()
assert.Equal(t, status, SegmentOpened)
// 3. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_Close(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Close segment
var err = segment.Close()
assert.NoError(t, err)
// 3. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_GetRowCount(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids :=[] int64{1, 2, 3}
timestamps :=[] uint64 {0, 0, 0}
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 4
const N = 3
var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
var rawData []byte
for _, ele := range vec {
rawData=append(rawData, byte(ele))
}
rawData=append(rawData, byte(1))
var records [][]byte
for i:= 0; i < N; i++ {
records = append(records, rawData)
}
// 4. Do PreInsert
var offset = segment.SegmentPreInsert(N)
assert.Greater(t, offset, 0)
// 5. Do Insert
var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
var err = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, err)
// 6. Get segment row count
var rowCount = segment.GetRowCount()
assert.Equal(t, rowCount, int64(len(ids)))
// 7. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_GetDeletedCount(t *testing.T) {
// 1. Construct node, collection, and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids :=[] int64{1, 2, 3}
timestamps :=[] uint64 {0, 0, 0}
// 3. Do PreDelete
var offset = segment.SegmentPreDelete(10)
assert.Greater(t, offset, 0)
// 4. Do Delete
var err = segment.SegmentDelete(offset, &ids, &timestamps)
var err = segment.SegmentDelete(&ids, &timestamps)
assert.NoError(t, err)
// 5. Get segment deleted count
var deletedCount = segment.GetDeletedCount()
// TODO: assert.Equal(t, deletedCount, len(ids))
assert.Equal(t, deletedCount, int64(0))
// 6. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)