From dc916ec10c88331fdc63a8061adf314610016d1c Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 14 Sep 2020 14:11:36 +0800 Subject: [PATCH] Add execution time test Signed-off-by: bigsheeper --- core/src/dog_segment/segment_c.cpp | 6 ++-- reader/message_client/message_client.go | 12 ++++++++ reader/query_node.go | 37 +++++++++++++++++++------ 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp index 81fbb80d29..21fae04fa6 100644 --- a/core/src/dog_segment/segment_c.cpp +++ b/core/src/dog_segment/segment_c.cpp @@ -48,7 +48,7 @@ Insert(CSegmentBase c_segment, auto res = segment->Insert(reserved_offset, size, primary_keys, timestamps, dataChunk); // TODO: delete print - std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl; + // std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl; return res.code(); } @@ -58,7 +58,7 @@ PreInsert(CSegmentBase c_segment, long int size) { auto segment = (milvus::dog_segment::SegmentBase*)c_segment; // TODO: delete print - std::cout << "PreInsert segment " << std::endl; + // std::cout << "PreInsert segment " << std::endl; return segment->PreInsert(size); } @@ -81,7 +81,7 @@ PreDelete(CSegmentBase c_segment, long int size) { auto segment = (milvus::dog_segment::SegmentBase*)c_segment; // TODO: delete print - std::cout << "PreDelete segment " << std::endl; + // std::cout << "PreDelete segment " << std::endl; return segment->PreDelete(size); } diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go index afe5e0b16c..eeeb3b825a 100644 --- a/reader/message_client/message_client.go +++ b/reader/message_client/message_client.go @@ -2,10 +2,12 @@ package message_client import ( "context" + "fmt" "github.com/apache/pulsar-client-go/pulsar" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" "github.com/golang/protobuf/proto" "log" + "time" ) type MessageClient struct { @@ -45,6 +47,8 @@ func (mc *MessageClient) GetSearchChan() chan *msgpb.SearchMsg { } func (mc *MessageClient) ReceiveInsertOrDeleteMsg() { + var count = 0 + var start time.Time for { insetOrDeleteMsg := msgpb.InsertOrDeleteMsg{} msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) @@ -52,8 +56,16 @@ func (mc *MessageClient) ReceiveInsertOrDeleteMsg() { if err != nil { log.Fatal(err) } + if count == 0 { + start = time.Now() + } + count++ mc.insertOrDeleteChan <- &insetOrDeleteMsg mc.insertOrDeleteConsumer.Ack(msg) + if count == 100000 - 1 { + elapsed := time.Since(start) + fmt.Println("Query node ReceiveInsertOrDeleteMsg time:", elapsed) + } } } diff --git a/reader/query_node.go b/reader/query_node.go index 3e6c983ca8..890780db5b 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -165,34 +165,47 @@ func (node *QueryNode) InitQueryNodeCollection() { //////////////////////////////////////////////////////////////////////////////////////////////////// func (node *QueryNode) RunInsertDelete() { + var count = 0 + var start time.Time for { - time.Sleep(2 * 1000 * time.Millisecond) + //time.Sleep(2 * 1000 * time.Millisecond) node.QueryNodeDataInit() // TODO: get timeRange from message client var timeRange = TimeRange{0, 0} var msgLen = node.PrepareBatchMsg() - fmt.Println("PrepareBatchMsg Done, Insert len = ", msgLen[0]) + //fmt.Println("PrepareBatchMsg Done, Insert len = ", msgLen[0]) if msgLen[0] == 0 { - fmt.Println("0 msg found") + //fmt.Println("0 msg found") continue } + if count == 0 { + start = time.Now() + } + count+=msgLen[0] node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) - fmt.Println("MessagesPreprocess Done") + //fmt.Println("MessagesPreprocess Done") node.WriterDelete() node.PreInsertAndDelete() - fmt.Println("PreInsertAndDelete Done") + //fmt.Println("PreInsertAndDelete Done") node.DoInsertAndDelete() - fmt.Println("DoInsertAndDelete Done") + //fmt.Println("DoInsertAndDelete Done") node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) - fmt.Print("UpdateSearchTimeSync Done\n\n\n") + //fmt.Print("UpdateSearchTimeSync Done\n\n\n") + if count == 100000 - 1 { + elapsed := time.Since(start) + fmt.Println("Query node insert 10 × 10000 time:", elapsed) + } } } func (node *QueryNode) RunSearch() { for { - time.Sleep(2 * 1000 * time.Millisecond) + //time.Sleep(2 * 1000 * time.Millisecond) + + start := time.Now() + if len(node.messageClient.GetSearchChan()) <= 0 { - fmt.Println("null Search") + //fmt.Println("null Search") continue } node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0] @@ -200,6 +213,9 @@ func (node *QueryNode) RunSearch() { node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg) fmt.Println("Do Search...") node.Search(node.messageClient.SearchMsg) + + elapsed := time.Since(start) + fmt.Println("Query node search time:", elapsed) } } @@ -459,8 +475,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { for _, res := range resultsTmp { results.Entities.Ids = append(results.Entities.Ids, res.ResultId) results.Distances = append(results.Distances, res.ResultDistance) + results.Scores = append(results.Distances, float32(0)) } + results.RowNum = int64(len(results.Distances)) + // 3. publish result to pulsar node.PublishSearchResult(&results, clientId) }