From 22a44c995debe1259713cfb8b373bbfc84ebe0e5 Mon Sep 17 00:00:00 2001 From: become-nice <995581097@qq.com> Date: Wed, 26 Aug 2020 18:29:42 +0800 Subject: [PATCH] Add serialization function Signed-off-by: become-nice <995581097@qq.com> --- core/CMakeLists.txt | 3 +- core/cmake/FindGTest.cmake | 43 --------------------- core/src/dog_segment/CMakeLists.txt | 2 +- core/unittest/CMakeLists.txt | 15 -------- core/unittest/test_dog_segment.cpp | 13 +++---- pulsar/schema/message.go | 14 +++++-- writer/writer.go | 58 +++++++++++++++++++---------- 7 files changed, 57 insertions(+), 91 deletions(-) delete mode 100644 core/cmake/FindGTest.cmake diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index dbed0fb41a..79ae64f03b 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -2,7 +2,8 @@ project(sulvim_core) cmake_minimum_required(VERSION 3.16) set( CMAKE_CXX_STANDARD 17 ) set( CMAKE_CXX_STANDARD_REQUIRED on ) -set (CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${CMAKE_CURRENT_SOURCE_DIR}/cmake") + + include_directories(src) add_subdirectory(src) add_subdirectory(unittest) diff --git a/core/cmake/FindGTest.cmake b/core/cmake/FindGTest.cmake deleted file mode 100644 index 53640adc6c..0000000000 --- a/core/cmake/FindGTest.cmake +++ /dev/null @@ -1,43 +0,0 @@ -########################### GTEST -# Enable ExternalProject CMake module -INCLUDE(ExternalProject) - -# Set default ExternalProject root directory -SET_DIRECTORY_PROPERTIES(PROPERTIES EP_PREFIX ${CMAKE_BINARY_DIR}/third_party) - -# Add gtest -# http://stackoverflow.com/questions/9689183/cmake-googletest -ExternalProject_Add( - googletest - URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip - # TIMEOUT 10 - # # Force separate output paths for debug and release builds to allow easy - # # identification of correct lib in subsequent TARGET_LINK_LIBRARIES commands - # CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs - # -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_RELEASE:PATH=ReleaseLibs - # 
-Dgtest_force_shared_crt=ON - # Disable install step - INSTALL_COMMAND "" - # Wrap download, configure and build steps in a script to log output - LOG_DOWNLOAD ON - LOG_CONFIGURE ON - LOG_BUILD ON) - -# Specify include dir -ExternalProject_Get_Property(googletest source_dir) -set(GTEST_INCLUDE_DIR ${source_dir}/include) - -# Library -ExternalProject_Get_Property(googletest binary_dir) - -# set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a) -# set(GTEST_LIBRARY gtest) -# add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED) -# set_property(TARGET ${GTEST_LIBRARY} PROPERTY IMPORTED_LOCATION -# ${GTEST_LIBRARY_PATH} ) -# add_dependencies(${GTEST_LIBRARY} googletest) -set(GTEST_LIBRARY_PATH ${binary_dir}/lib) -add_library(gtest UNKNOWN IMPORTED) -add_library(gtest_main UNKNOWN IMPORTED) -set_property(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest.a) -set_property(TARGET gtest_main PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest_main.a) diff --git a/core/src/dog_segment/CMakeLists.txt b/core/src/dog_segment/CMakeLists.txt index e258c28ba9..4f0744641e 100644 --- a/core/src/dog_segment/CMakeLists.txt +++ b/core/src/dog_segment/CMakeLists.txt @@ -7,4 +7,4 @@ add_library(milvus_dog_segment ${DOG_SEGMENT_FILES} ) #add_dependencies( segment sqlite mysqlpp ) -target_link_libraries(milvus_dog_segment tbb milvus_utils pthread) \ No newline at end of file +target_link_libraries(milvus_dog_segment tbb milvus_utils) \ No newline at end of file diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 5c07c0d9b4..e69de29bb2 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -1,15 +0,0 @@ -enable_testing() -find_package(GTest REQUIRED) -set(MILVUS_TEST_FILES - test_dog_segment.cpp -) -add_executable(all_tests - ${MILVUS_TEST_FILES} -) - -target_link_libraries(all_tests - gtest - gtest_main - milvus_dog_segment - pthread -) \ No newline at end of file diff --git 
a/core/unittest/test_dog_segment.cpp b/core/unittest/test_dog_segment.cpp index e8e9a957b5..7a89a9f461 100644 --- a/core/unittest/test_dog_segment.cpp +++ b/core/unittest/test_dog_segment.cpp @@ -26,16 +26,14 @@ // #include "segment/SegmentWriter.h" // #include "src/dog_segment/SegmentBase.h" // #include "utils/Json.h" -#include -#include #include "dog_segment/SegmentBase.h" using std::cin; using std::cout; using std::endl; -// using SegmentVisitor = milvus::engine::SegmentVisitor; +using SegmentVisitor = milvus::engine::SegmentVisitor; -// namespace { +namespace { // milvus::Status // CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { // CreateCollectionContext context; @@ -75,7 +73,7 @@ using std::endl; // } // } // namespace -TEST(DogSegmentTest, TestABI) { +TEST_F(DogSegmentTest, TestABI) { using namespace milvus::engine; using namespace milvus::dog_segment; ASSERT_EQ(TestABI(), 42); @@ -137,7 +135,7 @@ TEST(DogSegmentTest, TestABI) { -TEST(DogSegmentTest, MockTest) { +TEST_F(DogSegmentTest, MockTest) { using namespace milvus::dog_segment; using namespace milvus::engine; auto schema = std::make_shared(); @@ -163,12 +161,11 @@ TEST(DogSegmentTest, MockTest) { auto line_sizeof = (sizeof(int) + sizeof(float) * 16); assert(raw_data.size() == line_sizeof * N); - auto segment = CreateSegment(schema).release(); + auto segment = CreateSegment(schema); DogDataChunk data_chunk{raw_data.data(), (int)line_sizeof, N}; segment->Insert(N, uids.data(), timestamps.data(), data_chunk); QueryResult query_result; segment->Query(nullptr, 0, query_result); - delete segment; int i = 0; i++; } diff --git a/pulsar/schema/message.go b/pulsar/schema/message.go index c7ec414b8f..5c4ad56b29 100644 --- a/pulsar/schema/message.go +++ b/pulsar/schema/message.go @@ -1,6 +1,9 @@ package schema -import "bytes" +import ( + "encoding/json" + "fmt" +) type ErrorCode int32 @@ -110,7 +113,7 @@ type InsertMsg struct { type DeleteMsg struct { CollectionName 
string EntityId int64 - Timestamp int64 + Timestamp uint64 ClientId int64 MsgType OpType } @@ -141,8 +144,11 @@ func (ims *InsertMsg) GetType() OpType { } func (ims *InsertMsg) Serialization() []byte { - var serialization_data bytes.Buffer - return serialization_data.Bytes() + data, err := json.Marshal(ims) + if err != nil { + fmt.Println("Can't serialization") + } + return data } func (ims *InsertMsg) Deserialization(serializationData []byte) { diff --git a/writer/writer.go b/writer/writer.go index 55c847f221..f109585857 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -24,10 +24,10 @@ type writeNode struct { func NewWriteNode(ctx context.Context, openSegmentId string, - timeSync uint64, closeTime uint64, nextSegmentId string, - nextCloseSegmentTime uint64) (*writeNode, error) { + nextCloseSegmentTime uint64, + timeSync uint64) (*writeNode, error) { ctx = context.Background() store, err := storage.NewStore(ctx, "TIKV") writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync} @@ -44,7 +44,7 @@ func NewWriteNode(ctx context.Context, }, nil } -func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, time_sync uint64) error { +func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, timeSync uint64) error { var i int var storeKey string @@ -52,37 +52,57 @@ func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg var binaryData [][]byte var timeStamps []uint64 - for i = 0; i < cap(data); i++ { + for i = 0; i < len(data); i++ { storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) keys = append(keys, []byte(storeKey)) binaryData = append(binaryData, data[i].Serialization()) timeStamps = append(timeStamps, data[i].Timestamp) } - if s.segmentCloseTime <= time_sync { + if s.segmentCloseTime <= timeSync { s.openSegmentId = s.nextSegmentId s.segmentCloseTime = s.nextSegmentCloseTime } - (*s.kvStore).PutRows(ctx, keys, binaryData, 
s.openSegmentId, timeStamps) - s.UpdateInsertTimeSync(time_sync) + err := (*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps) + s.UpdateInsertTimeSync(timeSync) + return err +} + +func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, timeSync uint64) error { + var i int + var storeKey string + + var keys [][]byte + var timeStamps []uint64 + + for i = 0; i < len(data); i++ { + storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) + keys = append(keys, []byte(storeKey)) + timeStamps = append(timeStamps, data[i].Timestamp) + } + + //TODO:Get segment id for delete data and deliver those message to specify topic + + err := (*s.kvStore).DeleteRows(ctx, keys, timeStamps) + s.UpdateDeleteTimeSync(timeSync) + return err +} + +func (s *writeNode) AddNewSegment(segmentId string, closeSegmentTime uint64) error { + s.nextSegmentId = segmentId + s.nextSegmentCloseTime = closeSegmentTime return nil } -func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, time_sync uint64) error { - return nil +func (s *writeNode) UpdateInsertTimeSync(timeSync uint64) { + s.timeSyncTable.insertTimeSync = timeSync } -func (s *writeNode) AddNewSegment(segment_id string, close_segment_time uint64) error { - s.nextSegmentId = segment_id - s.nextSegmentCloseTime = close_segment_time - return nil +func (s *writeNode) UpdateDeleteTimeSync(timeSync uint64) { + s.timeSyncTable.deleteTimeSync = timeSync } -func (s *writeNode) UpdateInsertTimeSync(time_sync uint64) { - s.timeSyncTable.insertTimeSync = time_sync -} - -func (s *writeNode) UpdateDeleteTimeSync(time_sync uint64) { - s.timeSyncTable.deleteTimeSync = time_sync +func (s *writeNode) UpdateCloseTime(closeTime uint64) { + s.segmentCloseTime = closeTime }