diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index f917643918..660537ce62 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -194,6 +194,8 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
 		segmentID := loadInfo.SegmentID
 		segment, _ := newSegments.Get(segmentID)
 
+		metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc()
+		defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
 		tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
 
 		var err error
@@ -216,7 +218,7 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
 		log.Info("load segment done", zap.Int64("segmentID", segmentID))
 		loader.notifyLoadFinish(loadInfo)
 
-		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
+		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
 		return nil
 	}
 
@@ -415,9 +417,11 @@ func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
 	if err != nil {
 		return err
 	}
+	tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
 	if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
 		return err
 	}
+	metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
 
 	if err := loader.loadSealedSegmentFields(ctx, segment, fieldsMap, loadInfo.GetNumOfRows()); err != nil {
 		return err
@@ -632,6 +636,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		segmentID := loadInfo.SegmentID
 		segment, _ := newSegments.Get(segmentID)
 
+		metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc()
+		defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
+
 		tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
 
 		var err error
@@ -654,7 +661,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		log.Info("load segment done", zap.Int64("segmentID", segmentID))
 		loader.notifyLoadFinish(loadInfo)
 
-		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
+		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
 		return nil
 	}
 
@@ -663,6 +670,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
 	log.Info("start to load segments in parallel",
 		zap.Int("segmentNum", len(infos)),
 		zap.Int("concurrencyLevel", concurrencyLevel))
+
 	err = funcutil.ProcessFuncParallel(len(infos),
 		concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
 	if err != nil {
@@ -970,12 +978,15 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
 		return err
 	}
 
+	tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
 	log.Info("load fields...",
 		zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
 	)
 	if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
 		return err
 	}
+	metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
+
 	for fieldID, info := range indexedFieldInfos {
 		field, err := schemaHelper.GetFieldFromID(fieldID)
 		if err != nil {
@@ -1508,7 +1519,11 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
 	defer loader.freeRequest(resource)
 
 	log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))
+	metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Inc()
+	defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Dec()
+	tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
+	defer metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
 
 	for _, loadInfo := range infos {
 		fieldIDs := typeutil.NewSet(lo.Map(loadInfo.GetIndexInfos(), func(info *querypb.FieldIndexInfo, _ int) int64 { return info.GetFieldID() })...)
 		fieldInfos := lo.SliceToMap(lo.Filter(loadInfo.GetBinlogPaths(), func(info *datapb.FieldBinlog, _ int) bool { return fieldIDs.Contain(info.GetFieldID()) }),
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 11c876a5c4..637a43e750 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -96,6 +96,7 @@ const (
 	lockSource = "lock_source"
 	lockType   = "lock_type"
 	lockOp     = "lock_op"
+	loadTypeName = "load_type"
 )
 
 var (
diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go
index 570242651c..d9a981e261 100644
--- a/pkg/metrics/querynode_metrics.go
+++ b/pkg/metrics/querynode_metrics.go
@@ -235,7 +235,7 @@ var (
 			Subsystem: typeutil.QueryNodeRole,
 			Name:      "load_segment_latency",
 			Help:      "latency of load per segment",
-			Buckets:   []float64{0.1, 0.5, 1, 5, 10, 20, 50, 100, 300, 600, 1200}, // unit seconds
+			Buckets:   longTaskBuckets, // unit milliseconds
 		}, []string{
 			nodeIDLabelName,
 		})
@@ -479,6 +479,28 @@ var (
 			Name:      "stopping_balance_segment_num",
 			Help:      "the number of segment which executing stopping balance",
 		}, []string{nodeIDLabelName})
+
+	QueryNodeLoadSegmentConcurrency = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "load_segment_concurrency",
+			Help:      "number of concurrent loading segments in QueryNode",
+		}, []string{
+			nodeIDLabelName,
+			loadTypeName,
+		})
+
+	QueryNodeLoadIndexLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "load_index_latency",
+			Help:      "latency of load per segment's index, in milliseconds",
+			Buckets:   longTaskBuckets, // unit milliseconds
+		}, []string{
+			nodeIDLabelName,
+		})
 )
 
 // RegisterQueryNode registers QueryNode metrics
@@ -524,6 +546,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
 	registry.MustRegister(StoppingBalanceNodeNum)
 	registry.MustRegister(StoppingBalanceChannelNum)
 	registry.MustRegister(StoppingBalanceSegmentNum)
+	registry.MustRegister(QueryNodeLoadSegmentConcurrency)
+	registry.MustRegister(QueryNodeLoadIndexLatency)
 }
 
 func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
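
For context on the pattern these hunks repeat, here is a minimal, self-contained sketch of the same instrumentation idea: a gauge that tracks in-flight loads with Inc on entry and a deferred Dec, plus a histogram that records elapsed wall time in milliseconds. The metric names, label values, buckets, and the loadOneSegment helper below are illustrative only; they are not the identifiers added by this patch.

// Illustrative sketch only; names and buckets are hypothetical stand-ins
// for the metrics introduced in the diff above.
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Gauge counting loads currently in flight, labeled by node and load type.
	loadConcurrency = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_load_segment_concurrency",
		Help: "number of loads currently in flight",
	}, []string{"node_id", "load_type"})

	// Histogram of per-load latency, recorded in milliseconds.
	loadLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_load_segment_latency",
		Help:    "latency of one load, in milliseconds",
		Buckets: prometheus.ExponentialBuckets(1, 2, 12), // stand-in for longTaskBuckets
	}, []string{"node_id"})
)

func loadOneSegment(nodeID string) error {
	// Track concurrency for the duration of the call.
	loadConcurrency.WithLabelValues(nodeID, "LoadSegment").Inc()
	defer loadConcurrency.WithLabelValues(nodeID, "LoadSegment").Dec()

	start := time.Now()
	// ... actual load work would go here ...

	// Record elapsed time in milliseconds, matching the bucket unit.
	loadLatency.WithLabelValues(nodeID).Observe(float64(time.Since(start).Milliseconds()))
	return nil
}

func main() {
	prometheus.MustRegister(loadConcurrency, loadLatency)
	_ = loadOneSegment("1")
}

One subtlety worth noting about this pattern: when Observe(...) itself is deferred, Go evaluates its argument at the point of the defer statement, so an elapsed-time expression captured that way reflects the time of declaration rather than of function return. The sketch therefore observes at the end of the work; wrapping the call in a deferred closure is another common way to defer the measurement itself.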