fix: Fix datacoord metrics (#38163)

issue: https://github.com/milvus-io/milvus/issues/38162

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-12-03 14:10:45 +08:00 committed by GitHub
parent e09f431891
commit 9e008685ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 21 additions and 22 deletions

View File

@ -883,7 +883,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle
coll, ok := collections[segmentIdx.CollectionID]
if ok {
metrics.DataCoordStoredIndexFilesSize.WithLabelValues(coll.DatabaseName, coll.Schema.GetName(),
fmt.Sprint(segmentIdx.CollectionID)).Set(float64(segmentIdx.IndexSize))
fmt.Sprint(segmentIdx.CollectionID)).Add(float64(segmentIdx.IndexSize))
total += segmentIdx.IndexSize
}
}

View File

@ -396,6 +396,22 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
return m.getNumRowsOfCollectionUnsafe(collectionID)
}
func getBinlogFileCount(s *datapb.SegmentInfo) int {
statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int {
cnt := 0
for _, fbs := range fieldBinlogs {
cnt += len(fbs.Binlogs)
}
return cnt
}
cnt := 0
cnt += statsFieldFn(s.GetBinlogs())
cnt += statsFieldFn(s.GetStatslogs())
cnt += statsFieldFn(s.GetDeltalogs())
return cnt
}
func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
info := &metricsinfo.DataCoordQuotaMetrics{}
m.RLock()
@ -409,6 +425,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
segments := m.segments.GetSegments()
var total int64
metrics.DataCoordStoredBinlogSize.Reset()
metrics.DataCoordSegmentBinLogFileCount.Reset()
for _, segment := range segments {
segmentSize := segment.getSegmentSize()
if isSegmentHealthy(segment) && !segment.GetIsImporting() {
@ -425,7 +442,9 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
coll, ok := m.collections[segment.GetCollectionID()]
if ok {
metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName,
fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Set(float64(segmentSize))
fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Add(float64(segmentSize))
metrics.DataCoordSegmentBinLogFileCount.WithLabelValues(
fmt.Sprint(segment.GetCollectionID())).Add(float64(getBinlogFileCount(segment.SegmentInfo)))
} else {
log.Warn("not found database name", zap.Int64("collectionID", segment.GetCollectionID()))
}

View File

@ -41,7 +41,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -292,8 +291,6 @@ func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.Segment
}
kvs := make(map[string]string)
for _, segment := range segments {
kc.collectMetrics(segment)
// we don't persist binlog fields, but instead store binlogs as independent kvs
cloned := proto.Clone(segment).(*datapb.SegmentInfo)
resetBinlogFields(cloned)
@ -361,23 +358,6 @@ func (kc *Catalog) SaveByBatch(kvs map[string]string) error {
return nil
}
func (kc *Catalog) collectMetrics(s *datapb.SegmentInfo) {
statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int {
cnt := 0
for _, fbs := range fieldBinlogs {
cnt += len(fbs.Binlogs)
}
return cnt
}
cnt := 0
cnt += statsFieldFn(s.GetBinlogs())
cnt += statsFieldFn(s.GetStatslogs())
cnt += statsFieldFn(s.GetDeltalogs())
metrics.DataCoordSegmentBinLogFileCount.WithLabelValues(fmt.Sprint(s.CollectionID)).Set(float64(cnt))
}
func (kc *Catalog) hasBinlogPrefix(segment *datapb.SegmentInfo) (bool, error) {
collectionID, partitionID, segmentID := segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()
prefix := buildFieldBinlogPathPrefix(collectionID, partitionID, segmentID)