diff --git a/conf/conf.go b/conf/conf.go index 47299c547f..8ac09d4642 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -68,6 +68,7 @@ type Writer struct { StopFlag int64 ReaderQueueSize int SearchByIdChanSize int + Parallelism int TopicStart int TopicEnd int } diff --git a/conf/config.yaml b/conf/config.yaml index f345b0903d..81046c4ee2 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -54,6 +54,7 @@ writer: stopflag: -2 readerqueuesize: 10000 searchbyidchansize: 10000 + parallelism: 100 topicstart: 0 topicend: 128 diff --git a/writer/main.go b/writer/main.go index 7a85ad5af5..b17b18e122 100644 --- a/writer/main.go +++ b/writer/main.go @@ -10,7 +10,6 @@ import ( "log" "os" "strconv" - "sync" "time" ) @@ -25,7 +24,6 @@ func main() { //TODO::close client / consumer/ producer mc.ReceiveMessage() - wg := sync.WaitGroup{} ctx := context.Background() kv, err := storage.NewStore(ctx, conf.Config.Storage.Driver) // TODO:: if err != nil, should retry link @@ -75,7 +73,7 @@ func main() { start = time.Now() } if msgLength > 0 { - wn.DoWriteNode(ctx, &wg) + wn.DoWriteNode(ctx) fmt.Println("write node do a batch message, storage len: ", msgLength) } // Test insert time @@ -107,7 +105,7 @@ func main() { } msgLength := wn.MessageClient.PrepareBatchMsg() if msgLength > 0 { - wn.DoWriteNode(ctx, &wg) + wn.DoWriteNode(ctx) fmt.Println("write node do a batch message, storage len: ", msgLength) } } diff --git a/writer/write_node/writer_node.go b/writer/write_node/writer_node.go index c01058431d..6458db8eb1 100644 --- a/writer/write_node/writer_node.go +++ b/writer/write_node/writer_node.go @@ -3,6 +3,7 @@ package write_node import ( "context" "fmt" + "github.com/czs007/suvlim/conf" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" storage "github.com/czs007/suvlim/storage/pkg" "github.com/czs007/suvlim/storage/pkg/types" @@ -87,7 +88,7 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr return nil } -func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg, wg *sync.WaitGroup) error { +func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg) error { var prefixKey string var prefixKeys [][]byte var timeStamps []uint64 @@ -120,10 +121,8 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOr err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps) if err != nil { fmt.Println("Can't delete data") - wg.Done() return err } - wg.Done() return nil } @@ -131,10 +130,27 @@ func (wn *WriteNode) UpdateTimeSync(timeSync uint64) { wn.TimeSync = timeSync } -func (wn *WriteNode) DoWriteNode(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(2) - go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg, wg) - go wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg, wg) +func (wn *WriteNode) DoWriteNode(ctx context.Context) { + numInsertData := len(wn.MessageClient.InsertMsg) + numGoRoute := conf.Config.Writer.Parallelism + batchSize := numInsertData / numGoRoute + if numInsertData % numGoRoute != 0 { + batchSize += 1 + } + start := 0 + end := 0 + wg := sync.WaitGroup{} + for end < numInsertData { + if end + batchSize >= numInsertData { + end = numInsertData + } else { + end = end + batchSize + } + wg.Add(1) + go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg[start:end], &wg) + start = end + } wg.Wait() + wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg) wn.UpdateTimeSync(wn.MessageClient.TimeSync()) }