diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ff30422ae9..147e59a3d6 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -161,6 +161,11 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { var alloc allocatorInterface = newAllocator(node.masterService) + log.Debug("Received Vchannel Info", + zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())), + zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())), + ) + flushChan := make(chan *flushMsg, 100) dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService) node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 0317f40be7..8d059db304 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -164,9 +164,20 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) { for _, us := range vchanInfo.GetUnflushedSegments() { if us.CollectionID != dsService.collectionID || us.GetInsertChannel() != vchanInfo.ChannelName { + log.Warn("Collection ID or ChannelName not compact", + zap.Int64("Wanted ID", dsService.collectionID), + zap.Int64("Actual ID", us.CollectionID), + zap.String("Wanted Channel Name", vchanInfo.ChannelName), + zap.String("Actual Channel Name", us.GetInsertChannel()), + ) continue } + log.Info("Recover Segment NumOfRows form checkpoints", + zap.String("InsertChannel", us.GetInsertChannel()), + zap.Int64("SegmentID", us.GetID()), + zap.Int64("NumOfRows", us.GetNumOfRows()), + ) dsService.replica.addSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel()) dsService.replica.updateStatistics(us.GetID(), us.GetNumOfRows()) } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 97320ea29d..28159f6b5a 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -86,9 +86,8 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { zap.Uint64("Message endts", msg.EndTs()), zap.Uint64("FilterThreshold", FilterThreshold), ) - resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) - if resMsg != nil { - iMsg.insertMessages = append(iMsg.insertMessages, resMsg) + if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) { + continue } } @@ -104,21 +103,21 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { return []Msg{res} } -func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg { +func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) bool { if ddn.isFlushed(msg.GetSegmentID()) { - return nil + return true } ddn.mu.Lock() if si, ok := ddn.seg2SegInfo[msg.GetSegmentID()]; ok { if msg.EndTs() > si.GetDmlPosition().GetTimestamp() { delete(ddn.seg2SegInfo, msg.GetSegmentID()) - return nil + return true } } ddn.mu.Unlock() - return msg + return false } func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 2a6b25406c..089634a232 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -152,7 +152,10 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { collID := msg.GetCollectionID() partitionID := msg.GetPartitionID() - log.Debug("InsertBufferNode Operating Segment", zap.Int64("ID", currentSegID)) + // log.Debug("InsertBufferNode Operating Segment", + // zap.Int64("ID", currentSegID), + // zap.Int("NumOfRows", len(msg.RowIDs)), + // ) if !ibNode.replica.hasSegment(currentSegID) { err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())