From 1b1bafaff1c6c3a7fbea420eb62cebb90e06cfec Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 30 Aug 2023 14:23:06 +0800 Subject: [PATCH] Bulkinsert read varchar by batch (#26199) Signed-off-by: yhmo --- internal/util/importutil/numpy_adapter.go | 67 +++++++++++++++++------ 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/internal/util/importutil/numpy_adapter.go b/internal/util/importutil/numpy_adapter.go index b5a89086eb..17c6832717 100644 --- a/internal/util/importutil/numpy_adapter.go +++ b/internal/util/importutil/numpy_adapter.go @@ -561,9 +561,26 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) { return nil, errors.New("numpy reader is nil") } + // read string one by one is not efficient, here we read strings batch by batch, each batch size is no more than 16MB + batchRead := 1 // rows of each batch, make sure this value is equal to or greater than 1 + if utf { + batchRead += SingleBlockSize / (utf8.UTFMax * maxLen) + } else { + batchRead += SingleBlockSize / maxLen + } + + log.Info("Numpy adapter: prepare to read varchar batch by batch", + zap.Int("readSize", readSize), zap.Int("batchRead", batchRead)) + // read data data := make([]string, 0) - for i := 0; i < readSize; i++ { + for { + // the last batch + readDone := len(data) + if readDone+batchRead > readSize { + batchRead = readSize - readDone + } + if utf { // in the numpy file with utf32 encoding, the dType could be like " 0 { - buf = buf[:n] - } - data = append(data, string(buf)) + // read string one by one from the buffer + for j := 0; j < batchRead; j++ { + oneBuf := buf[j*maxLen : (j+1)*maxLen] + n := bytes.Index(oneBuf, []byte{0}) + if n > 0 { + oneBuf = oneBuf[:n] + } + + data = append(data, string(oneBuf)) + } + } + + // quit the loop if specified size is read + if len(data) >= readSize { + break } } + log.Info("Numpy adapter: a block of varchar has been read", zap.Int("rowCount", len(data))) + // update read position after a successful read n.readPosition += readSize