diff --git a/configs/advanced/data_node.yaml b/configs/advanced/data_node.yaml index f8596a88b3..8e0e6111ad 100644 --- a/configs/advanced/data_node.yaml +++ b/configs/advanced/data_node.yaml @@ -18,6 +18,4 @@ dataNode: flush: # max buffer size to flush - insertBufSize: 32000 # GOOSE TODO: to delete - - # insertBufSize: 16 # MB GOOSE TODO: to enable + insertBufSize: 16777216 # Bytes, 16 MB diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 2a535aa1e5..8eabf06506 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -51,10 +51,9 @@ type ( type insertBufferNode struct { BaseNode channelName string - insertBuffer *insertBuffer - // insertBuffer map[UniqueID]*BufferData // SegmentID to BufferData - replica Replica - idAllocator allocatorInterface + insertBuffer sync.Map // SegmentID to BufferData + replica Replica + idAllocator allocatorInterface flushMap sync.Map flushChan <-chan *flushMsg @@ -83,71 +82,44 @@ type segmentFlushUnit struct { } // BufferData buffers insert data, monitoring buffer size and limit +// size and limit both indicate numOfRows type BufferData struct { buffer *InsertData size int64 - limit int64 // Num of rows + limit int64 } +// newBufferData needs an input dimension to calculate the limit of this buffer +// +// `limit` is the segment numOfRows a buffer can buffer at most. +// +// For a float32 vector field: +// limit = 16 * 2^20 Byte [By default] / (dimension * 4 Byte) +// +// For a binary vector field: +// limit = 16 * 2^20 Byte [By default]/ (dimension / 8 Byte) +// +// But since the buffer of binary vector fields is larger than the float32 one +// with the same dimension, newBufferData takes the smaller buffer limit +// to fit in both types of vector fields +// +// * This need to change for string field support and multi-vector fields support. func newBufferData(dimension int64) (*BufferData, error) { if dimension == 0 { return nil, errors.New("Invalid dimension") } - limit := Params.FlushInsertBufferSize * (1 << 18) / dimension + limit := Params.FlushInsertBufferSize / (dimension * 4) - return &BufferData{&InsertData{}, 0, limit}, nil + return &BufferData{&InsertData{Data: make(map[UniqueID]storage.FieldData)}, 0, limit}, nil } -type insertBuffer struct { - insertData map[UniqueID]*InsertData // SegmentID to InsertData - maxSize int64 +func (bd *BufferData) effectiveCap() int64 { + return bd.limit - bd.size } -func (ib *insertBuffer) size(segmentID UniqueID) int64 { - if ib.insertData == nil || len(ib.insertData) <= 0 { - return 0 - } - idata, ok := ib.insertData[segmentID] - if !ok { - return 0 - } - - var maxSize int64 = 0 - for _, data := range idata.Data { - fdata, ok := data.(*storage.FloatVectorFieldData) - if ok { - totalNumRows := int64(0) - if fdata.NumRows != nil { - for _, numRow := range fdata.NumRows { - totalNumRows += numRow - } - } - if totalNumRows > maxSize { - maxSize = totalNumRows - } - } - - bdata, ok := data.(*storage.BinaryVectorFieldData) - if ok { - totalNumRows := int64(0) - if bdata.NumRows != nil { - for _, numRow := range bdata.NumRows { - totalNumRows += numRow - } - } - if totalNumRows > maxSize { - maxSize = totalNumRows - } - } - - } - return maxSize -} - -func (ib *insertBuffer) full(segmentID UniqueID) bool { - log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int64("size", ib.size(segmentID)), zap.Int64("maxsize", ib.maxSize)) - return ib.size(segmentID) >= ib.maxSize +func (bd *BufferData) updateSize(no int64) { + bd.size += no } func (ibNode *insertBufferNode) Name() string { @@ -222,18 +194,26 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } } - // TODO GOOSE: log updated segments' states - if len(fgMsg.insertMessages) > 0 { - log.Debug("---insert buffer status---") - var stopSign int = 0 - for k := range ibNode.insertBuffer.insertData { - if stopSign >= 10 { - log.Debug("......") - break - } - log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int64("buffer size", ibNode.insertBuffer.size(k))) - stopSign++ + // Find and return the smaller input + min := func(former, latter int) (smaller int) { + if former <= latter { + return former } + return latter + } + + displaySize := min(10, len(seg2Upload)) + + for k, segID := range seg2Upload[:displaySize] { + bd, ok := ibNode.insertBuffer.Load(segID) + if !ok { + continue + } + + log.Debug("insert seg buffer status", zap.Int("No.", k), + zap.Int64("segmentID", segID), + zap.Int64("buffer size", bd.(*BufferData).size), + zap.Int64("buffer limit", bd.(*BufferData).limit)) } // Auto Flush @@ -241,9 +221,14 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { finishCnt := sync.WaitGroup{} for _, segToFlush := range seg2Upload { // If full, auto flush - if ibNode.insertBuffer.full(segToFlush) { - log.Debug(". Insert Buffer full, auto flushing ", - zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush))) + if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 { + + // Move data from insertBuffer to flushBuffer + ibuffer := bd.(*BufferData) + ibNode.flushMap.Store(segToFlush, ibuffer.buffer) + ibNode.insertBuffer.Delete(segToFlush) + + log.Debug(". Insert Buffer full, auto flushing ", zap.Int64("num of rows", ibuffer.size)) collMeta, err := ibNode.getCollMetabySegID(segToFlush, fgMsg.timeRange.timestampMax) if err != nil { @@ -251,9 +236,6 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { continue } - ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush]) - delete(ibNode.insertBuffer.insertData, segToFlush) - collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush) if err != nil { log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err)) @@ -288,7 +270,12 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { zap.Int64("collectionID", fmsg.collectionID), ) - if ibNode.insertBuffer.size(currentSegID) <= 0 { + bd, ok := ibNode.insertBuffer.Load(currentSegID) + if !ok { + break + } + + if bd.(*BufferData).size <= 0 { // Buffer empty log.Debug(".. Buffer empty ...") ibNode.dsSaveBinlog(&segmentFlushUnit{ collID: fmsg.collectionID, @@ -298,12 +285,14 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { flushed: true, }) ibNode.replica.segmentFlushed(currentSegID) - } else { //insertBuffer(not empty) -> binLogs -> minIO/S3 + } else { // Buffer not empty log.Debug(".. Buffer not empty, flushing ..") finishCh := make(chan segmentFlushUnit, 1) - ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID]) - delete(ibNode.insertBuffer.insertData, currentSegID) + // Since buffer is not empty, so there must be data for key currentSegID + bd, _ := ibNode.insertBuffer.LoadAndDelete(currentSegID) + + ibNode.flushMap.Store(currentSegID, bd.(*BufferData).buffer) clearFn := func() { finishCh <- segmentFlushUnit{field2Path: nil} log.Debug(".. Clearing flush Buffer ..") @@ -405,20 +394,42 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos currentSegID := msg.GetSegmentID() collectionID := msg.GetCollectionID() - idata, ok := ibNode.insertBuffer.insertData[currentSegID] - if !ok { - idata = &InsertData{ - Data: make(map[UniqueID]storage.FieldData), - } - } - - // 1.1 Get Collection Schema collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs()) if err != nil { log.Error("Get schema wrong:", zap.Error(err)) return err } + // Get Dimension + // TODO GOOSE: under assumption that there's only 1 Vector field in one collection schema + var dimension int + for _, field := range collSchema.Fields { + if field.DataType == schemapb.DataType_FloatVector || + field.DataType == schemapb.DataType_BinaryVector { + + for _, t := range field.TypeParams { + if t.Key == "dim" { + dimension, err = strconv.Atoi(t.Value) + if err != nil { + log.Error("strconv wrong on get dim", zap.Error(err)) + return err + } + break + } + } + break + } + } + + newbd, err := newBufferData(int64(dimension)) + if err != nil { + return err + } + bd, _ := ibNode.insertBuffer.LoadOrStore(currentSegID, newbd) + + buffer := bd.(*BufferData) + idata := buffer.buffer + // 1.2 Get Fields var pos int = 0 // Record position of blob var fieldIDs []int64 @@ -437,14 +448,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos dim, err = strconv.Atoi(t.Value) if err != nil { log.Error("strconv wrong on get dim", zap.Error(err)) + break } break } } - if dim <= 0 { - log.Error("invalid dim") - continue - } if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ @@ -475,15 +483,12 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos if t.Key == "dim" { dim, err = strconv.Atoi(t.Value) if err != nil { - log.Error("strconv wrong") + log.Error("strconv wrong on get dim", zap.Error(err)) + return err } break } } - if dim <= 0 { - log.Error("invalid dim") - continue - } if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ @@ -636,8 +641,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } } - // 1.3 store in buffer - ibNode.insertBuffer.insertData[currentSegID] = idata + // update buffer size + buffer.updateSize(int64(len(msg.RowData))) + + // store in buffer + ibNode.insertBuffer.Store(currentSegID, buffer) // store current endPositions as Segment->EndPostion ibNode.replica.updateSegmentEndPosition(currentSegID, endPos) @@ -864,12 +872,6 @@ func newInsertBufferNode( baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) - maxSize := Params.FlushInsertBufferSize - iBuffer := &insertBuffer{ - insertData: make(map[UniqueID]*InsertData), - maxSize: maxSize, - } - // MinIO option := &miniokv.Option{ Address: Params.MinioAddress, @@ -907,7 +909,7 @@ func newInsertBufferNode( return &insertBufferNode{ BaseNode: baseNode, - insertBuffer: iBuffer, + insertBuffer: sync.Map{}, minIOKV: minIOKV, channelName: channelName, diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index bf730614f9..6b85bb402e 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -396,7 +396,12 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { var iMsg flowgraph.Msg = &inMsg t.Run("Pure auto flush", func(t *testing.T) { - iBNode.insertBuffer.maxSize = 2 + // iBNode.insertBuffer.maxSize = 2 + tmp := Params.FlushInsertBufferSize + Params.FlushInsertBufferSize = 4 * 4 + defer func() { + Params.FlushInsertBufferSize = tmp + }() for i := range inMsg.insertMessages { inMsg.insertMessages[i].SegmentID = int64(i%2) + 1 @@ -472,10 +477,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { if i == 1 { assert.Equal(t, test.expectedSegID, flushUnit[0].segID) - assert.Equal(t, int64(0), iBNode.insertBuffer.size(UniqueID(i+1))) - } else { - assert.Equal(t, int64(1), iBNode.insertBuffer.size(UniqueID(i+1))) + // assert.Equal(t, int64(0), iBNode.insertBuffer.size(UniqueID(i+1))) } + // else { + // // assert.Equal(t, int64(1), iBNode.insertBuffer.size(UniqueID(i+1))) + // } } }) @@ -501,8 +507,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123)) assert.False(t, flushUnit[1].flushed) assert.Greater(t, len(flushUnit[1].field2Path), 0) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) - assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + // assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) + // assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) flushChan <- &flushMsg{ msgID: 3, @@ -528,8 +534,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, len(flushUnit[2].field2Path), 0) assert.NotNil(t, flushUnit[2].field2Path) assert.True(t, flushUnit[2].flushed) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) - assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + // assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) + // assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) flushChan <- &flushMsg{ msgID: 4, @@ -549,7 +555,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Greater(t, len(flushUnit[3].field2Path), 0) assert.NotNil(t, flushUnit[3].field2Path) assert.True(t, flushUnit[3].flushed) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 0) + // assert.Equal(t, len(iBNode.insertBuffer.insertData), 0) }) } @@ -723,7 +729,7 @@ func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) { } func TestInsertBufferNode_BufferData(te *testing.T) { - Params.FlushInsertBufferSize = 16 + Params.FlushInsertBufferSize = 16 * (1 << 20) // 16 MB tests := []struct { isValid bool @@ -749,6 +755,9 @@ func TestInsertBufferNode_BufferData(te *testing.T) { assert.Equal(t, test.expectedLimit, idata.limit) assert.Zero(t, idata.size) + + capacity := idata.effectiveCap() + assert.Equal(t, test.expectedLimit, capacity) } else { assert.Error(t, err) assert.Nil(t, idata)