From 7acc51e40ef22f91ab3c96a77808bd137eccc64e Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 16 Sep 2020 15:21:10 +0800 Subject: [PATCH] Query node add segments statistic service Signed-off-by: bigsheeper --- reader/message_client/message_client.go | 25 +++++++-- reader/query_node.go | 13 +++-- reader/reader.go | 5 ++ reader/result.go | 27 ++++----- reader/result_test.go | 47 ++++++++++++++-- reader/segment.go | 19 ++++--- reader/segment_management.go | 40 ------------- reader/segment_management_test.go | 29 ---------- reader/segment_service.go | 75 +++++++++++++++++++++++++ reader/segment_service_test.go | 53 +++++++++++++++++ 10 files changed, 226 insertions(+), 107 deletions(-) delete mode 100644 reader/segment_management.go delete mode 100644 reader/segment_management_test.go create mode 100644 reader/segment_service.go create mode 100644 reader/segment_service_test.go diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go index 8fa119cea5..d64cc3cd33 100644 --- a/reader/message_client/message_client.go +++ b/reader/message_client/message_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" + masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" timesync "github.com/czs007/suvlim/timesync" "github.com/golang/protobuf/proto" @@ -20,10 +21,11 @@ type MessageClient struct { key2SegChan chan *msgpb.Key2SegMsg // pulsar - client pulsar.Client - searchResultProducer pulsar.Producer - searchConsumer pulsar.Consumer - key2segConsumer pulsar.Consumer + client pulsar.Client + searchResultProducer pulsar.Producer + segmentsStatisticProducer pulsar.Producer + searchConsumer pulsar.Consumer + key2segConsumer pulsar.Consumer // batch messages InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg @@ -45,7 +47,7 @@ func (mc *MessageClient) TimeSyncEnd() uint64 { return mc.timestampBatchEnd } -func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) { +func (mc *MessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult) { var msgBuffer, _ = proto.Marshal(&msg) if _, err := mc.searchResultProducer.Send(ctx, &pulsar.ProducerMessage{ Payload: msgBuffer, @@ -54,6 +56,17 @@ func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) { } } +func (mc *MessageClient) SendSegmentsStatistic(ctx context.Context, statisticData *[]masterPb.SegmentStat) { + for _, data := range *statisticData { + var stat, _ = proto.Marshal(&data) + if _, err := mc.segmentsStatisticProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: stat, + }); err != nil { + log.Fatal(err) + } + } +} + func (mc *MessageClient) GetSearchChan() <-chan *msgpb.SearchMsg { return mc.searchChan } @@ -138,6 +151,7 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) { //create producer mc.searchResultProducer = mc.creatProducer("SearchResult") + mc.segmentsStatisticProducer = mc.creatProducer("SegmentsStatistic") //create consumer mc.searchConsumer = mc.createConsumer("Search") @@ -176,6 +190,7 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) { func (mc *MessageClient) Close() { mc.client.Close() mc.searchResultProducer.Close() + mc.segmentsStatisticProducer.Close() mc.searchConsumer.Close() mc.key2segConsumer.Close() mc.timeSyncCfg.Close() diff --git a/reader/query_node.go b/reader/query_node.go index 1199ff17d7..d1965e1643 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -195,7 +195,7 @@ func (node *QueryNode) RunInsertDelete(wg * sync.WaitGroup) { var msgLen = node.PrepareBatchMsg() var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} - if msgLen[1] == 0 { + if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { continue } @@ -232,7 +232,11 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0] node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg) fmt.Println("Do Search...") - node.Search(node.messageClient.SearchMsg) + var status = node.Search(node.messageClient.SearchMsg) + if status.ErrorCode != 0 { + fmt.Println("Search Failed") + node.PublishFailedSearchResult() + } } } wg.Done() @@ -431,7 +435,8 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes } func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { - var clientId = (*(searchMessages[0])).ClientId + // TODO: use client id to publish results to different clients + // var clientId = (*(searchMessages[0])).ClientId type SearchResultTmp struct { ResultId int64 @@ -514,7 +519,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { results.RowNum = int64(len(results.Distances)) // 3. publish result to pulsar - node.PublishSearchResult(&results, clientId) + node.PublishSearchResult(&results) } return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS} diff --git a/reader/reader.go b/reader/reader.go index ab0d905467..8afe3edc76 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -20,6 +20,11 @@ func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) { mc.ReceiveMessage() qn := CreateQueryNode(0, 0, &mc) qn.InitQueryNodeCollection() + + // Segments Services + // go qn.SegmentManagementService() + go qn.SegmentStatisticService() + wg := sync.WaitGroup{} wg.Add(2) go qn.RunInsertDelete(&wg) diff --git a/reader/result.go b/reader/result.go index 6b88199865..e2bdc9b7d7 100644 --- a/reader/result.go +++ b/reader/result.go @@ -2,9 +2,8 @@ package reader import ( "context" - "fmt" + masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" - "strconv" ) type ResultEntityIds []int64 @@ -14,17 +13,11 @@ type SearchResult struct { ResultDistances []float32 } -func getResultTopicByClientId(clientId int64) string { - // TODO: Result topic? - return "result-topic/partition-" + strconv.FormatInt(clientId, 10) -} - -func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult, clientId int64) msgPb.Status { +func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult) msgPb.Status { var ctx = context.Background() - var resultTopic = getResultTopicByClientId(clientId) - node.messageClient.Send(ctx, *results) - fmt.Println(resultTopic) + node.messageClient.SendResult(ctx, *results) + return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS} } @@ -38,14 +31,14 @@ func (node *QueryNode) PublishFailedSearchResult() msgPb.Status { var ctx = context.Background() - node.messageClient.Send(ctx, results) + node.messageClient.SendResult(ctx, results) return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS} } -func (node *QueryNode) PublicStatistic(statisticTopic string) msgPb.Status { - // TODO: get statistic info - // getStatisticInfo() - // var info = getStatisticInfo() - // TODO: Pulsar publish +func (node *QueryNode) PublicStatistic(statisticData *[]masterPb.SegmentStat) msgPb.Status { + var ctx = context.Background() + + node.messageClient.SendSegmentsStatistic(ctx, statisticData) + return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS} } diff --git a/reader/result_test.go b/reader/result_test.go index 5ce12ba873..700e7497b1 100644 --- a/reader/result_test.go +++ b/reader/result_test.go @@ -1,6 +1,7 @@ package reader import ( + masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" "testing" ) @@ -14,18 +15,54 @@ func TestResult_PublishSearchResult(t *testing.T) { node.SegmentsMap[0] = segment // TODO: start pulsar server - // TODO: fix result PublishSearchResult const N = 10 var entityIDs = msgPb.Entities { Ids: make([]int64, N), } - var results = msgPb.QueryResult { + var result = msgPb.QueryResult { Entities: &entityIDs, Distances: make([]float32, N), } for i := 0; i < N; i++ { - results.Entities.Ids = append(results.Entities.Ids, int64(i)) - results.Distances = append(results.Distances, float32(i)) + result.Entities.Ids = append(result.Entities.Ids, int64(i)) + result.Distances = append(result.Distances, float32(i)) } - node.PublishSearchResult(&results, 0) + node.PublishSearchResult(&result) +} + +func TestResult_PublishFailedSearchResult(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + // TODO: start pulsar server + node.PublishFailedSearchResult() +} + +func TestResult_PublicStatistic(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + var statisticData = make([]masterPb.SegmentStat, 0) + + for segmentID, segment := range node.SegmentsMap { + currentMemSize := segment.GetMemSize() + memIncreaseRate := float32(0) + stat := masterPb.SegmentStat{ + SegmentId: uint64(segmentID), + MemorySize: currentMemSize, + MemoryRate: memIncreaseRate, + } + statisticData = append(statisticData, stat) + } + + // TODO: start pulsar server + node.PublicStatistic(&statisticData) } diff --git a/reader/segment.go b/reader/segment.go index 7b775892b4..273212aea9 100644 --- a/reader/segment.go +++ b/reader/segment.go @@ -31,6 +31,7 @@ type Segment struct { SegmentPtr C.CSegmentBase SegmentId int64 SegmentCloseTime uint64 + LastMemSize uint64 } func (s *Segment) GetStatus() int { @@ -76,6 +77,10 @@ func (s *Segment) Close() error { return nil } +func (s *Segment) GetMemSize() uint64 { + return 100000 +} + //////////////////////////////////////////////////////////////////////////// func (s *Segment) SegmentPreInsert(numOfRecords int) int64 { /*C.PreInsert @@ -113,7 +118,7 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[] var numOfRow = len(*entityIDs) var sizeofPerRow = len((*records)[0]) - var rawData = make([]byte, numOfRow * sizeofPerRow) + var rawData = make([]byte, numOfRow*sizeofPerRow) for i := 0; i < len(*records); i++ { copy(rawData, (*records)[i]) } @@ -126,13 +131,13 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[] var cRawDataVoidPtr = unsafe.Pointer(&rawData[0]) var status = C.Insert(s.SegmentPtr, - cOffset, - cNumOfRows, - cEntityIdsPtr, - cTimestampsPtr, + cOffset, + cNumOfRows, + cEntityIdsPtr, + cTimestampsPtr, cRawDataVoidPtr, - cSizeofPerRow, - cNumOfRows) + cSizeofPerRow, + cNumOfRows) if status != 0 { return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) diff --git a/reader/segment_management.go b/reader/segment_management.go deleted file mode 100644 index 1dd20026b0..0000000000 --- a/reader/segment_management.go +++ /dev/null @@ -1,40 +0,0 @@ -package reader - -import ( - "fmt" - "time" -) - -func (node *QueryNode) SegmentsManagement() { - node.queryNodeTimeSync.UpdateTSOTimeSync() - var timeNow = node.queryNodeTimeSync.TSOTimeSync - for _, collection := range node.Collections { - for _, partition := range collection.Partitions { - for _, oldSegment := range partition.OpenedSegments { - // TODO: check segment status - if timeNow >= oldSegment.SegmentCloseTime { - // start new segment and add it into partition.OpenedSegments - // TODO: get segmentID from master - var segmentID int64 = 0 - var newSegment = partition.NewSegment(segmentID) - newSegment.SegmentCloseTime = timeNow + SegmentLifetime - partition.OpenedSegments = append(partition.OpenedSegments, newSegment) - node.SegmentsMap[segmentID] = newSegment - - // close old segment and move it into partition.ClosedSegments - // TODO: check status - var _ = oldSegment.Close() - partition.ClosedSegments = append(partition.ClosedSegments, oldSegment) - } - } - } - } -} - -func (node *QueryNode) SegmentService() { - for { - time.Sleep(200 * time.Millisecond) - node.SegmentsManagement() - fmt.Println("do segments management in 200ms") - } -} diff --git a/reader/segment_management_test.go b/reader/segment_management_test.go deleted file mode 100644 index f72b6f29f2..0000000000 --- a/reader/segment_management_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package reader - -import ( - "testing" -) - -func TestSegmentManagement_SegmentsManagement(t *testing.T) { - // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) - var collection = node.NewCollection("collection0", "fake schema") - var partition = collection.NewPartition("partition0") - var segment = partition.NewSegment(0) - node.SegmentsMap[0] = segment - - // TODO: fix segment management - node.SegmentsManagement() -} - -func TestSegmentManagement_SegmentService(t *testing.T) { - // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) - var collection = node.NewCollection("collection0", "fake schema") - var partition = collection.NewPartition("partition0") - var segment = partition.NewSegment(0) - node.SegmentsMap[0] = segment - - // TODO: fix segment service - node.SegmentService() -} diff --git a/reader/segment_service.go b/reader/segment_service.go new file mode 100644 index 0000000000..9680377f50 --- /dev/null +++ b/reader/segment_service.go @@ -0,0 +1,75 @@ +package reader + +import ( + "fmt" + masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" + msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" + "log" + "strconv" + "time" +) + +func (node *QueryNode) SegmentsManagement() { + node.queryNodeTimeSync.UpdateTSOTimeSync() + var timeNow = node.queryNodeTimeSync.TSOTimeSync + for _, collection := range node.Collections { + for _, partition := range collection.Partitions { + for _, oldSegment := range partition.OpenedSegments { + // TODO: check segment status + if timeNow >= oldSegment.SegmentCloseTime { + // start new segment and add it into partition.OpenedSegments + // TODO: get segmentID from master + var segmentID int64 = 0 + var newSegment = partition.NewSegment(segmentID) + newSegment.SegmentCloseTime = timeNow + SegmentLifetime + partition.OpenedSegments = append(partition.OpenedSegments, newSegment) + node.SegmentsMap[segmentID] = newSegment + + // close old segment and move it into partition.ClosedSegments + // TODO: check status + var _ = oldSegment.Close() + partition.ClosedSegments = append(partition.ClosedSegments, oldSegment) + } + } + } + } +} + +func (node *QueryNode) SegmentManagementService() { + for { + sleepMillisecondTime := 200 + time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + node.SegmentsManagement() + fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms") + } +} + +func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) { + var statisticData = make([]masterPb.SegmentStat, 0) + + for segmentID, segment := range node.SegmentsMap { + currentMemSize := segment.GetMemSize() + memIncreaseRate := float32(currentMemSize-segment.LastMemSize) / (float32(sleepMillisecondTime) / 1000) + stat := masterPb.SegmentStat{ + // TODO: set master pb's segment id type from uint64 to int64 + SegmentId: uint64(segmentID), + MemorySize: currentMemSize, + MemoryRate: memIncreaseRate, + } + statisticData = append(statisticData, stat) + } + + var status = node.PublicStatistic(&statisticData) + if status.ErrorCode != msgPb.ErrorCode_SUCCESS { + log.Printf("Publish segments statistic failed") + } +} + +func (node *QueryNode) SegmentStatisticService() { + for { + sleepMillisecondTime := 1000 + time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + node.SegmentStatistic(sleepMillisecondTime) + fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") + } +} diff --git a/reader/segment_service_test.go b/reader/segment_service_test.go new file mode 100644 index 0000000000..a16e34497f --- /dev/null +++ b/reader/segment_service_test.go @@ -0,0 +1,53 @@ +package reader + +import ( + "testing" +) + +func TestSegmentManagement_SegmentsManagement(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + // TODO: fix segment management + node.SegmentsManagement() +} + +func TestSegmentManagement_SegmentService(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + // TODO: fix segment service + node.SegmentManagementService() +} + +func TestSegmentManagement_SegmentStatistic(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + // TODO: start pulsar server + node.SegmentStatistic(1000) +} + +func TestSegmentManagement_SegmentStatisticService(t *testing.T) { + // Construct node, collection, partition and segment + node := NewQueryNode(0, 0) + var collection = node.NewCollection("collection0", "fake schema") + var partition = collection.NewPartition("partition0") + var segment = partition.NewSegment(0) + node.SegmentsMap[0] = segment + + // TODO: start pulsar server + node.SegmentStatisticService() +}