From e7e4561da83c8aeed0bbb6ec1894c44595f415d5 Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 11 Jan 2024 10:24:51 +0800 Subject: [PATCH] fix: the entities num metric may be contributed more than once (#29767) (#29825) the growing segments contribute to this metric while inserting and putting into the manager, but the current impl inserts data before putting the segments into manager, which leads to double contributions fix: #29766 pr: #29767 Signed-off-by: yah01 --- .../querynodev2/delegator/delegator_data.go | 7 +++++++ internal/querynodev2/segments/manager.go | 18 ------------------ internal/querynodev2/segments/segment.go | 16 +++++++++------- .../querynodev2/segments/segment_loader.go | 8 ++++++++ 4 files changed, 24 insertions(+), 25 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 50c1c425d8..12f30a8653 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -108,6 +108,13 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { // panic here, insert failure panic(err) } + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(growing.Collection()), + fmt.Sprint(growing.Partition()), + growing.Type().String(), + fmt.Sprint(0), + ).Add(float64(len(insertData.RowIDs))) growing.UpdateBloomFilter(insertData.PrimaryKeys) if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) { diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 50cc5b7551..bd375a7f79 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -195,15 +195,6 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { segment.Type().String(), fmt.Sprint(len(segment.Indexes())), ).Inc() - if segment.InsertCount() > 0 { - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Add(float64(segment.InsertCount())) - } } mgr.updateMetric() @@ -557,14 +548,5 @@ func remove(segment Segment) bool { segment.Type().String(), fmt.Sprint(len(segment.Indexes())), ).Dec() - if segment.InsertCount() > 0 { - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Sub(float64(segment.InsertCount())) - } return true } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index c80fbce04b..2616f203a4 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -536,13 +536,6 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] s.insertCount.Add(int64(numOfRow)) s.rowNum.Store(-1) s.memSize.Store(-1) - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(s.collectionID), - fmt.Sprint(s.partitionID), - s.Type().String(), - fmt.Sprint(0), - ).Add(float64(numOfRow)) return nil } @@ -979,6 +972,15 @@ func (s *LocalSegment) Release() { } C.DeleteSegment(ptr) + + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(s.Collection()), + fmt.Sprint(s.Partition()), + s.Type().String(), + fmt.Sprint(len(s.Indexes())), + ).Sub(float64(s.InsertCount())) + log.Info("delete segment from memory", zap.Int64("collectionID", s.collectionID), zap.Int64("partitionID", s.partitionID), diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 25b7ff553b..db2d3fca28 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -614,6 +614,14 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } } + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + fmt.Sprint(segment.Indexes()), + ).Add(float64(loadInfo.GetNumOfRows())) + log.Info("loading delta...") return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs) }