diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 6bb89fd102..17f262c1c0 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -448,7 +448,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb } func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition) error { - log.Debug("from dmlcp load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID)) + log.Debug("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID)) stream, err := loader.factory.NewMsgStream(ctx) if err != nil { return err @@ -472,7 +472,6 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection if tsMsg == nil { break } - log.Debug("receive msg", zap.Any("type", tsMsg.Type())) if tsMsg.Type() == commonpb.MsgType_Delete { dmsg := tsMsg.(*msgstream.DeleteMsg) @@ -499,6 +498,8 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection go deletePk(loader.historicalReplica, delData, segmentID, &wg) } wg.Wait() + stream.Close() + log.Debug("from dml check point load done") return nil }