From ed4e516ce755c4d1ef5166c0c6be089727b0aa3a Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 19 Oct 2021 14:34:35 +0800 Subject: [PATCH] Save stats and delta logs from DataNode (#10159) Signed-off-by: Congqi Xia --- internal/datanode/data_sync_service.go | 15 +++++++++++---- internal/datanode/flow_graph_delete_node.go | 9 +++++---- internal/datanode/flush_manager.go | 1 + internal/datanode/flush_task.go | 6 +++++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 98dbcf9ca8..670aaa148c 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -159,6 +159,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack *segmentFlushPack) error { fieldInsert := []*datapb.FieldBinlog{} fieldStats := []*datapb.FieldBinlog{} + deltaInfos := make([]*datapb.DeltaLogInfo, len(pack.deltaLogs)) checkPoints := []*datapb.CheckPoint{} for k, v := range pack.insertLogs { fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}}) @@ -166,6 +167,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro for k, v := range pack.statsLogs { fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}}) } + for _, delData := range pack.deltaLogs { + deltaInfos = append(deltaInfos, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogSize: delData.fileSize}) + } + // only current segment checkpoint info, updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID) checkPoints = append(checkPoints, &datapb.CheckPoint{ @@ -187,10 +192,12 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro Timestamp: 0, //TODO time stamp SourceID: Params.NodeID, }, - SegmentID: pack.segmentID, - CollectionID: dsService.collectionID, - Field2BinlogPaths: fieldInsert, - //TODO WIP add statslog and deltalog + SegmentID: pack.segmentID, + CollectionID: dsService.collectionID, + Field2BinlogPaths: fieldInsert, + Field2StatslogPaths: fieldStats, + Deltalogs: deltaInfos, + CheckPoints: checkPoints, StartPositions: dsService.replica.listNewSegmentsStartPositions(), diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 9b17cebbb7..1ec7252b8e 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -50,10 +50,11 @@ type deleteNode struct { // DelDataBuf buffers insert data, monitoring buffer size and limit // size and limit both indicate numOfRows type DelDataBuf struct { - delData *DeleteData - size int64 - tsFrom Timestamp - tsTo Timestamp + delData *DeleteData + size int64 + tsFrom Timestamp + tsTo Timestamp + fileSize int64 } func (ddb *DelDataBuf) updateSize(size int64) { diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 9e363fa150..284fb47933 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -244,6 +244,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique blobKey, _ := m.genKey(false, collID, partID, segmentID, logID) blobPath := path.Join(Params.DeleteBinlogRootPath, blobKey) kvs := map[string]string{blobPath: string(blob.Value[:])} + data.fileSize = int64(len(blob.Value)) log.Debug("delete blob path", zap.String("path", blobPath)) m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{ diff --git a/internal/datanode/flush_task.go b/internal/datanode/flush_task.go index ba38cce56b..388bc813b5 100644 --- a/internal/datanode/flush_task.go +++ b/internal/datanode/flush_task.go @@ -88,7 +88,11 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslog // runFlushDel execute flush delete task with once and retry func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf) { t.deleteOnce.Do(func() { - t.deltaLogs = []*DelDataBuf{deltaLogs} + if deltaLogs == nil { + t.deltaLogs = []*DelDataBuf{} + } else { + t.deltaLogs = []*DelDataBuf{deltaLogs} + } go func() { err := errStart for err != nil {