mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 18:18:30 +08:00
Fix bug for statistical metrics is incorrect (#19357)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com> Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
adeaac4f9a
commit
b648034cee
@ -20,8 +20,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
@ -173,6 +176,37 @@ func (mt *metaTable) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc f
|
|||||||
return updateFunc(model.CloneSegmentIndex(segIdx))
|
return updateFunc(model.CloneSegmentIndex(segIdx))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mt *metaTable) updateIndexTasksMetrics() {
|
||||||
|
taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
|
||||||
|
for _, segIdx := range mt.buildID2SegmentIndex {
|
||||||
|
if segIdx.IsDeleted {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := taskMetrics[segIdx.CollectionID]; !ok {
|
||||||
|
taskMetrics[segIdx.CollectionID] = make(map[commonpb.IndexState]int)
|
||||||
|
taskMetrics[segIdx.CollectionID][commonpb.IndexState_Unissued] = 0
|
||||||
|
taskMetrics[segIdx.CollectionID][commonpb.IndexState_InProgress] = 0
|
||||||
|
taskMetrics[segIdx.CollectionID][commonpb.IndexState_Finished] = 0
|
||||||
|
taskMetrics[segIdx.CollectionID][commonpb.IndexState_Failed] = 0
|
||||||
|
}
|
||||||
|
taskMetrics[segIdx.CollectionID][segIdx.IndexState]++
|
||||||
|
}
|
||||||
|
for collID, m := range taskMetrics {
|
||||||
|
for k, v := range m {
|
||||||
|
switch k {
|
||||||
|
case commonpb.IndexState_Unissued:
|
||||||
|
metrics.IndexCoordIndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.UnissuedIndexTaskLabel).Set(float64(v))
|
||||||
|
case commonpb.IndexState_InProgress:
|
||||||
|
metrics.IndexCoordIndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.InProgressIndexTaskLabel).Set(float64(v))
|
||||||
|
case commonpb.IndexState_Finished:
|
||||||
|
metrics.IndexCoordIndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FinishedIndexTaskLabel).Set(float64(v))
|
||||||
|
case commonpb.IndexState_Failed:
|
||||||
|
metrics.IndexCoordIndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FailedIndexTaskLabel).Set(float64(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (mt *metaTable) GetAllIndexMeta() map[int64]*model.SegmentIndex {
|
func (mt *metaTable) GetAllIndexMeta() map[int64]*model.SegmentIndex {
|
||||||
mt.segmentIndexLock.RLock()
|
mt.segmentIndexLock.RLock()
|
||||||
defer mt.segmentIndexLock.RUnlock()
|
defer mt.segmentIndexLock.RUnlock()
|
||||||
@ -273,7 +307,7 @@ func (mt *metaTable) AddIndex(segIndex *model.SegmentIndex) error {
|
|||||||
}
|
}
|
||||||
segIndex.IndexState = commonpb.IndexState_Unissued
|
segIndex.IndexState = commonpb.IndexState_Unissued
|
||||||
|
|
||||||
metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Inc()
|
metrics.IndexCoordIndexTaskNum.WithLabelValues(strconv.FormatInt(segIndex.CollectionID, 10), metrics.UnissuedIndexTaskLabel).Inc()
|
||||||
if err := mt.saveSegmentIndexMeta(segIndex); err != nil {
|
if err := mt.saveSegmentIndexMeta(segIndex); err != nil {
|
||||||
// no need to reload, no reason to compare version fail
|
// no need to reload, no reason to compare version fail
|
||||||
log.Error("IndexCoord metaTable save index meta failed", zap.Int64("buildID", buildID),
|
log.Error("IndexCoord metaTable save index meta failed", zap.Int64("buildID", buildID),
|
||||||
@ -372,11 +406,14 @@ func (mt *metaTable) BuildIndex(buildID UniqueID) error {
|
|||||||
log.Error("IndexCoord metaTable BuildIndex fail", zap.Int64("buildID", segIdx.BuildID), zap.Error(err))
|
log.Error("IndexCoord metaTable BuildIndex fail", zap.Int64("buildID", segIdx.BuildID), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Dec()
|
|
||||||
metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.InProgressIndexTaskLabel).Inc()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return mt.updateSegIndexMeta(segIdx, updateFunc)
|
if err := mt.updateSegIndexMeta(segIdx, updateFunc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mt.updateIndexTasksMetrics()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetIndexesForCollection gets all indexes info with the specified collection.
|
// GetIndexesForCollection gets all indexes info with the specified collection.
|
||||||
@ -905,6 +942,13 @@ func (mt *metaTable) RemoveIndex(collID, indexID UniqueID) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
delete(mt.collectionIndexes[collID], indexID)
|
delete(mt.collectionIndexes[collID], indexID)
|
||||||
|
if len(mt.collectionIndexes[collID]) == 0 {
|
||||||
|
delete(mt.collectionIndexes, collID)
|
||||||
|
metrics.IndexCoordIndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.UnissuedIndexTaskLabel})
|
||||||
|
metrics.IndexCoordIndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.InProgressIndexTaskLabel})
|
||||||
|
metrics.IndexCoordIndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FinishedIndexTaskLabel})
|
||||||
|
metrics.IndexCoordIndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FailedIndexTaskLabel})
|
||||||
|
}
|
||||||
log.Info("IndexCoord meta table remove index success", zap.Int64("collID", collID), zap.Int64("indexID", indexID))
|
log.Info("IndexCoord meta table remove index success", zap.Int64("collID", collID), zap.Int64("indexID", indexID))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -923,8 +967,12 @@ func (mt *metaTable) RemoveSegmentIndex(collID, partID, segID, buildID UniqueID)
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(mt.segmentIndexes[segIdx.SegmentID], segIdx.IndexID)
|
delete(mt.segmentIndexes[segID], segIdx.IndexID)
|
||||||
delete(mt.buildID2SegmentIndex, buildID)
|
delete(mt.buildID2SegmentIndex, buildID)
|
||||||
|
if len(mt.segmentIndexes[segID]) == 0 {
|
||||||
|
delete(mt.segmentIndexes, segID)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -968,7 +1016,11 @@ func (mt *metaTable) ResetMeta(buildID UniqueID) error {
|
|||||||
return mt.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
return mt.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
||||||
}
|
}
|
||||||
|
|
||||||
return mt.updateSegIndexMeta(segIdx, updateFunc)
|
if err := mt.updateSegIndexMeta(segIdx, updateFunc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mt.updateIndexTasksMetrics()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
|
func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
|
||||||
@ -987,7 +1039,12 @@ func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
|
|||||||
return mt.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
return mt.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
||||||
}
|
}
|
||||||
|
|
||||||
return mt.updateSegIndexMeta(segIdx, updateFunc)
|
if err := mt.updateSegIndexMeta(segIdx, updateFunc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mt.updateIndexTasksMetrics()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mt *metaTable) MarkSegmentsIndexAsDeletedByBuildID(buildIDs []UniqueID) error {
|
func (mt *metaTable) MarkSegmentsIndexAsDeletedByBuildID(buildIDs []UniqueID) error {
|
||||||
|
|||||||
@ -1064,13 +1064,49 @@ func TestMetaTable_ResetNodeID(t *testing.T) {
|
|||||||
|
|
||||||
func TestMetaTable_ResetMeta(t *testing.T) {
|
func TestMetaTable_ResetMeta(t *testing.T) {
|
||||||
t.Run("success", func(t *testing.T) {
|
t.Run("success", func(t *testing.T) {
|
||||||
mt := constructMetaTable(&indexcoord.Catalog{
|
mt := &metaTable{
|
||||||
Txn: &mockETCDKV{
|
catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()},
|
||||||
multiSave: func(m map[string]string) error {
|
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
|
||||||
return nil
|
buildID: {
|
||||||
|
SegmentID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
NumRows: 1024,
|
||||||
|
IndexID: indexID,
|
||||||
|
BuildID: buildID,
|
||||||
|
NodeID: 1,
|
||||||
|
IndexVersion: 1,
|
||||||
|
IndexState: commonpb.IndexState_InProgress,
|
||||||
|
FailReason: "",
|
||||||
|
IsDeleted: false,
|
||||||
|
CreateTime: 1,
|
||||||
|
IndexFilePaths: nil,
|
||||||
|
IndexSize: 0,
|
||||||
|
WriteHandoff: false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
|
||||||
|
segID: {
|
||||||
|
indexID: {
|
||||||
|
SegmentID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
NumRows: 1024,
|
||||||
|
IndexID: indexID,
|
||||||
|
BuildID: buildID,
|
||||||
|
NodeID: 1,
|
||||||
|
IndexVersion: 1,
|
||||||
|
IndexState: commonpb.IndexState_InProgress,
|
||||||
|
FailReason: "",
|
||||||
|
IsDeleted: false,
|
||||||
|
CreateTime: 1,
|
||||||
|
IndexFilePaths: nil,
|
||||||
|
IndexSize: 0,
|
||||||
|
WriteHandoff: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
err := mt.ResetMeta(buildID)
|
err := mt.ResetMeta(buildID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(0), mt.buildID2SegmentIndex[buildID].NodeID)
|
assert.Equal(t, int64(0), mt.buildID2SegmentIndex[buildID].NodeID)
|
||||||
@ -1138,6 +1174,32 @@ func TestMetaTable_FinishTask(t *testing.T) {
|
|||||||
assert.ElementsMatch(t, []string{"file3", "file4"}, mt.buildID2SegmentIndex[buildID+1].IndexFilePaths)
|
assert.ElementsMatch(t, []string{"file3", "file4"}, mt.buildID2SegmentIndex[buildID+1].IndexFilePaths)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("state failed", func(t *testing.T) {
|
||||||
|
mt := constructMetaTable(&indexcoord.Catalog{
|
||||||
|
Txn: &mockETCDKV{
|
||||||
|
save: func(s string, s2 string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
multiSave: func(m map[string]string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
err := mt.AddIndex(segIdx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = mt.FinishTask(&indexpb.IndexTaskInfo{
|
||||||
|
BuildID: buildID + 1,
|
||||||
|
State: commonpb.IndexState_Failed,
|
||||||
|
IndexFiles: []string{},
|
||||||
|
SerializedSize: 0,
|
||||||
|
FailReason: "failed",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, commonpb.IndexState_Failed, mt.buildID2SegmentIndex[buildID+1].IndexState)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("fail", func(t *testing.T) {
|
t.Run("fail", func(t *testing.T) {
|
||||||
mt := constructMetaTable(&indexcoord.Catalog{
|
mt := constructMetaTable(&indexcoord.Catalog{
|
||||||
Txn: &mockETCDKV{
|
Txn: &mockETCDKV{
|
||||||
|
|||||||
@ -27,25 +27,25 @@ var (
|
|||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: milvusNamespace,
|
Namespace: milvusNamespace,
|
||||||
Subsystem: typeutil.IndexCoordRole,
|
Subsystem: typeutil.IndexCoordRole,
|
||||||
Name: "indexreq_count",
|
Name: "index_req_count",
|
||||||
Help: "number of building index requests ",
|
Help: "number of building index requests ",
|
||||||
}, []string{statusLabelName})
|
}, []string{statusLabelName})
|
||||||
|
|
||||||
// IndexCoordIndexTaskCounter records the number of index tasks of each type.
|
// IndexCoordIndexTaskNum records the number of index tasks of each type.
|
||||||
IndexCoordIndexTaskCounter = prometheus.NewGaugeVec(
|
IndexCoordIndexTaskNum = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: milvusNamespace,
|
Namespace: milvusNamespace,
|
||||||
Subsystem: typeutil.IndexCoordRole,
|
Subsystem: typeutil.IndexCoordRole,
|
||||||
Name: "indextask_count",
|
Name: "index_task_count",
|
||||||
Help: "number of index tasks of each type",
|
Help: "number of index tasks of each type",
|
||||||
}, []string{indexTaskStatusLabelName})
|
}, []string{collectionIDLabelName, indexTaskStatusLabelName})
|
||||||
|
|
||||||
// IndexCoordIndexNodeNum records the number of IndexNodes managed by IndexCoord.
|
// IndexCoordIndexNodeNum records the number of IndexNodes managed by IndexCoord.
|
||||||
IndexCoordIndexNodeNum = prometheus.NewGaugeVec(
|
IndexCoordIndexNodeNum = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: milvusNamespace,
|
Namespace: milvusNamespace,
|
||||||
Subsystem: typeutil.IndexCoordRole,
|
Subsystem: typeutil.IndexCoordRole,
|
||||||
Name: "indexnode_num",
|
Name: "index_node_num",
|
||||||
Help: "number of IndexNodes managed by IndexCoord",
|
Help: "number of IndexNodes managed by IndexCoord",
|
||||||
}, []string{})
|
}, []string{})
|
||||||
)
|
)
|
||||||
@ -53,6 +53,6 @@ var (
|
|||||||
//RegisterIndexCoord registers IndexCoord metrics
|
//RegisterIndexCoord registers IndexCoord metrics
|
||||||
func RegisterIndexCoord(registry *prometheus.Registry) {
|
func RegisterIndexCoord(registry *prometheus.Registry) {
|
||||||
registry.MustRegister(IndexCoordIndexRequestCounter)
|
registry.MustRegister(IndexCoordIndexRequestCounter)
|
||||||
registry.MustRegister(IndexCoordIndexTaskCounter)
|
registry.MustRegister(IndexCoordIndexTaskNum)
|
||||||
registry.MustRegister(IndexCoordIndexNodeNum)
|
registry.MustRegister(IndexCoordIndexNodeNum)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user