diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index cae51b3562..ff0692604c 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -145,12 +145,14 @@ func (t *compactionTask) getPlanTargetEntryNumber() int64 { // prevent bloom filter too small if result == 0 { + log.Warn("compaction target entry number zero", zap.Int64("planID", t.getPlanID())) return int64(bloomFilterSize) } return result } func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) { + log := log.With(zap.Int64("planID", t.getPlanID())) mergeStart := time.Now() dCodec := storage.NewDeleteCodec() @@ -196,7 +198,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT } dbuff.updateSize(dbuff.delData.RowCount) - log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()), + log.Debug("mergeDeltalogs end", zap.Int("number of pks to compact in insert logs", len(pk2ts)), zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart)))) @@ -209,6 +211,7 @@ func nano2Milli(nano time.Duration) float64 { } func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, []byte, int64, error) { + log := log.With(zap.Int64("planID", t.getPlanID())) mergeStart := time.Now() var ( @@ -352,7 +355,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam return nil, nil, 0, err } - log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), + log.Debug("merge end", zap.Int64("remaining insert numRows", numRows), zap.Int64("expired entities", expired), zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart)))) return iDatas, segStats, numRows, nil