From d357139064d2ae8b7c8a8b5bcc6470daadd6a39e Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 10 Jan 2024 10:00:51 +0800 Subject: [PATCH] fix: the entities num metric may be contributed more than once (#29767) the growing segments contribute to this metric while inserting and putting into the manager, but the current impl inserts data before putting the segments into manager, which leads to double contributions fix: #29766 Signed-off-by: yah01 --- internal/querynodev2/delegator/delegator_data.go | 7 +++++++ internal/querynodev2/segments/manager.go | 15 --------------- internal/querynodev2/segments/segment.go | 16 +++++++++------- internal/querynodev2/segments/segment_loader.go | 8 ++++++++ 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index a6d1238230..4984d4b08c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -121,6 +121,13 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { // panic here, insert failure panic(err) } + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(growing.Collection()), + fmt.Sprint(growing.Partition()), + growing.Type().String(), + fmt.Sprint(0), + ).Add(float64(len(insertData.RowIDs))) growing.UpdateBloomFilter(insertData.PrimaryKeys) if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) { diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 03c77174ff..2fe95f9576 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -202,14 +202,6 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { fmt.Sprint(len(segment.Indexes())), segment.Level().String(), ).Inc() - - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Add(float64(segment.InsertCount())) } mgr.updateMetric() @@ -566,12 +558,5 @@ func remove(segment Segment) bool { segment.Level().String(), ).Dec() - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Sub(float64(segment.InsertCount())) return true } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index e66ee84195..def7229cd4 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -553,13 +553,6 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] s.insertCount.Add(int64(numOfRow)) s.rowNum.Store(-1) s.memSize.Store(-1) - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(s.collectionID), - fmt.Sprint(s.partitionID), - s.Type().String(), - fmt.Sprint(0), - ).Add(float64(numOfRow)) return nil } @@ -991,6 +984,15 @@ func (s *LocalSegment) Release() { } C.DeleteSegment(ptr) + + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(s.Collection()), + fmt.Sprint(s.Partition()), + s.Type().String(), + fmt.Sprint(len(s.Indexes())), + ).Sub(float64(s.InsertCount())) + log.Info("delete segment from memory", zap.Int64("collectionID", s.collectionID), zap.Int64("partitionID", s.partitionID), diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index ea8f2a214f..75c68f303d 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -618,6 +618,14 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } } + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + fmt.Sprint(segment.Indexes()), + ).Add(float64(loadInfo.GetNumOfRows())) + log.Info("loading delta...") return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs) }