mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix bugs in metrics (#18134)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
6e0a67b3df
commit
08a151d00f
@ -386,7 +386,7 @@ func (it *IndexBuildTask) loadFieldData(ctx context.Context) (storage.FieldID, s
|
||||
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
|
||||
return storage.InvalidUniqueID, nil, err
|
||||
}
|
||||
loadVectorDuration := it.tr.RecordSpan()
|
||||
loadVectorDuration := it.tr.RecordSpan().Milliseconds()
|
||||
log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID))
|
||||
it.tr.Record("load field data done")
|
||||
metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))
|
||||
@ -396,7 +396,8 @@ func (it *IndexBuildTask) loadFieldData(ctx context.Context) (storage.FieldID, s
|
||||
if err2 != nil {
|
||||
return storage.InvalidUniqueID, nil, err2
|
||||
}
|
||||
metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan()))
|
||||
decodeDuration := it.tr.RecordSpan().Milliseconds()
|
||||
metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(decodeDuration))
|
||||
|
||||
if len(insertData.Data) != 1 {
|
||||
return storage.InvalidUniqueID, nil, errors.New("we expect only one field in deserialized insert data")
|
||||
@ -452,7 +453,7 @@ func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, erro
|
||||
}
|
||||
}
|
||||
|
||||
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan()))
|
||||
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan().Milliseconds()))
|
||||
|
||||
it.tr.Record("build index done")
|
||||
}
|
||||
|
||||
@ -6,8 +6,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// RootCoordProxyCounter counts the num of registered proxy nodes
|
||||
RootCoordProxyCounter = prometheus.NewGaugeVec(
|
||||
// RootCoordProxyNum records the num of registered proxy nodes
|
||||
RootCoordProxyNum = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.RootCoordRole,
|
||||
@ -130,7 +130,7 @@ var (
|
||||
|
||||
//RegisterRootCoord registers RootCoord metrics
|
||||
func RegisterRootCoord(registry *prometheus.Registry) {
|
||||
registry.Register(RootCoordProxyCounter)
|
||||
registry.Register(RootCoordProxyNum)
|
||||
|
||||
// for time tick
|
||||
registry.MustRegister(RootCoordInsertChannelTimeTick)
|
||||
|
||||
@ -298,7 +298,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
||||
var rowIDEnd UniqueID
|
||||
tr := timerecord.NewTimeRecorder("applyPK")
|
||||
rowIDBegin, rowIDEnd, _ = it.rowIDAllocator.Alloc(rowNums)
|
||||
metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan()))
|
||||
metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
it.RowIDs = make([]UniqueID, rowNums)
|
||||
for i := rowIDBegin; i < rowIDEnd; i++ {
|
||||
|
||||
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
@ -98,6 +99,8 @@ type MetaTable struct {
|
||||
segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection id/index_id/partition_id/segment_id -> meta
|
||||
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // collection id/index_id -> meta
|
||||
|
||||
partitionNum int
|
||||
|
||||
ddLock sync.RWMutex
|
||||
credLock sync.RWMutex
|
||||
}
|
||||
@ -125,6 +128,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
||||
mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
|
||||
mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
|
||||
mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
|
||||
mt.partitionNum = 0
|
||||
|
||||
_, values, err := mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
|
||||
if err != nil {
|
||||
@ -155,6 +159,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
||||
}
|
||||
mt.collID2Meta[collInfo.ID] = collInfo
|
||||
mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
|
||||
mt.partitionNum += len(collInfo.PartitionIDs)
|
||||
}
|
||||
|
||||
_, values, err = mt.txn.LoadWithPrefix(SegmentIndexMetaPrefix)
|
||||
@ -210,6 +215,8 @@ func (mt *MetaTable) reloadFromKV() error {
|
||||
mt.indexID2Meta[meta.IndexID] = meta
|
||||
}
|
||||
|
||||
metrics.RootCoordNumOfCollections.Set(float64(len(mt.collID2Meta)))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(mt.partitionNum))
|
||||
log.Debug("reload meta table from KV successfully")
|
||||
return nil
|
||||
}
|
||||
@ -236,6 +243,7 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
|
||||
coll.PartitionCreatedTimestamps[0] = ts
|
||||
}
|
||||
mt.collID2Meta[coll.ID] = *coll
|
||||
metrics.RootCoordNumOfCollections.Set(float64(len(mt.collID2Meta)))
|
||||
mt.collName2ID[coll.Schema.Name] = coll.ID
|
||||
for _, i := range idx {
|
||||
mt.indexID2Meta[i.IndexID] = *i
|
||||
@ -287,6 +295,8 @@ func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time
|
||||
delete(mt.collID2Meta, collID)
|
||||
delete(mt.collName2ID, collMeta.Schema.Name)
|
||||
|
||||
metrics.RootCoordNumOfCollections.Set(float64(len(mt.collID2Meta)))
|
||||
|
||||
// update segID2IndexMeta
|
||||
for partID := range collMeta.PartitionIDs {
|
||||
if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
|
||||
@ -561,6 +571,8 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string
|
||||
coll.PartitionNames = append(coll.PartitionNames, partitionName)
|
||||
coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
|
||||
mt.collID2Meta[collID] = coll
|
||||
mt.partitionNum++
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(mt.partitionNum))
|
||||
|
||||
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
|
||||
v1, err := proto.Marshal(&coll)
|
||||
@ -706,7 +718,8 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
|
||||
collMeta.PartitionNames = pn
|
||||
collMeta.PartitionCreatedTimestamps = pts
|
||||
mt.collID2Meta[collID] = collMeta
|
||||
|
||||
mt.partitionNum--
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(mt.partitionNum))
|
||||
// update segID2IndexMeta and partID2SegID
|
||||
if segIDMap, ok := mt.partID2SegID[partID]; ok {
|
||||
for segID := range segIDMap {
|
||||
|
||||
@ -152,7 +152,7 @@ func (p *proxyManager) handlePutEvent(e *clientv3.Event) error {
|
||||
for _, f := range p.addSessionsFunc {
|
||||
f(session)
|
||||
}
|
||||
metrics.RootCoordProxyCounter.WithLabelValues().Inc()
|
||||
metrics.RootCoordProxyNum.WithLabelValues().Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -165,7 +165,7 @@ func (p *proxyManager) handleDeleteEvent(e *clientv3.Event) error {
|
||||
for _, f := range p.delSessionsFunc {
|
||||
f(session)
|
||||
}
|
||||
metrics.RootCoordProxyCounter.WithLabelValues().Dec()
|
||||
metrics.RootCoordProxyNum.WithLabelValues().Dec()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -230,5 +230,6 @@ func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sess
|
||||
}
|
||||
sess[s.ServerID] = &s
|
||||
}
|
||||
metrics.RootCoordProxyNum.WithLabelValues().Set(float64(len(sess)))
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
@ -1529,7 +1529,6 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfCollections.Inc()
|
||||
return succStatus(), nil
|
||||
}
|
||||
|
||||
@ -1561,7 +1560,6 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfCollections.Dec()
|
||||
return succStatus(), nil
|
||||
}
|
||||
|
||||
@ -1715,7 +1713,6 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
|
||||
return succStatus(), nil
|
||||
}
|
||||
|
||||
@ -1750,7 +1747,6 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
|
||||
return succStatus(), nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user