fix: the entities num metric may be contributed more than once (#29767) (#29825)

the growing segments contribute to this metric while inserting and
putting into the manager, but the current impl inserts data before
putting the segments into manager, which leads to double contributions

fix: #29766
pr: #29767

Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
yah01 2024-01-11 10:24:51 +08:00 committed by GitHub
parent b580aef4dc
commit e7e4561da8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 25 deletions

View File

@ -108,6 +108,13 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
// panic here, insert failure
panic(err)
}
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(growing.Collection()),
fmt.Sprint(growing.Partition()),
growing.Type().String(),
fmt.Sprint(0),
).Add(float64(len(insertData.RowIDs)))
growing.UpdateBloomFilter(insertData.PrimaryKeys)
if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {

View File

@ -195,15 +195,6 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Inc()
if segment.InsertCount() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Add(float64(segment.InsertCount()))
}
}
mgr.updateMetric()
@ -557,14 +548,5 @@ func remove(segment Segment) bool {
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Dec()
if segment.InsertCount() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Sub(float64(segment.InsertCount()))
}
return true
}

View File

@ -536,13 +536,6 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
s.insertCount.Add(int64(numOfRow))
s.rowNum.Store(-1)
s.memSize.Store(-1)
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(s.collectionID),
fmt.Sprint(s.partitionID),
s.Type().String(),
fmt.Sprint(0),
).Add(float64(numOfRow))
return nil
}
@ -979,6 +972,15 @@ func (s *LocalSegment) Release() {
}
C.DeleteSegment(ptr)
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(s.Collection()),
fmt.Sprint(s.Partition()),
s.Type().String(),
fmt.Sprint(len(s.Indexes())),
).Sub(float64(s.InsertCount()))
log.Info("delete segment from memory",
zap.Int64("collectionID", s.collectionID),
zap.Int64("partitionID", s.partitionID),

View File

@ -614,6 +614,14 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
}
}
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(segment.Indexes()),
).Add(float64(loadInfo.GetNumOfRows()))
log.Info("loading delta...")
return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
}