From 032f89e5349f1149100b513b82a7bd046ddfec3f Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 16 Sep 2020 11:07:55 +0800 Subject: [PATCH] Add message client id and fix buffer out of range Signed-off-by: bigsheeper --- reader/message_client/message_client.go | 23 ++++++++++++++--------- reader/query_node.go | 22 +++++++--------------- reader/reader.go | 14 +++++++++++--- reader/reader_test.go | 6 +++++- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go index 37e1232d3f..8fa119cea5 100644 --- a/reader/message_client/message_client.go +++ b/reader/message_client/message_client.go @@ -15,7 +15,7 @@ type MessageClient struct { // timesync timeSyncCfg *timesync.ReaderTimeSyncCfg - //message channel + // message channel searchChan chan *msgpb.SearchMsg key2SegChan chan *msgpb.Key2SegMsg @@ -32,6 +32,9 @@ type MessageClient struct { timestampBatchStart uint64 timestampBatchEnd uint64 batchIDLen int + + // + MessageClientID int } func (mc *MessageClient) TimeSyncStart() uint64 { @@ -51,7 +54,7 @@ func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) { } } -func (mc *MessageClient) GetSearchChan() <- chan *msgpb.SearchMsg { +func (mc *MessageClient) GetSearchChan() <-chan *msgpb.SearchMsg { return mc.searchChan } @@ -106,7 +109,7 @@ func (mc *MessageClient) creatProducer(topicName string) pulsar.Producer { func (mc *MessageClient) createConsumer(topicName string) pulsar.Consumer { consumer, err := mc.client.Subscribe(pulsar.ConsumerOptions{ Topic: topicName, - SubscriptionName: "reader", + SubscriptionName: "reader" + strconv.Itoa(mc.MessageClientID), }) if err != nil { @@ -127,7 +130,9 @@ func (mc *MessageClient) createClient(url string) pulsar.Client { return client } -func (mc *MessageClient) InitClient(url string) { +func (mc *MessageClient) InitClient(url string, numOfQueryNode int) { + const ChannelNum = 1024 + //create client mc.client = mc.createClient(url) @@ -148,16 +153,16 @@ func (mc *MessageClient) InitClient(url string) { //init timesync URL := "pulsar://localhost:6650" timeSyncTopic := "TimeSync" - timeSyncSubName := "reader" - readTopics := make([]string, 0, 1024) - for i := 0; i < 1024; i++ { + timeSyncSubName := "reader" + strconv.Itoa(mc.MessageClientID) + readTopics := make([]string, 0, ChannelNum) + for i := ChannelNum / numOfQueryNode * mc.MessageClientID; i < ChannelNum/numOfQueryNode*(mc.MessageClientID+1); i++ { str := "InsertOrDelete-partition-" str = str + strconv.Itoa(i) readTopics = append(readTopics, str) } - readSubName := "reader" + readSubName := "reader" + strconv.Itoa(mc.MessageClientID) proxyIdList := []int64{0} - timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -2, timesync.WithReaderQueueSize(1024)) + timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -2, timesync.WithReaderQueueSize(ChannelNum)) if err != nil { log.Fatal(err) } diff --git a/reader/query_node.go b/reader/query_node.go index 6ea0f150e3..1199ff17d7 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -17,7 +17,6 @@ import ( "fmt" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" "github.com/czs007/suvlim/reader/message_client" - "github.com/stretchr/testify/assert" "sort" "sync" "sync/atomic" @@ -178,12 +177,6 @@ func (node *QueryNode) PrepareBatchMsg() []int { return msgLen } -func (node *QueryNode) StartMessageClient(pulsarURL string) { - // TODO: add consumerMsgSchema - node.messageClient.InitClient(pulsarURL) - go node.messageClient.ReceiveMessage() -} - func (node *QueryNode) InitQueryNodeCollection() { // TODO: remove hard code, add collection creation request // TODO: error handle @@ -274,22 +267,21 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } // 2. Remove invalid messages from buffer. - bufferLen := len(node.buffer.validInsertDeleteBuffer) - assert.Equal(nil, len(node.buffer.validInsertDeleteBuffer), len(node.buffer.InsertDeleteBuffer)) - for i:= 0; i < bufferLen - 2; i++ { - if !node.buffer.validInsertDeleteBuffer[i] { - copy(node.buffer.InsertDeleteBuffer[i:], node.buffer.InsertDeleteBuffer[i+1:]) // Shift a[i+1:] left one index. - node.buffer.InsertDeleteBuffer[len(node.buffer.InsertDeleteBuffer)-1] = nil // Erase last element (write zero value). - node.buffer.InsertDeleteBuffer = node.buffer.InsertDeleteBuffer[:len(node.buffer.InsertDeleteBuffer)-1] // Truncate slice. + tmpInsertOrDeleteBuffer := make([]*msgPb.InsertOrDeleteMsg ,0) + for i, isValid := range node.buffer.validInsertDeleteBuffer { + if isValid { + tmpInsertOrDeleteBuffer = append(tmpInsertOrDeleteBuffer, node.buffer.InsertDeleteBuffer[i]) } } + node.buffer.InsertDeleteBuffer = tmpInsertOrDeleteBuffer + // 3. Resize the valid bitmap and set all bits to true. node.buffer.validInsertDeleteBuffer = node.buffer.validInsertDeleteBuffer[:len(node.buffer.InsertDeleteBuffer)] for i := range node.buffer.validInsertDeleteBuffer { node.buffer.validInsertDeleteBuffer[i] = true } - // 3. Extract messages before readTimeSync from current messageClient. + // 4. Extract messages before readTimeSync from current messageClient. // Move massages after readTimeSync to QueryNodeDataBuffer. // Set valid bitmap to true. for _, msg := range insertDeleteMessages { diff --git a/reader/reader.go b/reader/reader.go index e5308987ad..ab0d905467 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -2,12 +2,20 @@ package reader import ( "github.com/czs007/suvlim/reader/message_client" + "log" "sync" ) -func StartQueryNode(pulsarURL string) { - mc := message_client.MessageClient{} - mc.InitClient(pulsarURL) +func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) { + if messageClientID >= numOfQueryNode { + log.Printf("Illegal channel id") + return + } + + mc := message_client.MessageClient{ + MessageClientID: messageClientID, + } + mc.InitClient(pulsarURL, numOfQueryNode) mc.ReceiveMessage() qn := CreateQueryNode(0, 0, &mc) diff --git a/reader/reader_test.go b/reader/reader_test.go index 7899f3437e..6f3758aeb6 100644 --- a/reader/reader_test.go +++ b/reader/reader_test.go @@ -7,5 +7,9 @@ import ( func TestReader_startQueryNode(t *testing.T) { pulsarURL := "pulsar://localhost:6650" - StartQueryNode(pulsarURL) + + numOfQueryNode := 2 + + go StartQueryNode(pulsarURL, numOfQueryNode, 0) + StartQueryNode(pulsarURL, numOfQueryNode, 1) }