diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 0b1fe238fe..8784c912d6 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -130,7 +130,7 @@ func (t *compactionTask) getChannelName() string { } func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) { - + mergeStart := time.Now() dCodec := storage.NewDeleteCodec() var ( @@ -176,12 +176,19 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT dbuff.updateSize(dbuff.delData.RowCount) log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()), - zap.Int("number of pks to compact in insert logs", len(pk2ts))) + zap.Int("number of pks to compact in insert logs", len(pk2ts)), + zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart)))) return pk2ts, dbuff, nil } +// nano2Milli transfers nanoseconds to milliseconds in unit +func nano2Milli(nano time.Duration) float64 { + return float64(nano) / float64(time.Millisecond) +} + func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) { + mergeStart := time.Now() var ( dim int // dimension of vector field @@ -284,11 +291,14 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, } - log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), zap.Int64("expired entities", expired)) + log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), + zap.Int64("expired entities", expired), + zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart)))) return iDatas, numRows, nil } func (t *compactionTask) compact() error { + compactStart := time.Now() if ok := funcutil.CheckCtxValid(t.ctx); !ok { log.Error("compact wrong, task context done or timeout") return errContext @@ -333,6 +343,7 @@ func (t *compactionTask) compact() error { } // Inject to stop flush + injectStart := time.Now() ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) { pack.segmentID = targetSegID }) @@ -340,6 +351,7 @@ func (t *compactionTask) compact() error { t.injectFlush(ti, segIDs...) <-ti.Injected() + injectEnd := time.Now() var ( iItr = make([]iterator, 0) @@ -360,6 +372,7 @@ func (t *compactionTask) compact() error { } } + downloadStart := time.Now() g, gCtx := errgroup.WithContext(ctxTimeout) for _, s := range t.plan.GetSegmentBinlogs() { @@ -418,6 +431,7 @@ func (t *compactionTask) compact() error { log.Error("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) return err } + downloadEnd := time.Now() mergeItr := storage.NewMergeIterator(iItr) @@ -432,11 +446,13 @@ func (t *compactionTask) compact() error { return err } + uploadStart := time.Now() cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta) if err != nil { log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) return err } + uploadEnd := time.Now() for _, fbl := range cpaths.deltaInfo { for _, deltaLogInfo := range fbl.GetBinlogs() { @@ -456,6 +472,7 @@ func (t *compactionTask) compact() error { NumOfRows: numRows, } + rpcStart := time.Now() status, err := t.dc.CompleteCompaction(ctxTimeout, pack) if err != nil { log.Error("complete compaction rpc wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) @@ -465,6 +482,7 @@ func (t *compactionTask) compact() error { log.Error("complete compaction wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.String("reason", status.GetReason())) return fmt.Errorf("complete comapction wrong: %s", status.GetReason()) } + rpcEnd := time.Now() // Compaction I: update pk range. // Compaction II: remove the segments and add a new flushed segment with pk range. @@ -476,10 +494,17 @@ func (t *compactionTask) compact() error { } ti.injectDone(true) - log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()), + log.Info("compaction done", + zap.Int64("planID", t.plan.GetPlanID()), zap.Int("num of binlog paths", len(cpaths.inPaths)), zap.Int("num of stats paths", len(cpaths.statsPaths)), zap.Int("num of delta paths", len(cpaths.deltaInfo)), + zap.Any("inject elapse in ms", nano2Milli(injectEnd.Sub(injectStart))), + zap.Any("download IO elapse in ms", nano2Milli(downloadEnd.Sub(downloadStart))), + zap.Any("upload IO elapse in ms", nano2Milli(uploadEnd.Sub(uploadStart))), + zap.Any("complete compaction rpc elapse in ms", nano2Milli(rpcEnd.Sub(rpcStart))), + zap.Any("injectDone elapse in ms", nano2Milli(time.Since(rpcEnd))), + zap.Any("total elapse in ms", nano2Milli(time.Since(compactStart))), ) return nil }