From 3f10cddf3e1634d35fbfeb7961f2511231419ef4 Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Wed, 9 Sep 2020 12:02:52 +0800 Subject: [PATCH] Fix gtest, support AckResponder, update segment interface Signed-off-by: FluorineDog --- core/cmake/FindGTest.cmake | 89 +++-- core/src/dog_segment/AckResponder.h | 41 ++ core/src/dog_segment/CMakeLists.txt | 15 +- core/src/dog_segment/Collection.h | 64 ++-- core/src/dog_segment/ConcurrentVector.cpp | 8 + core/src/dog_segment/ConcurrentVector.h | 204 ++++++++++ core/src/dog_segment/IndexMeta.cpp | 56 +++ core/src/dog_segment/IndexMeta.h | 57 +++ core/src/dog_segment/SegmentBase.h | 13 +- core/src/dog_segment/SegmentDefs.h | 62 +-- core/src/dog_segment/SegmentNaive.cpp | 447 +++++++++++----------- core/src/dog_segment/SegmentNaive.h | 147 +++++++ core/src/dog_segment/segment_c.cpp | 26 +- core/src/dog_segment/segment_c.h | 26 +- core/unittest/CMakeLists.txt | 5 +- core/unittest/test_naive.cpp | 7 + reader/query_node.go | 12 +- reader/segment.go | 84 ++-- reader/segment_test.go | 153 +------- 19 files changed, 965 insertions(+), 551 deletions(-) create mode 100644 core/src/dog_segment/AckResponder.h create mode 100644 core/src/dog_segment/ConcurrentVector.cpp create mode 100644 core/src/dog_segment/ConcurrentVector.h create mode 100644 core/src/dog_segment/IndexMeta.cpp create mode 100644 core/src/dog_segment/IndexMeta.h create mode 100644 core/src/dog_segment/SegmentNaive.h create mode 100644 core/unittest/test_naive.cpp diff --git a/core/cmake/FindGTest.cmake b/core/cmake/FindGTest.cmake index 58a899b8b1..726e737e71 100644 --- a/core/cmake/FindGTest.cmake +++ b/core/cmake/FindGTest.cmake @@ -1,45 +1,58 @@ -########################### GTEST -# Enable ExternalProject CMake module -INCLUDE(ExternalProject) +find_package(Threads REQUIRED) -# Set default ExternalProject root directory -SET_DIRECTORY_PROPERTIES(PROPERTIES EP_PREFIX ${CMAKE_BINARY_DIR}/third_party) - -# Add gtest -# 
http://stackoverflow.com/questions/9689183/cmake-googletest +include(ExternalProject) ExternalProject_Add( - googletest - URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip - # TIMEOUT 10 - # # Force separate output paths for debug and release builds to allow easy - # # identification of correct lib in subsequent TARGET_LINK_LIBRARIES commands - # CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs - # -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_RELEASE:PATH=ReleaseLibs - # -Dgtest_force_shared_crt=ON - # Disable install step - INSTALL_COMMAND "" - # Wrap download, configure and build steps in a script to log output - LOG_DOWNLOAD ON - LOG_CONFIGURE ON - LOG_BUILD ON) + googletest + URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip + UPDATE_COMMAND "" + INSTALL_COMMAND "" + LOG_DOWNLOAD ON + LOG_CONFIGURE ON + LOG_BUILD ON) -# Specify include dir ExternalProject_Get_Property(googletest source_dir) -set(GTEST_INCLUDE_DIR ${source_dir}/include) +set(GTEST_INCLUDE_DIRS ${source_dir}/googletest/include) +set(GMOCK_INCLUDE_DIRS ${source_dir}/googlemock/include) + +# The cloning of the above repo doesn't happen until make, however if the dir doesn't +# exist, INTERFACE_INCLUDE_DIRECTORIES will throw an error. +# To make it work, we just create the directory now during config. 
+file(MAKE_DIRECTORY ${GTEST_INCLUDE_DIRS}) +file(MAKE_DIRECTORY ${GMOCK_INCLUDE_DIRS}) -# Library ExternalProject_Get_Property(googletest binary_dir) +set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a) +set(GTEST_LIBRARY gtest) +add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED) +set_target_properties(${GTEST_LIBRARY} PROPERTIES + "IMPORTED_LOCATION" "${GTEST_LIBRARY_PATH}" + "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}" + "INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}") +add_dependencies(${GTEST_LIBRARY} googletest) -# set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a) -# set(GTEST_LIBRARY gtest) -# add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED) -# set_property(TARGET ${GTEST_LIBRARY} PROPERTY IMPORTED_LOCATION -# ${GTEST_LIBRARY_PATH} ) -# add_dependencies(${GTEST_LIBRARY} googletest) -set(GTEST_LIBRARY_PATH ${binary_dir}/lib) -add_library(gtest UNKNOWN IMPORTED) -add_library(gtest_main UNKNOWN IMPORTED) -set_property(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest.a) -set_property(TARGET gtest_main PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest_main.a) -add_dependencies(gtest googletest) -add_dependencies(gtest_main googletest) +set(GTEST_MAIN_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest_main.a) +set(GTEST_MAIN_LIBRARY gtest_main) +add_library(${GTEST_MAIN_LIBRARY} UNKNOWN IMPORTED) +set_target_properties(${GTEST_MAIN_LIBRARY} PROPERTIES + "IMPORTED_LOCATION" "${GTEST_MAIN_LIBRARY_PATH}" + "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}" + "INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}") +add_dependencies(${GTEST_MAIN_LIBRARY} googletest) + +# set(GMOCK_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock.a) +# set(GMOCK_LIBRARY gmock) +# add_library(${GMOCK_LIBRARY} UNKNOWN IMPORTED) +# set_target_properties(${GMOCK_LIBRARY} PROPERTIES +# "IMPORTED_LOCATION" "${GMOCK_LIBRARY_PATH}" 
+# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}" +# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}") +# add_dependencies(${GMOCK_LIBRARY} googletest) + +# set(GMOCK_MAIN_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock_main.a) +# set(GMOCK_MAIN_LIBRARY gmock_main) +# add_library(${GMOCK_MAIN_LIBRARY} UNKNOWN IMPORTED) +# set_target_properties(${GMOCK_MAIN_LIBRARY} PROPERTIES +# "IMPORTED_LOCATION" "${GMOCK_MAIN_LIBRARY_PATH}" +# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}" +# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}") +# add_dependencies(${GMOCK_MAIN_LIBRARY} ${GTEST_LIBRARY}) \ No newline at end of file diff --git a/core/src/dog_segment/AckResponder.h b/core/src/dog_segment/AckResponder.h new file mode 100644 index 0000000000..f16927a9ef --- /dev/null +++ b/core/src/dog_segment/AckResponder.h @@ -0,0 +1,41 @@ +#pragma once +#include +#include +#include +#include +namespace milvus::dog_segment { +class AckResponder { + public: + void + AddSegment(int64_t seg_begin, int64_t seg_end) { + std::lock_guard lck(mutex_); + fetch_and_flip(seg_end); + auto old_begin = fetch_and_flip(seg_begin); + if(old_begin) { + minimal = *acks_.begin(); + } + } + + int64_t + GetAck() const{ + return minimal; + } + + private: + bool + fetch_and_flip(int64_t endpoint) { + if (acks_.count(endpoint)) { + acks_.erase(endpoint); + return true; + } else { + acks_.insert(endpoint); + return false; + } + } + + private: + std::shared_mutex mutex_; + std::set acks_ = {0}; + std::atomic minimal = 0; +}; +} diff --git a/core/src/dog_segment/CMakeLists.txt b/core/src/dog_segment/CMakeLists.txt index 41b0f4be21..fc1e61cddf 100644 --- a/core/src/dog_segment/CMakeLists.txt +++ b/core/src/dog_segment/CMakeLists.txt @@ -1,16 +1,17 @@ set(DOG_SEGMENT_FILES SegmentNaive.cpp - Collection.cpp - Partition.cpp - collection_c.cpp - partition_c.cpp - segment_c.cpp + IndexMeta.cpp + ConcurrentVector.cpp + # Collection.cpp + # 
Partition.cpp + # collection_c.cpp + # partition_c.cpp + # segment_c.cpp ) -# Third Party dablooms file -#aux_source_directory( ${MILVUS_THIRDPARTY_SRC}/dablooms THIRDPARTY_DABLOOMS_FILES ) add_library(milvus_dog_segment SHARED ${DOG_SEGMENT_FILES} ) #add_dependencies( segment sqlite mysqlpp ) target_link_libraries(milvus_dog_segment tbb milvus_utils pthread) + diff --git a/core/src/dog_segment/Collection.h b/core/src/dog_segment/Collection.h index c538264599..d7973e25cc 100644 --- a/core/src/dog_segment/Collection.h +++ b/core/src/dog_segment/Collection.h @@ -1,37 +1,51 @@ #pragma once -#include "dog_segment/Partition.h" #include "SegmentDefs.h" -namespace milvus::dog_segment { +////////////////////////////////////////////////////////////////// -class Collection { +class Partition { public: - explicit Collection(std::string &collection_name, std::string &schema); - - // TODO: set index - void set_index(); - - // TODO: config to schema - void parse(); - -public: - SchemaPtr& get_schema() { - return schema_; - } - - std::string& get_collection_name() { - return collection_name_; + const std::deque& segments() const { + return segments_; } private: - // TODO: add Index ptr - // IndexPtr index_ = nullptr; - std::string collection_name_; - std::string schema_json_; - SchemaPtr schema_; + std::string name_; + std::deque segments_; }; -using CollectionPtr = std::unique_ptr; +using PartitionPtr = std::shared_ptr; -} +////////////////////////////////////////////////////////////////// + +class Collection { +public: + explicit Collection(std::string name): name_(name){} + + // TODO: set index + void set_index() {} + + void set_schema(std::string config) { + // TODO: config to schema + schema_ = nullptr; + } + +public: +// std::vector Insert() { +// for (auto partition: partitions_) { +// for (auto segment: partition.segments()) { +// if (segment.Status == Status.open) { +// segment.Insert() +// } +// } +// } +// } + +private: + // TODO: Index ptr + IndexPtr index_ = nullptr; + std::string 
name_; + SchemaPtr schema_; + std::vector partitions_; +}; diff --git a/core/src/dog_segment/ConcurrentVector.cpp b/core/src/dog_segment/ConcurrentVector.cpp new file mode 100644 index 0000000000..d9e7f7bc9d --- /dev/null +++ b/core/src/dog_segment/ConcurrentVector.cpp @@ -0,0 +1,8 @@ + +#include +#include "dog_segment/ConcurrentVector.h" + +namespace milvus::dog_segment { + +} + diff --git a/core/src/dog_segment/ConcurrentVector.h b/core/src/dog_segment/ConcurrentVector.h new file mode 100644 index 0000000000..f87b8fee9a --- /dev/null +++ b/core/src/dog_segment/ConcurrentVector.h @@ -0,0 +1,204 @@ +#pragma once +#include + +#include +#include +#include +#include +#include +#include +namespace milvus::dog_segment { + +// we don't use std::array because capacity of concurrent_vector wastes too much memory +// template +// class FixedVector : public std::vector { +// public: +// // This is a stupid workaround for tbb API to avoid memory copy +// explicit FixedVector(int64_t size) : placeholder_size_(size) { +// } +// FixedVector(const FixedVector& placeholder_vec) +// : std::vector(placeholder_vec.placeholder_size_), is_placeholder_(false) { +// // assert(placeholder_vec.is_placeholder_); +// } +// FixedVector(FixedVector&&) = delete; +// +// FixedVector& +// operator=(FixedVector&&) = delete; +// +// FixedVector& +// operator=(const FixedVector&) = delete; +// +// bool is_placeholder() { +// return is_placeholder_; +// } +// private: +// bool is_placeholder_ = true; +// int placeholder_size_ = 0; +//}; + +template +using FixedVector = std::vector; + +template +class ThreadSafeVector { + public: + template + void + emplace_to_at_least(int64_t size, Args... 
args) { + if (size <= size_) { + return; + } + // TODO: use multithread to speedup + std::lock_guard lck(mutex_); + while (vec_.size() < size) { + vec_.emplace_back(std::forward(args...)); + ++size_; + } + } + const Type& + operator[](int64_t index) const { + assert(index < size_); + std::shared_lock lck(mutex_); + return vec_[index]; + } + + Type& + operator[](int64_t index) { + assert(index < size_); + std::shared_lock lck(mutex_); + return vec_[index]; + } + + int64_t + size() const { + return size_; + } + + private: + std::atomic size_ = 0; + std::deque vec_; + mutable std::shared_mutex mutex_; +}; + +class VectorBase { + public: + VectorBase() = default; + virtual ~VectorBase() = default; + + virtual void + grow_to_at_least(int64_t element_count) = 0; + + virtual void set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) = 0; +}; + +template +class ConcurrentVector : public VectorBase { + public: + // constants + using Chunk = FixedVector; + ConcurrentVector(ConcurrentVector&&) = delete; + ConcurrentVector(const ConcurrentVector&) = delete; + + ConcurrentVector& operator=(ConcurrentVector&&) = delete; + ConcurrentVector& operator=(const ConcurrentVector&) = delete; + public: + + explicit ConcurrentVector(ssize_t dim = 1) : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) { + assert(is_scalar ? 
dim == 1 : dim != 1); + } + + void + grow_to_at_least(int64_t element_count) override { + auto chunk_count = (element_count + ElementsPerChunk - 1) / ElementsPerChunk; + chunks_.emplace_to_at_least(chunk_count, SizePerChunk); + } + + void + set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) override { + set_data(element_offset, static_cast(source), element_count); /* first argument of set_data is the destination element offset */ + } + + void + set_data(ssize_t element_offset, const Type* source, ssize_t element_count) { + if (element_count == 0) { + return; + } + this->grow_to_at_least(element_offset + element_count); + auto chunk_id = element_offset / ElementsPerChunk; + auto chunk_offset = element_offset % ElementsPerChunk; + ssize_t source_offset = 0; + // first partition: + if (chunk_offset + element_count <= ElementsPerChunk) { + // only first + fill_chunk(chunk_id, chunk_offset, element_count, source, source_offset); + return; + } + + auto first_size = ElementsPerChunk - chunk_offset; + fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); + + source_offset += ElementsPerChunk - chunk_offset; + element_count -= first_size; + ++chunk_id; + + // the middle + while (element_count >= ElementsPerChunk) { + fill_chunk(chunk_id, 0, ElementsPerChunk, source, source_offset); + source_offset += ElementsPerChunk; + element_count -= ElementsPerChunk; + ++chunk_id; + } + + // the final + if (element_count > 0) { + fill_chunk(chunk_id, 0, element_count, source, source_offset); + } + } + + const Chunk& + get_chunk(ssize_t chunk_index) const { + return chunks_[chunk_index]; + } + + // just for fun, don't use it directly + const Type* + get_element(ssize_t element_index) const { + auto chunk_id = element_index / ElementsPerChunk; + auto chunk_offset = element_index % ElementsPerChunk; + return get_chunk(chunk_id).data() + chunk_offset * Dim; + } + + const Type& + operator[](ssize_t element_index) const { + assert(Dim == 1); + auto chunk_id = element_index / ElementsPerChunk; + auto chunk_offset = 
element_index % ElementsPerChunk; + return get_chunk(chunk_id)[chunk_offset]; + } + + ssize_t + chunk_size() const { + return chunks_.size(); + } + + private: + void + fill_chunk(ssize_t chunk_id, ssize_t chunk_offset, ssize_t element_count, const Type* source, + ssize_t source_offset) { + if (element_count <= 0) { + return; + } + auto chunk_max_size = chunks_.size(); + assert(chunk_id < chunk_max_size); + Chunk& chunk = chunks_[chunk_id]; + auto ptr = chunk.data(); + std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim); + } + + const ssize_t Dim; + const ssize_t SizePerChunk; + private: + ThreadSafeVector chunks_; +}; + +} // namespace milvus::dog_segment \ No newline at end of file diff --git a/core/src/dog_segment/IndexMeta.cpp b/core/src/dog_segment/IndexMeta.cpp new file mode 100644 index 0000000000..fbb05f8545 --- /dev/null +++ b/core/src/dog_segment/IndexMeta.cpp @@ -0,0 +1,56 @@ +// #include "IndexMeta.h" +// #include +// #include +// namespace milvus::dog_segment { +// +// Status +// IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name, IndexType type, IndexMode mode, +// IndexConfig config) { +// Entry entry{ +// index_name, +// field_name, +// type, +// mode, +// std::move(config) +// }; +// VerifyEntry(entry); +// +// if (entries_.count(index_name)) { +// throw std::invalid_argument("duplicate index_name"); +// } +// // TODO: support multiple indexes for single field +// assert(!lookups_.count(field_name)); +// lookups_[field_name] = index_name; +// entries_[index_name] = std::move(entry); +// +// return Status::OK(); +// } +// +// Status +// IndexMeta::DropEntry(const std::string& index_name) { +// assert(entries_.count(index_name)); +// auto entry = std::move(entries_[index_name]); +// if(lookups_[entry.field_name] == index_name) { +// lookups_.erase(entry.field_name); +// } +// return Status::OK(); +// } +// +// void IndexMeta::VerifyEntry(const Entry &entry) { +// auto is_mode_valid = 
std::set{IndexMode::MODE_CPU, IndexMode::MODE_GPU}.count(entry.mode); +// if(!is_mode_valid) { +// throw std::invalid_argument("invalid mode"); +// } +// +// auto& schema = *schema_; +// auto& field_meta = schema[entry.index_name]; +// // TODO checking +// if(field_meta.is_vector()) { +// assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT); +// } else { +// assert(false); +// } +// } +// +// } // namespace milvus::dog_segment +// \ No newline at end of file diff --git a/core/src/dog_segment/IndexMeta.h b/core/src/dog_segment/IndexMeta.h new file mode 100644 index 0000000000..3c89b60463 --- /dev/null +++ b/core/src/dog_segment/IndexMeta.h @@ -0,0 +1,57 @@ +#pragma once +// +//#include +// +//#include "SegmentDefs.h" +//#include "knowhere/index/IndexType.h" +// +#include +class IndexMeta; +namespace milvus::dog_segment { +//// TODO: this is +//class IndexMeta { +// public: +// IndexMeta(SchemaPtr schema) : schema_(schema) { +// } +// using IndexType = knowhere::IndexType; +// using IndexMode = knowhere::IndexMode; +// using IndexConfig = knowhere::Config; +// +// struct Entry { +// std::string index_name; +// std::string field_name; +// IndexType type; +// IndexMode mode; +// IndexConfig config; +// }; +// +// Status +// AddEntry(const std::string& index_name, const std::string& field_name, IndexType type, IndexMode mode, +// IndexConfig config); +// +// Status +// DropEntry(const std::string& index_name); +// +// const std::map& +// get_entries() { +// return entries_; +// } +// +// const Entry& lookup_by_field(const std::string& field_name) { +// auto index_name = lookups_.at(field_name); +// return entries_.at(index_name); +// } +// private: +// void +// VerifyEntry(const Entry& entry); +// +// private: +// SchemaPtr schema_; +// std::map entries_; // index_name => Entry +// std::map lookups_; // field_name => index_name +//}; +// +using IndexMetaPtr = std::shared_ptr; +// +} // namespace milvus::dog_segment +// \ No newline at end of file diff --git 
a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h index d1cf71fb39..c916f6e9b5 100644 --- a/core/src/dog_segment/SegmentBase.h +++ b/core/src/dog_segment/SegmentBase.h @@ -1,19 +1,20 @@ #pragma once #include -// #include "db/Types.h" +#include "IndexMeta.h" +#include "utils/Types.h" #include "dog_segment/SegmentDefs.h" // #include "knowhere/index/Index.h" +// #include "knowhere/index/IndexType.h" #include "query/GeneralQuery.h" -using idx_t = int64_t; namespace milvus { namespace dog_segment { -using engine::DataChunk; -using engine::DataChunkPtr; +// using engine::DataChunk; +// using engine::DataChunkPtr; using engine::QueryResult; -using DogDataChunkPtr = std::shared_ptr; +// using DogDataChunkPtr = std::shared_ptr; int TestABI(); @@ -87,5 +88,5 @@ using SegmentBasePtr = std::unique_ptr; SegmentBasePtr CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta); -} // namespace engine +} // namespace dog_segment } // namespace milvus diff --git a/core/src/dog_segment/SegmentDefs.h b/core/src/dog_segment/SegmentDefs.h index d1b56d4356..9a5600e626 100644 --- a/core/src/dog_segment/SegmentDefs.h +++ b/core/src/dog_segment/SegmentDefs.h @@ -1,16 +1,13 @@ #pragma once #include -#include -// #include "db/Types.h" +#include "utils/Types.h" // #include "knowhere/index/Index.h" #include "utils/Status.h" -#include "utils/Types.h" -#include -using Timestamp = uint64_t; // TODO: use TiKV-like timestamp namespace milvus::dog_segment { +using Timestamp = uint64_t; // TODO: use TiKV-like timestamp using engine::DataType; using engine::FieldElementType; @@ -20,11 +17,6 @@ struct DogDataChunk { int64_t count; }; -struct IndexConfig { - // TODO - // std::unordered_map configs; -}; - inline int field_sizeof(DataType data_type, int dim = 1) { switch (data_type) { @@ -49,12 +41,17 @@ field_sizeof(DataType data_type, int dim = 1) { return dim / 8; } default: { - assert(false); + throw std::invalid_argument("unsupported data type"); return 0; } } } +inline bool 
+field_is_vector(DataType datatype) { + return datatype == DataType::VECTOR_BINARY || datatype == DataType::VECTOR_FLOAT; +} + struct FieldMeta { public: FieldMeta(std::string_view name, DataType type, int dim = 1) : name_(name), type_(type), dim_(dim) { @@ -107,10 +104,12 @@ class Schema { void AddField(FieldMeta field_meta) { - auto index = fields_.size(); + auto offset = fields_.size(); fields_.emplace_back(field_meta); - indexes_.emplace(field_meta.get_name(), index); - total_sizeof_ = field_meta.get_sizeof(); + offsets_.emplace(field_meta.get_name(), offset); + auto field_sizeof = field_meta.get_sizeof(); + sizeof_infos_.push_back(field_sizeof); + total_sizeof_ += field_sizeof; } auto @@ -132,7 +131,8 @@ class Schema { return fields_.end(); } - int size() const { + int + size() const { return fields_.size(); } @@ -141,12 +141,22 @@ class Schema { return fields_[field_index]; } + auto + get_total_sizeof() const { + return total_sizeof_; + } + + const std::vector& get_sizeof_infos() { + return sizeof_infos_; + } + + const FieldMeta& operator[](const std::string& field_name) const { - auto index_iter = indexes_.find(field_name); - assert(index_iter != indexes_.end()); - auto index = index_iter->second; - return (*this)[index]; + auto offset_iter = offsets_.find(field_name); + assert(offset_iter != offsets_.end()); + auto offset = offset_iter->second; + return (*this)[offset]; } private: @@ -155,19 +165,11 @@ class Schema { private: // a mapping for random access - std::unordered_map indexes_; - int total_sizeof_; + std::unordered_map offsets_; + std::vector sizeof_infos_; + int total_sizeof_ = 0; }; using SchemaPtr = std::shared_ptr; -class IndexData { - public: - virtual std::vector - serilize() = 0; - - static std::shared_ptr - deserialize(int64_t size, const char* blob); -}; - } // namespace milvus::dog_segment diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp index c245bbe67c..dcabc6efa4 100644 --- 
a/core/src/dog_segment/SegmentNaive.cpp +++ b/core/src/dog_segment/SegmentNaive.cpp @@ -1,246 +1,255 @@ -#include +#include -#include "dog_segment/SegmentBase.h" -#include "utils/Status.h" -#include -#include -#include +#include +#include +#include namespace milvus::dog_segment { - int TestABI() { return 42; } -struct ColumnBasedDataChunk { - std::vector> entity_vecs; - static ColumnBasedDataChunk from(const DogDataChunk& source, const Schema& schema){ - ColumnBasedDataChunk dest; - auto count = source.count; - auto raw_data = reinterpret_cast(source.raw_data); - auto align = source.sizeof_per_row; - for(auto& field: schema) { - auto len = field.get_sizeof(); - assert(len % sizeof(float) == 0); - std::vector new_col(len * count / sizeof(float)); - for(int64_t i = 0; i < count; ++i) { - memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align , len); - } - dest.entity_vecs.push_back(std::move(new_col)); - // offset the raw_data - raw_data += len / sizeof(float); - } - return dest; - } -}; - - -class SegmentNaive : public SegmentBase { - public: - virtual ~SegmentNaive() = default; - // SegmentBase(std::shared_ptr collection); - - // TODO: originally, id should be put into data_chunk - // TODO: Is it ok to put them the other side? 
- Status - Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, - const DogDataChunk& values) override; - - // TODO: add id into delete log, possibly bitmap - Status - Delete(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) override; - - // query contains metadata of - Status - Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) override; - - // // THIS FUNCTION IS REMOVED - // virtual Status - // GetEntityByIds(Timestamp timestamp, const std::vector& ids, DataChunkPtr& results) = 0; - - // stop receive insert requests - Status - Close() override { - std::lock_guard lck(mutex_); - assert(state_ == SegmentState::Open); - state_ = SegmentState::Closed; - return Status::OK(); - } - - // // to make all data inserted visible - // // maybe a no-op? - // virtual Status - // Flush(Timestamp timestamp) = 0; - - // BuildIndex With Paramaters, must with Frozen State - // This function is atomic - // NOTE: index_params contains serveral policies for several index - Status - BuildIndex(std::shared_ptr index_params) override { - throw std::runtime_error("not implemented"); - } - - // Remove Index - Status - DropIndex(std::string_view field_name) override { - throw std::runtime_error("not implemented"); - } - - Status - DropRawData(std::string_view field_name) override { - // TODO: NO-OP - return Status::OK(); - } - - Status - LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override { - // TODO: NO-OP - return Status::OK(); - } - - public: - ssize_t - get_row_count() const override { - return ack_count_.load(std::memory_order_relaxed); - } - - // const FieldsInfo& - // get_fields_info() const override { - // - // } - // - // // check is_indexed here - // virtual const IndexConfig& - // get_index_param() const = 0; - // - SegmentState - get_state() const override { - return state_.load(std::memory_order_relaxed); - } - // - // std::shared_ptr - // get_index_data(); - - 
ssize_t - get_deleted_count() const override { - return 0; - } - - public: - friend SegmentBasePtr - CreateSegment(SchemaPtr& schema); - - private: - SchemaPtr schema_; - std::shared_mutex mutex_; - std::atomic state_ = SegmentState::Open; - std::atomic ack_count_ = 0; - tbb::concurrent_vector uids_; - tbb::concurrent_vector timestamps_; - std::vector> entity_vecs_; - tbb::concurrent_unordered_map internal_indexes_; - - tbb::concurrent_unordered_multimap delete_logs_; -}; - -SegmentBasePtr -CreateSegment(SchemaPtr& schema) { - // TODO: remove hard code - auto schema_tmp = std::make_shared(); - schema_tmp->AddField("fakevec", DataType::VECTOR_FLOAT, 16); - schema_tmp->AddField("age", DataType::INT32); - - auto segment = std::make_unique(); - segment->schema_ = schema_tmp; - segment->entity_vecs_.resize(schema_tmp->size()); +std::unique_ptr +CreateSegment(SchemaPtr schema, IndexMetaPtr remote_index_meta) { + auto segment = std::make_unique(schema, remote_index_meta); return segment; } -Status -SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, - const DogDataChunk& row_values) { - const auto& schema = *schema_; - auto data_chunk = ColumnBasedDataChunk::from(row_values, schema); - - // insert datas - // TODO: use shared_lock - std::lock_guard lck(mutex_); - assert(state_ == SegmentState::Open); - auto ack_id = ack_count_.load(); - uids_.grow_by(primary_keys, primary_keys + size); - for(int64_t i = 0; i < size; ++i) { - auto key = primary_keys[i]; - auto internal_index = i + ack_id; - internal_indexes_[key] = internal_index; +SegmentNaive::Record::Record(const Schema& schema) : uids_(1), timestamps_(1) { + for (auto& field : schema) { + if (field.is_vector()) { + assert(field.get_data_type() == DataType::VECTOR_FLOAT); + entity_vec_.emplace_back(std::make_shared>(field.get_dim())); + } else { + assert(field.get_data_type() == DataType::INT32); + entity_vec_.emplace_back(std::make_shared>()); + } } - 
timestamps_.grow_by(timestamps, timestamps + size); - for(int fid = 0; fid < schema.size(); ++fid) { - auto field = schema[fid]; - auto total_len = field.get_sizeof() * size / sizeof(float); - auto source_vec = data_chunk.entity_vecs[fid]; - entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len); - } - - // finish insert - ack_count_ += size; - return Status::OK(); } -Status SegmentNaive::Delete(int64_t size, const uint64_t *primary_keys, const Timestamp *timestamps) { - for(int i = 0; i < size; ++i) { - auto key = primary_keys[i]; - auto time = timestamps[i]; - delete_logs_.insert(std::make_pair(key, time)); +int64_t +SegmentNaive::PreInsert(int64_t size) { + auto reserved_begin = record_.reserved.fetch_add(size); + return reserved_begin; +} + +int64_t +SegmentNaive::PreDelete(int64_t size) { + throw std::runtime_error("unimplemented"); +} + +Status +SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t* uids_raw, const Timestamp* timestamps_raw, + const DogDataChunk& entities_raw) { + assert(entities_raw.count == size); + assert(entities_raw.sizeof_per_row == schema_->get_total_sizeof()); + auto raw_data = reinterpret_cast(entities_raw.raw_data); + // std::vector entities(raw_data, raw_data + size * len_per_row); + + auto len_per_row = entities_raw.sizeof_per_row; + std::vector> ordering; + ordering.resize(size); + // #pragma omp parallel for + for (int i = 0; i < size; ++i) { + ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i], i); } + std::sort(ordering.begin(), ordering.end()); + auto sizeof_infos = schema_->get_sizeof_infos(); + std::vector offset_infos(schema_->size() + 1, 0); + std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1); + std::vector> entities(schema_->size()); + + for (int fid = 0; fid < schema_->size(); ++fid) { + auto len = sizeof_infos[fid]; + entities[fid].resize(len * size); + } + + std::vector uids(size); + std::vector timestamps(size); + // #pragma omp 
parallel for + for (int index = 0; index < size; ++index) { + auto [t, uid, order_index] = ordering[index]; + timestamps[index] = t; + uids[index] = uid; + for (int fid = 0; fid < schema_->size(); ++fid) { + auto len = sizeof_infos[fid]; + auto offset = offset_infos[fid]; + auto src = raw_data + offset + order_index * len_per_row; + auto dst = entities[fid].data() + index * len; + memcpy(dst, src, len); + } + } + + record_.timestamps_.set_data(reserved_begin, timestamps.data(), size); + record_.uids_.set_data(reserved_begin, uids.data(), size); + for (int fid = 0; fid < schema_->size(); ++fid) { + record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size); + } + + record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); /* second argument is the segment end, not its length */ return Status::OK(); + + // std::thread go(executor, std::move(uids), std::move(timestamps), std::move(entities)); + // go.detach(); + // const auto& schema = *schema_; + // auto record_ptr = GetMutableRecord(); + // assert(record_ptr); + // auto& record = *record_ptr; + // auto data_chunk = ColumnBasedDataChunk::from(row_values, schema); + // + // // TODO: use shared_lock for better concurrency + // std::lock_guard lck(mutex_); + // assert(state_ == SegmentState::Open); + // auto ack_id = ack_count_.load(); + // record.uids_.grow_by(primary_keys, primary_keys + size); + // for (int64_t i = 0; i < size; ++i) { + // auto key = primary_keys[i]; + // auto internal_index = i + ack_id; + // internal_indexes_[key] = internal_index; + // } + // record.timestamps_.grow_by(timestamps, timestamps + size); + // for (int fid = 0; fid < schema.size(); ++fid) { + // auto field = schema[fid]; + // auto total_len = field.get_sizeof() * size / sizeof(float); + // auto source_vec = data_chunk.entity_vecs[fid]; + // record.entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len); + // } + // + // // finish insert + // ack_count_ += size; + // return Status::OK(); +} + +Status +SegmentNaive::Delete(int64_t reserved_offset, 
int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) { + throw std::runtime_error("unimplemented"); + // for (int i = 0; i < size; ++i) { + // auto key = primary_keys[i]; + // auto time = timestamps[i]; + // delete_logs_.insert(std::make_pair(key, time)); + // } + // return Status::OK(); } // TODO: remove mock + Status -SegmentNaive::Query(const query::QueryPtr &query, Timestamp timestamp, QueryResult &result) { - std::shared_lock lck(mutex_); - auto ack_count = ack_count_.load(); - assert(query == nullptr); - assert(schema_->size() >= 1); - const auto& field = schema_->operator[](0); - assert(field.get_data_type() == DataType::VECTOR_FLOAT); - assert(field.get_name() == "fakevec"); - auto dim = field.get_dim(); - // assume query vector is [0, 0, ..., 0] - std::vector query_vector(dim, 0); - auto& target_vec = entity_vecs_[0]; - int current_index = -1; - float min_diff = std::numeric_limits::max(); - for(int index = 0; index < ack_count; ++index) { - float diff = 0; - int offset = index * dim; - for(auto d = 0; d < dim; ++d) { - auto v = target_vec[offset + d] - query_vector[d]; - diff += v * v; - } - if(diff < min_diff) { - min_diff = diff; - current_index = index; - } - } - QueryResult query_result; - query_result.row_num_ = 1; - query_result.result_distances_.push_back(min_diff); - query_result.result_ids_.push_back(uids_[current_index]); - // query_result.data_chunk_ = nullptr; - result = std::move(query_result); - return Status::OK(); +SegmentNaive::QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& result) { + throw std::runtime_error("unimplemented"); + // auto ack_count = ack_count_.load(); + // assert(query == nullptr); + // assert(schema_->size() >= 1); + // const auto& field = schema_->operator[](0); + // assert(field.get_data_type() == DataType::VECTOR_FLOAT); + // assert(field.get_name() == "fakevec"); + // auto dim = field.get_dim(); + // // assume query vector is [0, 0, ..., 0] + // std::vector 
query_vector(dim, 0); + // auto& target_vec = record.entity_vecs_[0]; + // int current_index = -1; + // float min_diff = std::numeric_limits::max(); + // for (int index = 0; index < ack_count; ++index) { + // float diff = 0; + // int offset = index * dim; + // for (auto d = 0; d < dim; ++d) { + // auto v = target_vec[offset + d] - query_vector[d]; + // diff += v * v; + // } + // if (diff < min_diff) { + // min_diff = diff; + // current_index = index; + // } + // } + // QueryResult query_result; + // query_result.row_num_ = 1; + // query_result.result_distances_.push_back(min_diff); + // query_result.result_ids_.push_back(record.uids_[current_index]); + // query_result.data_chunk_ = nullptr; + // result = std::move(query_result); + // return Status::OK(); } -} // namespace milvus::engine - - - - - +Status +SegmentNaive::Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& result) { + // TODO: enable delete + // TODO: enable index + auto& field = schema_->operator[](0); + assert(field.get_name() == "fakevec"); + assert(field.get_data_type() == DataType::VECTOR_FLOAT); + auto dim = field.get_dim(); + assert(query == nullptr); + int64_t barrier = [&] + { + auto& vec = record_.timestamps_; + int64_t beg = 0; + int64_t end = record_.ack_responder_.GetAck(); + while (beg < end) { + auto mid = (beg + end) / 2; + if (vec[mid] < timestamp) { + end = mid + 1; + } else { + beg = mid; + } + } + return beg; + }(); + // search until barriers + // TODO: optimize + auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_[0]); + for(int64_t i = 0; i < barrier; ++i) { +// auto element = + throw std::runtime_error("unimplemented"); + } + return Status::OK(); + // find end of binary + // throw std::runtime_error("unimplemented"); + // auto record_ptr = GetMutableRecord(); + // if (record_ptr) { + // return QueryImpl(*record_ptr, query, timestamp, result); + // } else { + // assert(ready_immutable_); + // return QueryImpl(*record_immutable_, query, timestamp, 
result); + // } +} +Status +SegmentNaive::Close() { + state_ = SegmentState::Closed; + return Status::OK(); + // auto src_record = GetMutableRecord(); + // assert(src_record); + // + // auto dst_record = std::make_shared(schema_->size()); + // + // auto data_move = [](auto& dst_vec, const auto& src_vec) { + // assert(dst_vec.size() == 0); + // dst_vec.insert(dst_vec.begin(), src_vec.begin(), src_vec.end()); + // }; + // data_move(dst_record->uids_, src_record->uids_); + // data_move(dst_record->timestamps_, src_record->uids_); + // + // assert(src_record->entity_vecs_.size() == schema_->size()); + // assert(dst_record->entity_vecs_.size() == schema_->size()); + // for (int i = 0; i < schema_->size(); ++i) { + // data_move(dst_record->entity_vecs_[i], src_record->entity_vecs_[i]); + // } + // bool ready_old = false; + // record_immutable_ = dst_record; + // ready_immutable_.compare_exchange_strong(ready_old, true); + // if (ready_old) { + // throw std::logic_error("Close may be called twice, with potential race condition"); + // } + // return Status::OK(); +} +Status +SegmentNaive::BuildIndex() { + throw std::runtime_error("unimplemented"); + // assert(ready_immutable_); + // throw std::runtime_error("unimplemented"); +} +} // namespace milvus::dog_segment diff --git a/core/src/dog_segment/SegmentNaive.h b/core/src/dog_segment/SegmentNaive.h new file mode 100644 index 0000000000..e58bfa9e18 --- /dev/null +++ b/core/src/dog_segment/SegmentNaive.h @@ -0,0 +1,147 @@ +#pragma once +#include +#include +#include + +#include + +#include "AckResponder.h" +#include "ConcurrentVector.h" +#include "dog_segment/SegmentBase.h" +// #include "knowhere/index/structured_index/StructuredIndex.h" +#include "query/GeneralQuery.h" +#include "utils/Status.h" +using idx_t = int64_t; + +namespace milvus::dog_segment { +struct ColumnBasedDataChunk { + std::vector> entity_vecs; + static ColumnBasedDataChunk + from(const DogDataChunk& source, const Schema& schema) { + ColumnBasedDataChunk 
dest; + auto count = source.count; + auto raw_data = reinterpret_cast(source.raw_data); + auto align = source.sizeof_per_row; + for (auto& field : schema) { + auto len = field.get_sizeof(); + assert(len % sizeof(float) == 0); + std::vector new_col(len * count / sizeof(float)); + for (int64_t i = 0; i < count; ++i) { + memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align, len); + } + dest.entity_vecs.push_back(std::move(new_col)); + // offset the raw_data + raw_data += len / sizeof(float); + } + return dest; + } +}; + +class SegmentNaive : public SegmentBase { + public: + virtual ~SegmentNaive() = default; + + // SegmentBase(std::shared_ptr collection); + + int64_t PreInsert(int64_t size) override; + + // TODO: originally, id should be put into data_chunk + // TODO: Is it ok to put them the other side? + Status + Insert(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) override; + + int64_t PreDelete(int64_t size) override; + + // TODO: add id into delete log, possibly bitmap + Status + Delete(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) override; + + // query contains metadata of + Status + Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) override; + + // stop receive insert requests + // will move data to immutable vector or something + Status + Close() override; + + // using IndexType = knowhere::IndexType; + // using IndexMode = knowhere::IndexMode; + // using IndexConfig = knowhere::Config; + // BuildIndex With Paramaters, must with Frozen State + // NOTE: index_params contains serveral policies for several index + // TODO: currently, index has to be set at startup, and can't be modified + // AddIndex and DropIndex will be added later + Status + BuildIndex() override; + + Status + DropRawData(std::string_view field_name) override { + // TODO: NO-OP + return Status::OK(); + } + + Status + 
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override { + // TODO: NO-OP + return Status::OK(); + } + + private: + struct MutableRecord { + ConcurrentVector uids_; + tbb::concurrent_vector timestamps_; + std::vector> entity_vecs_; + MutableRecord(int entity_size) : entity_vecs_(entity_size) { + } + }; + + struct Record { + std::atomic reserved = 0; + AckResponder ack_responder_; + ConcurrentVector timestamps_; + ConcurrentVector uids_; + std::vector> entity_vec_; + Record(const Schema& schema); + }; + + Status + QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results); + + public: + ssize_t + get_row_count() const override { + return record_.ack_responder_.GetAck(); + } + SegmentState + get_state() const override { + return state_.load(std::memory_order_relaxed); + } + ssize_t + get_deleted_count() const override { + return 0; + } + + public: + friend std::unique_ptr + CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta); + explicit SegmentNaive(SchemaPtr schema, IndexMetaPtr index_meta) + : schema_(schema), index_meta_(index_meta), record_(*schema) { + } + + private: + SchemaPtr schema_; + IndexMetaPtr index_meta_; + std::atomic state_ = SegmentState::Open; + Record record_; + + // tbb::concurrent_unordered_map internal_indexes_; + // std::shared_ptr record_mutable_; + // // to determined that if immutable data if available + // std::shared_ptr record_immutable_ = nullptr; + // std::unordered_map vec_indexings_; + // // TODO: scalar indexing + // // std::unordered_map scalar_indexings_; + // tbb::concurrent_unordered_multimap delete_logs_; +}; +} // namespace milvus::dog_segment diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp index 1b43321d11..3109113407 100644 --- a/core/src/dog_segment/segment_c.cpp +++ b/core/src/dog_segment/segment_c.cpp @@ -30,13 +30,15 @@ DeleteSegment(CSegmentBase segment) { int Insert(CSegmentBase c_segment, - long int reserved_offset, - 
signed long int size, - const long* primary_keys, - const unsigned long* timestamps, - void* raw_data, - int sizeof_per_row, - signed long int count) { + long int reserved_offset, + signed long int size, + const long* primary_keys, + const unsigned long* timestamps, + void* raw_data, + int sizeof_per_row, + signed long int count, + unsigned long timestamp_min, + unsigned long timestamp_max) { auto segment = (milvus::dog_segment::SegmentBase*)c_segment; milvus::dog_segment::DogDataChunk dataChunk{}; @@ -61,10 +63,12 @@ PreInsert(CSegmentBase c_segment, long int size) { int Delete(CSegmentBase c_segment, - long int reserved_offset, - long size, - const long* primary_keys, - const unsigned long* timestamps) { + long int reserved_offset, + long size, + const long* primary_keys, + const unsigned long* timestamps, + unsigned long timestamp_min, + unsigned long timestamp_max) { auto segment = (milvus::dog_segment::SegmentBase*)c_segment; auto res = segment->Delete(reserved_offset, size, primary_keys, timestamps); diff --git a/core/src/dog_segment/segment_c.h b/core/src/dog_segment/segment_c.h index 937ec69578..40645f0764 100644 --- a/core/src/dog_segment/segment_c.h +++ b/core/src/dog_segment/segment_c.h @@ -17,23 +17,27 @@ DeleteSegment(CSegmentBase segment); int Insert(CSegmentBase c_segment, - long int reserved_offset, - signed long int size, - const long* primary_keys, - const unsigned long* timestamps, - void* raw_data, - int sizeof_per_row, - signed long int count); + long int reserved_offset, + signed long int size, + const long* primary_keys, + const unsigned long* timestamps, + void* raw_data, + int sizeof_per_row, + signed long int count, + unsigned long timestamp_min, + unsigned long timestamp_max); long int PreInsert(CSegmentBase c_segment, long int size); int Delete(CSegmentBase c_segment, - long int reserved_offset, - long size, - const long* primary_keys, - const unsigned long* timestamps); + long int reserved_offset, + long size, + const long* 
primary_keys, + const unsigned long* timestamps, + unsigned long timestamp_min, + unsigned long timestamp_max); long int PreDelete(CSegmentBase c_segment, long int size); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 79b707dc65..430bb15277 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -1,8 +1,9 @@ enable_testing() find_package(GTest REQUIRED) set(MILVUS_TEST_FILES - test_dog_segment.cpp - test_c_api.cpp + test_naive.cpp + # test_dog_segment.cpp + # test_c_api.cpp ) add_executable(all_tests ${MILVUS_TEST_FILES} diff --git a/core/unittest/test_naive.cpp b/core/unittest/test_naive.cpp new file mode 100644 index 0000000000..1c2a1ea3c9 --- /dev/null +++ b/core/unittest/test_naive.cpp @@ -0,0 +1,7 @@ + + +#include + +TEST(TestNaive, Naive) { + EXPECT_TRUE(true); +} \ No newline at end of file diff --git a/reader/query_node.go b/reader/query_node.go index c969ff5f1b..3722067d47 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -236,7 +236,6 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status { fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} } - var numOfRecords = len(node.insertData.insertRecords[segmentID]) var offset = targetSegment.SegmentPreInsert(numOfRecords) node.insertData.insertOffset[segmentID] = offset @@ -255,7 +254,6 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status { fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} } - var numOfRecords = len(node.deleteData.deleteIDs[segmentID]) var offset = targetSegment.SegmentPreDelete(numOfRecords) node.deleteData.deleteOffset[segmentID] = offset @@ -289,12 +287,9 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} } - ids := node.insertData.insertIDs[segmentID] timestamps := node.insertData.insertTimestamps[segmentID] - offsets := node.insertData.insertOffset[segmentID] - - err = 
targetSegment.SegmentInsert(offsets, &ids, ×tamps, records) + err = targetSegment.SegmentInsert(&ids, ×tamps, records) if err != nil { fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} @@ -310,10 +305,7 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} } - - offset := node.deleteData.deleteOffset[segmentID] - - err = segment.SegmentDelete(offset, deleteIDs, deleteTimestamps) + err = segment.SegmentDelete(deleteIDs, deleteTimestamps) if err != nil { fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} diff --git a/reader/segment.go b/reader/segment.go index 683268b9a0..aa668bb7a7 100644 --- a/reader/segment.go +++ b/reader/segment.go @@ -27,8 +27,8 @@ const ( ) type Segment struct { - SegmentPtr C.CSegmentBase - SegmentId int64 + SegmentPtr C.CSegmentBase + SegmentId int64 SegmentCloseTime uint64 } @@ -77,58 +77,54 @@ func (s *Segment) Close() error { //////////////////////////////////////////////////////////////////////////// func (s *Segment) SegmentPreInsert(numOfRecords int) int64 { - /*C.PreInsert - long int - PreInsert(CSegmentBase c_segment, long int size); - */ - var offset = C.PreInsert(C.long(int64(numOfRecords))) + var offset = C.PreInsert(numOfRecords) return offset } func (s *Segment) SegmentPreDelete(numOfRecords int) int64 { - /*C.PreDelete - long int - PreDelete(CSegmentBase c_segment, long int size); - */ - var offset = C.PreDelete(C.long(int64(numOfRecords))) + var offset = C.PreDelete(numOfRecords) return offset } -func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error { +func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte) error { /*C.Insert int Insert(CSegmentBase c_segment, - long int reserved_offset, signed long int size, - const long* primary_keys, + const unsigned long* primary_keys, const unsigned long* timestamps, void* raw_data, int 
sizeof_per_row, - signed long int count); + signed long int count, + unsigned long timestamp_min, + unsigned long timestamp_max); */ // Blobs to one big blob - var rawData []byte + var rowData []byte for i := 0; i < len(*records); i++ { - copy(rawData, (*records)[i]) + copy(rowData, (*records)[i]) } - var cOffset = C.long(offset) - var cNumOfRows = C.long(len(*entityIDs)) - var cEntityIdsPtr = (*C.ulong)(&(*entityIDs)[0]) - var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0]) - var cSizeofPerRow = C.int(len((*records)[0])) - var cRawDataVoidPtr = unsafe.Pointer(&rawData[0]) + // TODO: remove hard code schema + // auto schema_tmp = std::make_shared(); + // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); + // schema_tmp->AddField("age", DataType::INT32); + // TODO: remove hard code & fake dataChunk + const DIM = 4 + const N = 3 + var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4} + var rawData []int8 + for i := 0; i <= N; i++ { + for _, ele := range vec { + rawData=append(rawData, int8(ele)) + } + rawData=append(rawData, int8(i)) + } + const sizeofPerRow = 4 + DIM * 4 - var status = C.Insert(s.SegmentPtr, - cOffset, - cNumOfRows, - cEntityIdsPtr, - cTimestampsPtr, - cRawDataVoidPtr, - cSizeofPerRow, - cNumOfRows) + var status = C.Insert(s.SegmentPtr, C.long(N), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N), C.ulong(timestampMin), C.ulong(timestampMax)) if status != 0 { return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) @@ -137,21 +133,19 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[] return nil } -func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error { +func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error { /*C.Delete int Delete(CSegmentBase c_segment, - long int reserved_offset, long size, - const long* primary_keys, - const unsigned long* timestamps); + const 
unsigned long* primary_keys, + const unsigned long* timestamps, + unsigned long timestamp_min, + unsigned long timestamp_max); */ - var cOffset = C.long(offset) - var cSize = C.long(len(*entityIDs)) - var cEntityIdsPtr = (*C.ulong)(&(*entityIDs)[0]) - var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0]) + size := len(*entityIds) - var status = C.Delete(s.SegmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr) + var status = C.Delete(s.SegmentPtr, C.long(size), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), C.ulong(timestampMin), C.ulong(timestampMax)) if status != 0 { return errors.New("Delete failed, error code = " + strconv.Itoa(int(status))) @@ -175,13 +169,7 @@ func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorReco resultIds := make([]int64, TopK) resultDistances := make([]float32, TopK) - var cQueryPtr = unsafe.Pointer(nil) - var cTimestamp = C.ulong(timestamp) - var cResultIds = (*C.long)(&resultIds[0]) - var cResultDistances = (*C.float)(&resultDistances[0]) - - var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances) - + var status = C.Search(s.SegmentPtr, unsafe.Pointer(nil), C.ulong(timestamp), (*C.long)(&resultIds[0]), (*C.float)(&resultDistances[0])) if status != 0 { return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status))) } diff --git a/reader/segment_test.go b/reader/segment_test.go index 79a488f049..39303bca6d 100644 --- a/reader/segment_test.go +++ b/reader/segment_test.go @@ -7,270 +7,135 @@ import ( ) func TestConstructorAndDestructor(t *testing.T) { - // 1. Construct node, collection, and segment node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. 
Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } -func TestSegment_SegmentInsert(t *testing.T) { - // 1. Construct node, collection, and segment +func TestSegmentInsert(t *testing.T) { node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Create ids and timestamps ids :=[] int64{1, 2, 3} timestamps :=[] uint64 {0, 0, 0} - // 3. Create records, use schema below: - // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); - // schema_tmp->AddField("age", DataType::INT32); - const DIM = 4 - const N = 3 - var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4} - var rawData []byte - for _, ele := range vec { - rawData=append(rawData, byte(ele)) - } - rawData=append(rawData, byte(1)) - var records [][]byte - for i:= 0; i < N; i++ { - records = append(records, rawData) - } - - // 4. Do PreInsert - var offset = segment.SegmentPreInsert(N) - assert.Greater(t, offset, 0) - - // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + var err = segment.SegmentInsert(&ids, ×tamps, nil) assert.NoError(t, err) - // 6. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } -func TestSegment_SegmentDelete(t *testing.T) { - // 1. Construct node, collection, and segment +func TestSegmentDelete(t *testing.T) { node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Create ids and timestamps ids :=[] int64{1, 2, 3} timestamps :=[] uint64 {0, 0, 0} - // 3. Do PreDelete - var offset = segment.SegmentPreDelete(10) - assert.Greater(t, offset, 0) - - // 4. 
Do Delete - var err = segment.SegmentDelete(offset, &ids, ×tamps) + var err = segment.SegmentDelete(&ids, ×tamps) assert.NoError(t, err) - // 5. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } -func TestSegment_SegmentSearch(t *testing.T) { - // 1. Construct node, collection, and segment +func TestSegmentSearch(t *testing.T) { node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Create ids and timestamps ids :=[] int64{1, 2, 3} timestamps :=[] uint64 {0, 0, 0} - // 3. Create records, use schema below: - // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); - // schema_tmp->AddField("age", DataType::INT32); - const DIM = 4 - const N = 3 - var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4} - var rawData []byte - for _, ele := range vec { - rawData=append(rawData, byte(ele)) - } - rawData=append(rawData, byte(1)) - var records [][]byte - for i:= 0; i < N; i++ { - records = append(records, rawData) - } + var insertErr = segment.SegmentInsert(&ids, ×tamps, nil) + assert.NoError(t, insertErr) - // 4. Do PreInsert - var offset = segment.SegmentPreInsert(N) - assert.Greater(t, offset, 0) - - // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) - - // 6. Do search var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil) assert.NoError(t, searchErr) fmt.Println(searchRes) - // 7. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } -func TestSegment_SegmentPreInsert(t *testing.T) { - // 1. 
Construct node, collection, and segment - node := NewQueryNode(0, 0) - var collection = node.NewCollection("collection0", "fake schema") - var partition = collection.NewPartition("partition0") - var segment = partition.NewSegment(0) - - // 2. Do PreInsert - var offset = segment.SegmentPreInsert(10) - assert.Greater(t, offset, 0) - - // 3. Destruct node, collection, and segment - partition.DeleteSegment(segment) - collection.DeletePartition(partition) - node.DeleteCollection(collection) -} - -func TestSegment_SegmentPreDelete(t *testing.T) { - // 1. Construct node, collection, and segment - node := NewQueryNode(0, 0) - var collection = node.NewCollection("collection0", "fake schema") - var partition = collection.NewPartition("partition0") - var segment = partition.NewSegment(0) - - // 2. Do PreDelete - var offset = segment.SegmentPreDelete(10) - assert.Greater(t, offset, 0) - - // 3. Destruct node, collection, and segment - partition.DeleteSegment(segment) - collection.DeletePartition(partition) - node.DeleteCollection(collection) -} - -// Segment util functions test -//////////////////////////////////////////////////////////////////////////// func TestSegment_GetStatus(t *testing.T) { - // 1. Construct node, collection, and segment node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Get segment status var status = segment.GetStatus() assert.Equal(t, status, SegmentOpened) - // 3. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } func TestSegment_Close(t *testing.T) { - // 1. Construct node, collection, and segment node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. 
Close segment var err = segment.Close() assert.NoError(t, err) - // 3. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } func TestSegment_GetRowCount(t *testing.T) { - // 1. Construct node, collection, and segment node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Create ids and timestamps ids :=[] int64{1, 2, 3} timestamps :=[] uint64 {0, 0, 0} - // 3. Create records, use schema below: - // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); - // schema_tmp->AddField("age", DataType::INT32); - const DIM = 4 - const N = 3 - var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4} - var rawData []byte - for _, ele := range vec { - rawData=append(rawData, byte(ele)) - } - rawData=append(rawData, byte(1)) - var records [][]byte - for i:= 0; i < N; i++ { - records = append(records, rawData) - } - - // 4. Do PreInsert - var offset = segment.SegmentPreInsert(N) - assert.Greater(t, offset, 0) - - // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + var err = segment.SegmentInsert(&ids, ×tamps, nil) assert.NoError(t, err) - // 6. Get segment row count var rowCount = segment.GetRowCount() assert.Equal(t, rowCount, int64(len(ids))) - // 7. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) } func TestSegment_GetDeletedCount(t *testing.T) { - // 1. Construct node, collection, and segment node := NewQueryNode(0, 0) var collection = node.NewCollection("collection0", "fake schema") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) - // 2. Create ids and timestamps ids :=[] int64{1, 2, 3} timestamps :=[] uint64 {0, 0, 0} - // 3. 
Do PreDelete - var offset = segment.SegmentPreDelete(10) - assert.Greater(t, offset, 0) - - // 4. Do Delete - var err = segment.SegmentDelete(offset, &ids, ×tamps) + var err = segment.SegmentDelete(&ids, ×tamps) assert.NoError(t, err) - // 5. Get segment deleted count var deletedCount = segment.GetDeletedCount() // TODO: assert.Equal(t, deletedCount, len(ids)) assert.Equal(t, deletedCount, int64(0)) - // 6. Destruct node, collection, and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection)