Query node: add segments statistic service

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2020-09-16 15:21:10 +08:00 committed by yefu.chen
parent fa6e228a91
commit 7acc51e40e
10 changed files with 226 additions and 107 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
timesync "github.com/czs007/suvlim/timesync"
"github.com/golang/protobuf/proto"
@ -20,10 +21,11 @@ type MessageClient struct {
key2SegChan chan *msgpb.Key2SegMsg
// pulsar
client pulsar.Client
searchResultProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
client pulsar.Client
searchResultProducer pulsar.Producer
segmentsStatisticProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
// batch messages
InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg
@ -45,7 +47,7 @@ func (mc *MessageClient) TimeSyncEnd() uint64 {
return mc.timestampBatchEnd
}
func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) {
func (mc *MessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult) {
var msgBuffer, _ = proto.Marshal(&msg)
if _, err := mc.searchResultProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: msgBuffer,
@ -54,6 +56,17 @@ func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) {
}
}
// SendSegmentsStatistic publishes one pulsar message per segment statistic
// onto the segmentsStatisticProducer ("SegmentsStatistic" topic).
// NOTE(review): the proto.Marshal error is silently discarded, and a single
// failed Send kills the whole process via log.Fatal (matching SendResult's
// style) — confirm both are intended.
func (mc *MessageClient) SendSegmentsStatistic(ctx context.Context, statisticData *[]masterPb.SegmentStat) {
	for _, data := range *statisticData {
		// Marshal each SegmentStat individually into its own message payload.
		var stat, _ = proto.Marshal(&data)
		if _, err := mc.segmentsStatisticProducer.Send(ctx, &pulsar.ProducerMessage{
			Payload: stat,
		}); err != nil {
			log.Fatal(err)
		}
	}
}
// GetSearchChan returns the receive-only channel on which incoming
// search requests (SearchMsg) are delivered to consumers of this client.
func (mc *MessageClient) GetSearchChan() <-chan *msgpb.SearchMsg {
	return mc.searchChan
}
@ -138,6 +151,7 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) {
//create producer
mc.searchResultProducer = mc.creatProducer("SearchResult")
mc.segmentsStatisticProducer = mc.creatProducer("SegmentsStatistic")
//create consumer
mc.searchConsumer = mc.createConsumer("Search")
@ -176,6 +190,7 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) {
func (mc *MessageClient) Close() {
mc.client.Close()
mc.searchResultProducer.Close()
mc.segmentsStatisticProducer.Close()
mc.searchConsumer.Close()
mc.key2segConsumer.Close()
mc.timeSyncCfg.Close()

View File

@ -195,7 +195,7 @@ func (node *QueryNode) RunInsertDelete(wg * sync.WaitGroup) {
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
if msgLen[1] == 0 {
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
continue
}
@ -232,7 +232,11 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
node.Search(node.messageClient.SearchMsg)
var status = node.Search(node.messageClient.SearchMsg)
if status.ErrorCode != 0 {
fmt.Println("Search Failed")
node.PublishFailedSearchResult()
}
}
}
wg.Done()
@ -431,7 +435,8 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes
}
func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
var clientId = (*(searchMessages[0])).ClientId
// TODO: use client id to publish results to different clients
// var clientId = (*(searchMessages[0])).ClientId
type SearchResultTmp struct {
ResultId int64
@ -514,7 +519,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
results.RowNum = int64(len(results.Distances))
// 3. publish result to pulsar
node.PublishSearchResult(&results, clientId)
node.PublishSearchResult(&results)
}
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}

View File

