diff --git a/cmd/tools/datameta/main.go b/cmd/tools/datameta/main.go index cb8d5fe6e4..0e684d1479 100644 --- a/cmd/tools/datameta/main.go +++ b/cmd/tools/datameta/main.go @@ -83,7 +83,7 @@ func printSegmentInfo(info *datapb.SegmentInfo) { } if info.DmlPosition != nil { dmlTime, _ := tsoutil.ParseTS(info.DmlPosition.Timestamp) - fmt.Printf("Dml Position ID: %v, time: %s\n", info.StartPosition.MsgID, dmlTime.Format(tsPrintFormat)) + fmt.Printf("Dml Position ID: %v, time: %s\n", info.GetStartPosition().GetMsgID(), dmlTime.Format(tsPrintFormat)) } else { fmt.Println("Dml Position: nil") } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index d87edb0ca0..bf7bab2d1b 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -301,6 +301,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath zap.Int64("segmentID", req.GetSegmentID()), zap.Bool("isFlush", req.GetFlushed()), zap.Bool("isDropped", req.GetDropped()), + zap.Any("startPositions", req.GetStartPositions()), zap.Any("checkpoints", req.GetCheckPoints())) // validate diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 06e6168508..f99851cf14 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -403,9 +403,14 @@ func (t *compactionTask) compact() error { return err } - cpaths.deltaInfo.DeltaLogSize = deltaBuf.size - cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom - cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo + var deltaLogs []*datapb.DeltaLogInfo + if len(cpaths.deltaInfo.GetDeltaLogPath()) > 0 { + cpaths.deltaInfo.DeltaLogSize = deltaBuf.size + cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom + cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo + + deltaLogs = append(deltaLogs, cpaths.deltaInfo) + } pack := &datapb.CompactionResult{ PlanID: t.plan.GetPlanID(), @@ -413,8 +418,7 @@ func (t *compactionTask) compact() error { InsertLogs: cpaths.inPaths, Field2StatslogPaths: cpaths.statsPaths, NumOfRows: numRows, - - Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo}, + Deltalogs: deltaLogs, } status, err := t.dc.CompleteCompaction(ctxTimeout, pack) @@ -446,7 +450,11 @@ func (t *compactionTask) compact() error { } ti.injectOver <- true - log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID())) + log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()), + zap.Any("num of binlog paths", len(cpaths.inPaths)), + zap.Any("num of stats paths", len(cpaths.statsPaths)), + zap.Any("deltalog paths", cpaths.deltaInfo.GetDeltaLogPath()), + ) return nil } diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 0765d70c7d..42ee4b5bb2 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -454,9 +454,12 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet Position: pack.pos, }) + startPos := dsService.replica.listNewSegmentsStartPositions() + log.Debug("SaveBinlogPath", zap.Int64("SegmentID", pack.segmentID), zap.Int64("CollectionID", dsService.collectionID), + zap.Any("startPos", startPos), zap.Int("Length of Field2BinlogPaths", len(fieldInsert)), zap.Int("Length of Field2Stats", len(fieldStats)), zap.Int("Length of Field2Deltalogs", len(deltaInfos)), @@ -477,7 +480,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet CheckPoints: checkPoints, - StartPositions: dsService.replica.listNewSegmentsStartPositions(), + StartPositions: startPos, Flushed: pack.flushed, Dropped: pack.dropped, }