From 0d2780a2eb89888dd4e3a4509484957b167dd037 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Sun, 27 Sep 2020 18:26:33 +0800
Subject: [PATCH] Add reader writeLog, fix insert buffer out of range

Signed-off-by: bigsheeper
---
 reader/read_node/query_node.go            | 79 ++++++++++++----------
 reader/read_node/segment.go               |  7 +-
 reader/read_node/util_functions.go        | 56 ++++++++++++++++
 sdk/examples/simple/create_collection.cpp | 24 +++++--
 sdk/examples/simple/insert.cpp            | 81 +++++++++++++++++++----
 sdk/examples/simple/search.cpp            | 10 ++-
 sdk/utils/Utils.cpp                       |  7 +-
 sdk/utils/Utils.h                         |  4 +-
 8 files changed, 205 insertions(+), 63 deletions(-)

diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index d766d0cd38..da41ad6052 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -17,6 +17,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/czs007/suvlim/conf"
+	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
+	"github.com/czs007/suvlim/pkg/master/kv"
+	"github.com/czs007/suvlim/reader/message_client"
 	"github.com/stretchr/testify/assert"
 	"log"
 	"sort"
@@ -24,9 +27,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
-	"github.com/czs007/suvlim/pkg/master/kv"
-	"github.com/czs007/suvlim/reader/message_client"
 	//"github.com/stretchr/testify/assert"
 )

@@ -69,8 +69,21 @@ type QueryInfo struct {
 
 type MsgCounter struct {
 	InsertCounter int64
+	InsertTime    time.Time
+
 	DeleteCounter int64
+	DeleteTime    time.Time
+
 	SearchCounter int64
+	SearchTime    time.Time
+}
+
+type InsertLog struct {
+	MsgLength              int
+	DurationInMilliseconds int64
+	InsertTime             time.Time
+	NumSince               int64
+	Speed                  float64
 }

@@ -86,6 +99,7 @@ type QueryNode struct {
 	insertData   InsertData
 	kvBase       *kv.EtcdKVBase
 	msgCounter   *MsgCounter
+	InsertLogs   []InsertLog
 }
 
 func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
@@ -95,7 +109,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
 		ReadTimeSyncMin: timeSync,
 		ReadTimeSyncMax: timeSync,
 		WriteTimeSync:   timeSync,
-		ServiceTimeSync: timeSync,
+		ServiceTimeSync: timeSync,
 		TSOTimeSync:     timeSync,
 	}

@@ -135,7 +149,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
 		ReadTimeSyncMin: timeSync,
 		ReadTimeSyncMax: timeSync,
 		WriteTimeSync:   timeSync,
-		ServiceTimeSync: timeSync,
+		ServiceTimeSync: timeSync,
 		TSOTimeSync:     timeSync,
 	}

@@ -162,6 +176,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
 		queryNodeTimeSync: queryNodeTimeSync,
 		buffer:            buffer,
 		msgCounter:        &msgCounter,
+		InsertLogs:        make([]InsertLog, 0),
 	}
 }

@@ -246,13 +261,11 @@ func (node *QueryNode) InitQueryNodeCollection() {
 
 func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 	const Debug = true
-	const CountMsgNum = 1000 * 1000
+	const CountInsertMsgBaseline = 1000 * 1000
+	var BaselineCounter int64 = 0
+	node.msgCounter.InsertTime = time.Now()
 
 	if Debug {
-		var printFlag = true
-		var startTime = true
-		var start time.Time
-
 		for {
 			var msgLen = node.PrepareBatchMsg()
 			var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
@@ -264,10 +277,9 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 				continue
 			}
 
-			if startTime {
-				fmt.Println("============> Start Test <============")
-				startTime = false
-				start = time.Now()
+			if node.msgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter {
+				node.WriteQueryLog()
+				BaselineCounter++
 			}
 
 			node.QueryNodeDataInit()
@@ -279,13 +291,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 			node.DoInsertAndDelete()
 			//fmt.Println("DoInsertAndDelete Done")
 			node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
-
-			// Test insert time
-			if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
-				printFlag = false
-				timeSince := time.Since(start)
-				fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
-			}
 		}
 	}
 }

@@ -334,14 +339,14 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
 			node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
 			fmt.Println("Do Search...")
 			//for {
-			//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
-			var status = node.Search(node.messageClient.SearchMsg)
-			if status.ErrorCode != 0 {
-				fmt.Println("Search Failed")
-				node.PublishFailedSearchResult()
-			}
-			//break
-			//}
+			//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
+			var status = node.Search(node.messageClient.SearchMsg)
+			if status.ErrorCode != 0 {
+				fmt.Println("Search Failed")
+				node.PublishFailedSearchResult()
+			}
+			//break
+			//}
 			//}
 		default:
 		}

@@ -485,9 +490,9 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
 func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
 	var wg sync.WaitGroup
 	// Do insert
-	for segmentID, records := range node.insertData.insertRecords {
-		wg.Add(1)
-		go node.DoInsert(segmentID, &records, &wg)
+	for segmentID := range node.insertData.insertRecords {
+		wg.Add(1)
+		go node.DoInsert(segmentID, &wg)
 	}
 
 	// Do delete
@@ -505,7 +510,7 @@ func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
 	return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
 }
 
-func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
+func (node *QueryNode) DoInsert(segmentID int64, wg *sync.WaitGroup) msgPb.Status {
 	fmt.Println("Doing insert..., len = ", len(node.insertData.insertIDs[segmentID]))
 	var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
 	if err != nil {
@@ -515,10 +520,12 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
 
 	ids := node.insertData.insertIDs[segmentID]
 	timestamps := node.insertData.insertTimestamps[segmentID]
+	records := node.insertData.insertRecords[segmentID]
 	offsets := node.insertData.insertOffset[segmentID]
 
-	node.msgCounter.InsertCounter += int64(len(ids))
-	err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
+	node.QueryLog(len(ids))
+
+	err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records)
 	if err != nil {
 		fmt.Println(err.Error())
 		return msgPb.Status{ErrorCode: 1}
 	}

@@ -585,7 +592,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 		// Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
 		// Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
 		var logicTimestamp = searchTimestamp << 46 >> 46
-		searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval + 600)) << 18 + logicTimestamp
+		searchTimestamp = (searchTimestamp>>18-uint64(conf.Config.Timesync.Interval+600))<<18 + logicTimestamp
 
 		var vector = msg.Records
 		// For now, only the first Json is valid.
@@ -594,7 +601,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 
 		// 1. Timestamp check
 		// TODO: return or wait? Or adding graceful time
 		if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync {
-			fmt.Println("Invalid query time, timestamp = ", searchTimestamp >> 18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync >> 18)
+			fmt.Println("Invalid query time, timestamp = ", searchTimestamp>>18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync>>18)
 			return msgPb.Status{ErrorCode: 1}
 		}
 
diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go
index 3da047a4e8..c369a13247 100644
--- a/reader/read_node/segment.go
+++ b/reader/read_node/segment.go
@@ -16,6 +16,7 @@ import (
 	"fmt"
 	"github.com/czs007/suvlim/errors"
 	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
+	"github.com/stretchr/testify/assert"
 	"strconv"
 	"unsafe"
 )
@@ -143,11 +144,13 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
 	var numOfRow = len(*entityIDs)
 	var sizeofPerRow = len((*records)[0])
 
-	var rawData = make([]byte, numOfRow*sizeofPerRow)
+	assert.Equal(nil, numOfRow, len(*records))
+
+	var rawData = make([]byte, numOfRow * sizeofPerRow)
 	var copyOffset = 0
 	for i := 0; i < len(*records); i++ {
 		copy(rawData[copyOffset:], (*records)[i])
-		copyOffset += len((*records)[i])
+		copyOffset += sizeofPerRow
 	}
 
 	var cOffset = C.long(offset)
diff --git a/reader/read_node/util_functions.go b/reader/read_node/util_functions.go
index c9071d4e04..303d824d90 100644
--- a/reader/read_node/util_functions.go
+++ b/reader/read_node/util_functions.go
@@ -1,8 +1,13 @@
 package reader
 
 import (
+	"encoding/json"
 	"errors"
+	"fmt"
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
+	"os"
 	"strconv"
+	"time"
 )
 
 // Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
@@ -68,3 +73,54 @@ func (c *Collection) GetPartitionByName(partitionName string) (partition *Partit
 	return nil
 	// TODO: remove from c.Partitions
 }
+
+func (node *QueryNode) QueryLog(length int) {
+	node.msgCounter.InsertCounter += int64(length)
+	timeNow := time.Now()
+	duration := timeNow.Sub(node.msgCounter.InsertTime)
+	speed := float64(length) / duration.Seconds()
+
+	insertLog := InsertLog{
+		MsgLength:              length,
+		DurationInMilliseconds: duration.Milliseconds(),
+		InsertTime:             timeNow,
+		NumSince:               node.msgCounter.InsertCounter,
+		Speed:                  speed,
+	}
+
+	node.InsertLogs = append(node.InsertLogs, insertLog)
+	node.msgCounter.InsertTime = timeNow
+}
+
+func (node *QueryNode) WriteQueryLog() {
+	f, err := os.OpenFile("/tmp/query_node.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// write logs
+	for _, insertLog := range node.InsertLogs {
+		insertLogJson, err := json.Marshal(&insertLog)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		writeString := string(insertLogJson) + "\n"
+		fmt.Println(writeString)
+
+		_, err2 := f.WriteString(writeString)
+		if err2 != nil {
+			log.Fatal(err2)
+		}
+	}
+
+	// reset InsertLogs buffer
+	node.InsertLogs = make([]InsertLog, 0)
+
+	err = f.Close()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	fmt.Println("write log done")
+}
diff --git a/sdk/examples/simple/create_collection.cpp b/sdk/examples/simple/create_collection.cpp
index 65b1fff84e..a020938cf6 100644
--- a/sdk/examples/simple/create_collection.cpp
+++ b/sdk/examples/simple/create_collection.cpp
@@ -10,17 +10,24 @@ const int DIM = 128;
 
 int main(int argc , char**argv) {
     TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
+    if (!parameters.is_valid){
+        return 0;
+    }
+
+    if (parameters.collection_name_.empty()){
+        std::cout<< "should specify collection name!" << std::endl;
+        milvus_sdk::Utils::PrintHelp(argc, argv);
+        return 0;
+    }
+
     auto client = milvus::ConnectionImpl();
     milvus::ConnectParam connect_param;
     connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
     connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ;
     client.Connect(connect_param);
-    milvus::Status stat;
-    const std::string collectin_name = "collectionTest";
+    const std::string collection_name = parameters.collection_name_;
 
     // Create
     milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
@@ -34,15 +41,24 @@ int main(int argc , char**argv) {
     field_ptr2->field_type = milvus::DataType::VECTOR_FLOAT;
     field_ptr2->dim = DIM;
 
-    milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2}};
+    milvus::Mapping mapping = {collection_name, {field_ptr1, field_ptr2}};
 
+    milvus::Status stat;
     stat = client.CreateCollection(mapping, "extra_params");
+    if (!stat.ok()){
+        std::cout << "create collection failed!" << std::endl;
+        return 0;
+    }
+
+    std::cout << "create collection done!" << std::endl;
 
     // Get Collection info
     milvus::Mapping map;
-    client.GetCollectionInfo(collectin_name, map);
+    client.GetCollectionInfo(collection_name, map);
 
     for (auto &f : map.fields) {
         std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl;
     }
+
+    return 0;
 }
diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp
index 6d31d18ce8..b539adb3da 100644
--- a/sdk/examples/simple/insert.cpp
+++ b/sdk/examples/simple/insert.cpp
@@ -23,20 +23,31 @@
 
 const int N = 200000;
 const int DIM = 128;
+const int LOOP = 100;
 
+int ID_START = 0;
 
-const milvus::FieldValue GetData() {
+int generate_ids(std::vector<int64_t>& ids_array, int count);
+
+int generate_ids(std::vector<int64_t>& ids_array, int count) {
+    for (int i = 0; i < count; i++) {
+        ids_array.push_back(ID_START++);
+    }
+    return 0;
+}
+
+const milvus::FieldValue GetData(int count) {
     milvus::FieldValue value_map;
 
     std::vector<int32_t> int32_data;
-    for (int i = 0; i < N; i++) {
-        int32_data.push_back(i);
+    for (int i = 0; i < count; i++) {
+        int32_data.push_back(ID_START++);
     }
 
     std::default_random_engine eng(42);
     std::normal_distribution<float> dis(0, 1);
     std::vector<milvus::VectorData> vector_data;
-    for (int i = 0; i < N; i++) {
+    for (int i = 0; i < count; i++) {
         std::vector<float> float_data(DIM);
         for(auto &x: float_data) {
             x = dis(eng);
@@ -52,6 +63,33 @@
     return value_map;
 }
 
+bool checkSchema(){
+    // Get Collection info
+    bool ret = false;
+
+    milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
+    milvus::FieldPtr field_ptr2 = std::make_shared<milvus::Field>();
+
+    field_ptr1->field_name = "age";
+    field_ptr1->field_type = milvus::DataType::INT32;
+    field_ptr1->dim = 1;
+
+    field_ptr2->field_name = "field_vec";
+    field_ptr2->field_type = milvus::DataType::VECTOR_FLOAT;
+    field_ptr2->dim = DIM;
+
+    std::vector<milvus::FieldPtr> fields{field_ptr1, field_ptr2};
+
+    milvus::Mapping map;
+    //client.GetCollectionInfo(collection_name, map);
+
+    for (auto &f : map.fields) {
+        ///std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl;
+    }
+
+    return true;
+}
+
 int
 main(int argc, char* argv[]) {
     TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
@@ -59,21 +97,38 @@ main(int argc, char* argv[]) {
         return 0;
     }
 
+    if (parameters.collection_name_.empty()){
+        std::cout<< "should specify collection name!" << std::endl;
+        milvus_sdk::Utils::PrintHelp(argc, argv);
+        return 0;
+    }
+
+    const std::string collection_name = parameters.collection_name_;
     auto client = milvus::ConnectionImpl();
     milvus::ConnectParam connect_param;
     connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
    connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ;
     client.Connect(connect_param);
-    std::vector<int64_t> ids_array;
-    auto data = GetData();
-    for (int64_t i = 0; i < N; i++) {
-        ids_array.push_back(i);
-    }
 
-    milvus_sdk::TimeRecorder insert("insert");
-    auto status = client.Insert("collection0", "tag01", data, ids_array);
-    if (!status.ok()){
-        return -1;
+
+    int per_count = N / LOOP;
+    int failed_count = 0;
+
+    milvus_sdk::TimeRecorder insert_timer("insert");
+    for (int64_t i = 0; i < LOOP; i++) {
+        std::vector<int64_t> ids_array;
+        generate_ids(ids_array, per_count);
+        auto data = GetData(per_count);
+        insert_timer.Start();
+        auto status = client.Insert(collection_name, "default", data, ids_array);
+        if (!status.ok()){
+            failed_count += 1;
+        }
+        insert_timer.End();
     }
+
+    if (failed_count > 0) {
+        std::cout <<" test done, failed_count is :" << failed_count<< std::endl;
+    }
+    insert_timer.Print(LOOP);
     return 0;
 }
diff --git a/sdk/examples/simple/search.cpp b/sdk/examples/simple/search.cpp
index b220e0ec4d..b3173ab4ab 100644
--- a/sdk/examples/simple/search.cpp
+++ b/sdk/examples/simple/search.cpp
@@ -21,6 +21,7 @@
 
 const int TOP_K = 10;
 const int LOOP = 1000;
+const int DIM = 128;
 
 int main(int argc , char**argv) {
     TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
@@ -28,6 +29,13 @@ int main(int argc , char**argv) {
         return 0;
     }
 
+    if (parameters.collection_name_.empty()){
+        std::cout<< "should specify collection name!" << std::endl;
+        milvus_sdk::Utils::PrintHelp(argc, argv);
+        return 0;
+    }
+
+
     auto client = milvus::ConnectionImpl();
     milvus::ConnectParam connect_param;
     connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
@@ -36,8 +44,6 @@ int main(int argc , char**argv) {
     std::vector<int64_t> ids_array;
     std::vector<std::string> partition_list;
     partition_list.emplace_back("default");
-//    partition_list.emplace_back("partition-2");
-//    partition_list.emplace_back("partition-3");
 
     milvus::VectorParam vectorParam;
     std::vector<milvus::VectorData> vector_records;
diff --git a/sdk/utils/Utils.cpp b/sdk/utils/Utils.cpp
index f3954eb56a..88e50da3ae 100644
--- a/sdk/utils/Utils.cpp
+++ b/sdk/utils/Utils.cpp
@@ -472,10 +472,9 @@ Utils::PrintTopKQueryResult(milvus::TopKQueryResult& topk_query_result) {
 
 void
-Utils::HAHE(int argc){
-
-    std::cout<<"FUCK"<
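
The two Search hunks above manipulate hybrid timestamps: the low 18 bits of searchTimestamp are a logical counter (`searchTimestamp << 46 >> 46` keeps them) and the upper 46 bits are physical milliseconds (`searchTimestamp >> 18`), so the patch subtracts `conf.Config.Timesync.Interval + 600` milliseconds from the physical part while preserving the logical part. A minimal Go sketch of that decompose/adjust/compose round trip (not part of the patch; the helper names and the 400 ms interval are illustrative assumptions):

package main

import "fmt"

const logicalBits = 18
const logicalMask = (uint64(1) << logicalBits) - 1

// decompose splits a hybrid timestamp into physical milliseconds and a logical counter.
func decompose(ts uint64) (physical, logical uint64) {
	return ts >> logicalBits, ts & logicalMask
}

// compose rebuilds the hybrid timestamp from its two parts.
func compose(physical, logical uint64) uint64 {
	return physical<<logicalBits | logical
}

func main() {
	ts := compose(1601200000000, 42) // example values
	phys, logic := decompose(ts)

	// Mirror of: (searchTimestamp>>18 - uint64(conf.Config.Timesync.Interval+600))<<18 + logicTimestamp
	const assumedIntervalMs = 400 // illustrative; the real value comes from conf.Config.Timesync.Interval
	adjusted := compose(phys-(assumedIntervalMs+600), logic)
	fmt.Println(phys, logic, adjusted)
}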
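The segment.go hunk is the "out of range" fix named in the subject: rawData is allocated as numOfRow * sizeofPerRow with sizeofPerRow taken from records[0], so the copy loop must advance by that fixed width rather than by len((*records)[i]), and the new assert pins len(*records) to numOfRow. A standalone sketch of the same fixed-stride flattening contract, with an explicit width check in place of the assert (flattenRecords is a hypothetical helper, not in the patch):

package main

import "fmt"

// flattenRecords packs equal-width rows into one contiguous buffer, advancing
// by the fixed row width the way the patched SegmentInsert does. Advancing by
// each record's own length would overrun the buffer as soon as any record is
// longer than records[0].
func flattenRecords(records [][]byte) ([]byte, error) {
	if len(records) == 0 {
		return nil, fmt.Errorf("no records")
	}
	rowWidth := len(records[0])
	rawData := make([]byte, len(records)*rowWidth)
	offset := 0
	for i := range records {
		if len(records[i]) != rowWidth {
			return nil, fmt.Errorf("record %d has width %d, want %d", i, len(records[i]), rowWidth)
		}
		copy(rawData[offset:], records[i])
		offset += rowWidth
	}
	return rawData, nil
}

func main() {
	buf, err := flattenRecords([][]byte{{1, 2}, {3, 4}, {5, 6}})
	fmt.Println(buf, err) // [1 2 3 4 5 6] <nil>
}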
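For reference, WriteQueryLog appends one json.Marshal-ed InsertLog per line to /tmp/query_node.txt; the struct has no json tags, so the keys are the exported field names, and InsertTime serializes in RFC 3339 form. An illustrative line (all values made up):

{"MsgLength":2000,"DurationInMilliseconds":15,"InsertTime":"2020-09-27T18:26:33.123456+08:00","NumSince":420000,"Speed":133333.33333333334}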