@ -20,6 +20,11 @@ func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) {
mc.ReceiveMessage()
qn := CreateQueryNode(0, 0, &mc)
qn.InitQueryNodeCollection()
// Segments Services
// go qn.SegmentManagementService()
go qn.SegmentStatisticService()
wg := sync.WaitGroup{}
wg.Add(2)
go qn.RunInsertDelete(&wg)

View File

@ -2,9 +2,8 @@ package reader
import (
"context"
"fmt"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"strconv"
)
type ResultEntityIds []int64
@ -14,17 +13,11 @@ type SearchResult struct {
ResultDistances []float32
}
// getResultTopicByClientId maps a client id to its per-client result topic
// name ("result-topic/partition-<id>").
// NOTE(review): this is the pre-commit copy — the function is removed in
// this commit; results now go to a single shared topic via SendResult.
func getResultTopicByClientId(clientId int64) string {
	// TODO: Result topic?
	return "result-topic/partition-" + strconv.FormatInt(clientId, 10)
}
func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult, clientId int64) msgPb.Status {
// PublishSearchResult forwards a query result to the message client and
// always reports SUCCESS.
// NOTE(review): diff view — the resultTopic/Send/Println lines below are the
// removed (pre-commit) implementation; SendResult is the replacement. With
// per-client topics gone, confirm consumers demultiplex results by ClientId.
func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult) msgPb.Status {
	var ctx = context.Background()
	var resultTopic = getResultTopicByClientId(clientId)
	node.messageClient.Send(ctx, *results)
	fmt.Println(resultTopic)
	node.messageClient.SendResult(ctx, *results)
	return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
@ -38,14 +31,14 @@ func (node *QueryNode) PublishFailedSearchResult() msgPb.Status {
var ctx = context.Background()
node.messageClient.Send(ctx, results)
node.messageClient.SendResult(ctx, results)
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) PublicStatistic(statisticTopic string) msgPb.Status {
// TODO: get statistic info
// getStatisticInfo()
// var info = getStatisticInfo()
// TODO: Pulsar publish
// PublicStatistic publishes the collected per-segment statistics through the
// message client's SegmentsStatistic producer and always reports SUCCESS.
// NOTE(review): the name reads like a typo for "PublishStatistic" — kept as-is
// for interface compatibility; consider renaming in a follow-up.
func (node *QueryNode) PublicStatistic(statisticData *[]masterPb.SegmentStat) msgPb.Status {
	var ctx = context.Background()
	node.messageClient.SendSegmentsStatistic(ctx, statisticData)
	return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}

View File

@ -1,6 +1,7 @@
package reader
import (
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"testing"
)
@ -14,18 +15,54 @@ func TestResult_PublishSearchResult(t *testing.T) {
node.SegmentsMap[0] = segment
// TODO: start pulsar server
// TODO: fix result PublishSearchResult
const N = 10
var entityIDs = msgPb.Entities {
Ids: make([]int64, N),
}
var results = msgPb.QueryResult {
var result = msgPb.QueryResult {
Entities: &entityIDs,
Distances: make([]float32, N),
}
for i := 0; i < N; i++ {
results.Entities.Ids = append(results.Entities.Ids, int64(i))
results.Distances = append(results.Distances, float32(i))
result.Entities.Ids = append(result.Entities.Ids, int64(i))
result.Distances = append(result.Distances, float32(i))
}
node.PublishSearchResult(&results, 0)
node.PublishSearchResult(&result)
}
// TestResult_PublishFailedSearchResult exercises the failed-search publish path.
func TestResult_PublishFailedSearchResult(t *testing.T) {
	// Build a minimal node -> collection -> partition -> segment fixture.
	node := NewQueryNode(0, 0)
	segment := node.NewCollection("collection0", "fake schema").
		NewPartition("partition0").
		NewSegment(0)
	node.SegmentsMap[0] = segment

	// TODO: start pulsar server
	node.PublishFailedSearchResult()
}
// TestResult_PublicStatistic exercises publishing per-segment statistics.
func TestResult_PublicStatistic(t *testing.T) {
	// Build a minimal node -> collection -> partition -> segment fixture.
	node := NewQueryNode(0, 0)
	segment := node.NewCollection("collection0", "fake schema").
		NewPartition("partition0").
		NewSegment(0)
	node.SegmentsMap[0] = segment

	// Collect one SegmentStat per registered segment (zero growth rate).
	statisticData := make([]masterPb.SegmentStat, 0, len(node.SegmentsMap))
	for id, seg := range node.SegmentsMap {
		statisticData = append(statisticData, masterPb.SegmentStat{
			SegmentId:  uint64(id),
			MemorySize: seg.GetMemSize(),
			MemoryRate: float32(0),
		})
	}

	// TODO: start pulsar server
	node.PublicStatistic(&statisticData)
}

View File

@ -31,6 +31,7 @@ type Segment struct {
SegmentPtr C.CSegmentBase
SegmentId int64
SegmentCloseTime uint64
LastMemSize uint64
}
func (s *Segment) GetStatus() int {
@ -76,6 +77,10 @@ func (s *Segment) Close() error {
return nil
}
// GetMemSize reports the segment's memory footprint in bytes.
// NOTE(review): stub — returns a hard-coded 100000 until real memory
// accounting (presumably via the C segment API) is wired in; callers such as
// SegmentStatistic will therefore see a constant size.
func (s *Segment) GetMemSize() uint64 {
	return 100000
}
////////////////////////////////////////////////////////////////////////////
func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
/*C.PreInsert
@ -113,7 +118,7 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
var numOfRow = len(*entityIDs)
var sizeofPerRow = len((*records)[0])
var rawData = make([]byte, numOfRow * sizeofPerRow)
var rawData = make([]byte, numOfRow*sizeofPerRow)
for i := 0; i < len(*records); i++ {
copy(rawData, (*records)[i])
}
@ -126,13 +131,13 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
var status = C.Insert(s.SegmentPtr,
cOffset,
cNumOfRows,
cEntityIdsPtr,
cTimestampsPtr,
cOffset,
cNumOfRows,
cEntityIdsPtr,
cTimestampsPtr,
cRawDataVoidPtr,
cSizeofPerRow,
cNumOfRows)
cSizeofPerRow,
cNumOfRows)
if status != 0 {
return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))

View File

@ -1,40 +0,0 @@
package reader
import (
"fmt"
"time"
)
// SegmentsManagement rolls over segments whose close time has passed: it
// opens a replacement segment and moves the expired one to ClosedSegments.
// NOTE(review): this is the pre-commit copy — the file is deleted in this
// commit and the logic moves to reader/segment_service.go.
// NOTE(review): oldSegment is never removed from partition.OpenedSegments,
// so an expired segment is re-closed and re-appended on every later pass.
func (node *QueryNode) SegmentsManagement() {
	node.queryNodeTimeSync.UpdateTSOTimeSync()
	var timeNow = node.queryNodeTimeSync.TSOTimeSync
	for _, collection := range node.Collections {
		for _, partition := range collection.Partitions {
			for _, oldSegment := range partition.OpenedSegments {
				// TODO: check segment status
				if timeNow >= oldSegment.SegmentCloseTime {
					// start new segment and add it into partition.OpenedSegments
					// TODO: get segmentID from master
					var segmentID int64 = 0
					var newSegment = partition.NewSegment(segmentID)
					newSegment.SegmentCloseTime = timeNow + SegmentLifetime
					partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
					node.SegmentsMap[segmentID] = newSegment
					// close old segment and move it into partition.ClosedSegments
					// TODO: check status
					var _ = oldSegment.Close()
					partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
				}
			}
		}
	}
}
// SegmentService loops forever, running SegmentsManagement every 200ms.
// NOTE(review): pre-commit copy — renamed to SegmentManagementService in the
// new reader/segment_service.go.
func (node *QueryNode) SegmentService() {
	for {
		time.Sleep(200 * time.Millisecond)
		node.SegmentsManagement()
		fmt.Println("do segments management in 200ms")
	}
}

View File

@ -1,29 +0,0 @@
package reader
import (
"testing"
)
// TestSegmentManagement_SegmentsManagement smoke-tests one management pass.
// NOTE(review): pre-commit copy — this test file is deleted in this commit
// and recreated alongside reader/segment_service.go.
func TestSegmentManagement_SegmentsManagement(t *testing.T) {
	// Construct node, collection, partition and segment
	node := NewQueryNode(0, 0)
	var collection = node.NewCollection("collection0", "fake schema")
	var partition = collection.NewPartition("partition0")
	var segment = partition.NewSegment(0)
	node.SegmentsMap[0] = segment
	// TODO: fix segment management
	node.SegmentsManagement()
}
// TestSegmentManagement_SegmentService smoke-tests the management loop.
// NOTE(review): pre-commit copy of a deleted file. SegmentService never
// returns (infinite loop), so this test hangs until the framework timeout.
func TestSegmentManagement_SegmentService(t *testing.T) {
	// Construct node, collection, partition and segment
	node := NewQueryNode(0, 0)
	var collection = node.NewCollection("collection0", "fake schema")
	var partition = collection.NewPartition("partition0")
	var segment = partition.NewSegment(0)
	node.SegmentsMap[0] = segment
	// TODO: fix segment service
	node.SegmentService()
}

75
reader/segment_service.go Normal file
View File

@ -0,0 +1,75 @@
package reader
import (
"fmt"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"log"
"strconv"
"time"
)
// SegmentsManagement rolls over segments whose close time has passed: for
// each expired segment it opens a replacement, registers it in SegmentsMap,
// closes the old segment and moves it to partition.ClosedSegments.
//
// Fix: previously a closed segment was never removed from
// partition.OpenedSegments, so every subsequent pass re-closed it, appended
// it to ClosedSegments again, and spawned yet another new segment. The open
// list is now rebuilt so only genuinely open segments remain.
func (node *QueryNode) SegmentsManagement() {
	node.queryNodeTimeSync.UpdateTSOTimeSync()
	var timeNow = node.queryNodeTimeSync.TSOTimeSync
	for _, collection := range node.Collections {
		for _, partition := range collection.Partitions {
			// Fresh backing array (cap 0) so we never alias the slice we range over.
			var stillOpen = partition.OpenedSegments[:0:0]
			for _, oldSegment := range partition.OpenedSegments {
				// TODO: check segment status
				if timeNow < oldSegment.SegmentCloseTime {
					// Not expired yet — keep it open.
					stillOpen = append(stillOpen, oldSegment)
					continue
				}
				// start new segment and add it into partition.OpenedSegments
				// TODO: get segmentID from master
				var segmentID int64 = 0
				var newSegment = partition.NewSegment(segmentID)
				newSegment.SegmentCloseTime = timeNow + SegmentLifetime
				stillOpen = append(stillOpen, newSegment)
				node.SegmentsMap[segmentID] = newSegment
				// close old segment and move it into partition.ClosedSegments
				// TODO: check status
				var _ = oldSegment.Close()
				partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
			}
			partition.OpenedSegments = stillOpen
		}
	}
}
// SegmentManagementService runs SegmentsManagement on a fixed cadence, forever.
func (node *QueryNode) SegmentManagementService() {
	const sleepMillisecondTime = 200
	for {
		time.Sleep(sleepMillisecondTime * time.Millisecond)
		node.SegmentsManagement()
		fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
	}
}
// SegmentStatistic gathers a SegmentStat for every segment registered in
// node.SegmentsMap and publishes the batch via PublicStatistic.
//
// sleepMillisecondTime is the sampling interval in milliseconds; the memory
// delta over that interval is converted to a rate in bytes per second.
//
// Fix: segment.LastMemSize was never updated, so every sample computed its
// rate against the segment's initial size rather than the previous sample —
// the reported rate was a cumulative average, not a per-interval rate.
func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
	var statisticData = make([]masterPb.SegmentStat, 0, len(node.SegmentsMap))

	for segmentID, segment := range node.SegmentsMap {
		currentMemSize := segment.GetMemSize()
		// Growth over the last interval, scaled to bytes/second.
		// NOTE(review): uint64 subtraction underflows if memory shrinks —
		// confirm GetMemSize is monotonic or guard before subtracting.
		memIncreaseRate := float32(currentMemSize-segment.LastMemSize) / (float32(sleepMillisecondTime) / 1000)
		// Record this sample so the next interval measures against it.
		segment.LastMemSize = currentMemSize

		stat := masterPb.SegmentStat{
			// TODO: set master pb's segment id type from uint64 to int64
			SegmentId:  uint64(segmentID),
			MemorySize: currentMemSize,
			MemoryRate: memIncreaseRate,
		}
		statisticData = append(statisticData, stat)
	}

	var status = node.PublicStatistic(&statisticData)
	if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
		log.Printf("Publish segments statistic failed")
	}
}
// SegmentStatisticService samples and publishes segment statistics once per
// second, forever.
func (node *QueryNode) SegmentStatisticService() {
	const sleepMillisecondTime = 1000
	for {
		time.Sleep(sleepMillisecondTime * time.Millisecond)
		node.SegmentStatistic(sleepMillisecondTime)
		fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
	}
}

View File

@ -0,0 +1,53 @@
package reader
import (
"testing"
)
// TestSegmentManagement_SegmentsManagement smoke-tests one management pass.
func TestSegmentManagement_SegmentsManagement(t *testing.T) {
	// Build a minimal node -> collection -> partition -> segment fixture.
	node := NewQueryNode(0, 0)
	segment := node.NewCollection("collection0", "fake schema").
		NewPartition("partition0").
		NewSegment(0)
	node.SegmentsMap[0] = segment

	// TODO: fix segment management
	node.SegmentsManagement()
}
// TestSegmentManagement_SegmentService smoke-tests the management loop.
// NOTE(review): SegmentManagementService never returns (infinite for-loop),
// so this test hangs until the test framework's timeout — consider running
// it in a goroutine with a deadline, or testing a single iteration instead.
func TestSegmentManagement_SegmentService(t *testing.T) {
	// Construct node, collection, partition and segment
	node := NewQueryNode(0, 0)
	var collection = node.NewCollection("collection0", "fake schema")
	var partition = collection.NewPartition("partition0")
	var segment = partition.NewSegment(0)
	node.SegmentsMap[0] = segment
	// TODO: fix segment service
	node.SegmentManagementService()
}
// TestSegmentManagement_SegmentStatistic smoke-tests one statistics sample.
func TestSegmentManagement_SegmentStatistic(t *testing.T) {
	// Build a minimal node -> collection -> partition -> segment fixture.
	node := NewQueryNode(0, 0)
	segment := node.NewCollection("collection0", "fake schema").
		NewPartition("partition0").
		NewSegment(0)
	node.SegmentsMap[0] = segment

	// TODO: start pulsar server
	node.SegmentStatistic(1000)
}
// TestSegmentManagement_SegmentStatisticService smoke-tests the statistic loop.
// NOTE(review): SegmentStatisticService never returns (infinite for-loop),
// so this test hangs until the test framework's timeout — consider testing a
// single SegmentStatistic call (as the test above does) instead.
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
	// Construct node, collection, partition and segment
	node := NewQueryNode(0, 0)
	var collection = node.NewCollection("collection0", "fake schema")
	var partition = collection.NewPartition("partition0")
	var segment = partition.NewSegment(0)
	node.SegmentsMap[0] = segment
	// TODO: start pulsar server
	node.SegmentStatisticService()
}