diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 7f66f9464d..8bd50cc640 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -489,6 +489,13 @@ common:
   ImportMaxFileSize: 17179869184 # 16 * 1024 * 1024 * 1024
   # max file size to import for bulkInsert
 
+  locks:
+    metrics:
+      enable: false
+    threshold:
+      info: 500 # minimum milliseconds for printing durations in info level
+      warn: 1000 # minimum milliseconds for printing durations in warn level
+
 # QuotaConfig, configurations of Milvus quota and limits.
 # By default, we enable:
 #   1. TT protection;
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 65a195c6e0..015e7c25e1 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -84,6 +84,10 @@ const (
 	requestScope        = "scope"
 	fullMethodLabelName = "full_method"
 	reduceLevelName     = "reduce_level"
+	lockName            = "lock_name"
+	lockSource          = "lock_source"
+	lockType            = "lock_type"
+	lockOp              = "lock_op"
 )
 
 var (
@@ -97,9 +101,22 @@ var (
 			Name:      "num_node",
 			Help:      "number of nodes and coordinates",
 		}, []string{nodeIDLabelName, roleNameLabelName})
+
+	LockCosts = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Name:      "lock_time_cost",
+			Help:      "time cost for various kinds of locks",
+		}, []string{
+			lockName,
+			lockSource,
+			lockType,
+			lockOp,
+		})
 )
 
 // Register serves prometheus http service
 func Register(r *prometheus.Registry) {
 	r.MustRegister(NumNodes)
+	r.MustRegister(LockCosts)
 }
diff --git a/pkg/util/lock/metric_mutex.go b/pkg/util/lock/metric_mutex.go
new file mode 100644
index 0000000000..32c0a5117f
--- /dev/null
+++ b/pkg/util/lock/metric_mutex.go
@@ -0,0 +1,130 @@
+package lock
+
+import (
+	"sync"
+	"time"
+
+	"github.com/cockroachdb/errors"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"go.uber.org/zap"
+)
+
+// MetricsLockManager creates MetricsRWMutex instances.
+type MetricsLockManager struct {
+	rwLocks map[string]*MetricsRWMutex
+}
+
+// MetricsRWMutex wraps a sync.RWMutex and, when lock metrics are enabled,
+// reports how long each named source waits for and holds the lock.
+type MetricsRWMutex struct {
+	mutex    sync.RWMutex
+	lockName string
+
+	// acquireTimeMu guards acquireTimeMap. Several readers may hold the read
+	// lock at once, so the RWMutex alone does not serialize access to the map.
+	acquireTimeMu  sync.Mutex
+	acquireTimeMap map[string]time.Time
+}
+
+const (
+	readLock  = "READ_LOCK"
+	writeLock = "WRITE_LOCK"
+	hold      = "HOLD"
+	acquire   = "ACQUIRE"
+)
+
+// RLock takes the read lock on behalf of source.
+func (mRWLock *MetricsRWMutex) RLock(source string) {
+	if !paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
+		mRWLock.mutex.RLock()
+		return
+	}
+	before := time.Now()
+	mRWLock.mutex.RLock()
+	mRWLock.recordAcquire(source)
+	logLock(time.Since(before), mRWLock.lockName, source, readLock, acquire)
+}
+
+// Lock takes the write lock on behalf of source.
+func (mRWLock *MetricsRWMutex) Lock(source string) {
+	if !paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
+		mRWLock.mutex.Lock()
+		return
+	}
+	before := time.Now()
+	mRWLock.mutex.Lock()
+	mRWLock.recordAcquire(source)
+	logLock(time.Since(before), mRWLock.lockName, source, writeLock, acquire)
+}
+
+// UnLock releases the write lock taken by source. When lock metrics are
+// enabled and source has no recorded acquisition, the lock is left held.
+func (mRWLock *MetricsRWMutex) UnLock(source string) {
+	if mRWLock.maybeLogUnlockDuration(source, writeLock) != nil {
+		return
+	}
+	mRWLock.mutex.Unlock()
+}
+
+// RUnLock releases the read lock taken by source. When lock metrics are
+// enabled and source has no recorded acquisition, the lock is left held.
+func (mRWLock *MetricsRWMutex) RUnLock(source string) {
+	if mRWLock.maybeLogUnlockDuration(source, readLock) != nil {
+		return
+	}
+	mRWLock.mutex.RUnlock()
+}
+
+// recordAcquire stores the time at which source obtained the lock.
+func (mRWLock *MetricsRWMutex) recordAcquire(source string) {
+	mRWLock.acquireTimeMu.Lock()
+	defer mRWLock.acquireTimeMu.Unlock()
+	mRWLock.acquireTimeMap[source] = time.Now()
+}
+
+// maybeLogUnlockDuration reports how long source held the lock when lock
+// metrics are enabled. It returns an error when metrics are enabled but
+// source has no recorded acquisition.
+func (mRWLock *MetricsRWMutex) maybeLogUnlockDuration(source string, lockType string) error {
+	if !paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
+		return nil
+	}
+	mRWLock.acquireTimeMu.Lock()
+	acquireTime, ok := mRWLock.acquireTimeMap[source]
+	delete(mRWLock.acquireTimeMap, source)
+	mRWLock.acquireTimeMu.Unlock()
+	if !ok {
+		log.Error("there's no lock history for the source, there may be some defects in codes",
+			zap.String("source", source))
+		return errors.New("unknown source")
+	}
+	logLock(time.Since(acquireTime), mRWLock.lockName, source, lockType, hold)
+	return nil
+}
+
+// logLock logs duration at warn or info level once it reaches the matching
+// configured threshold, and always publishes it (in milliseconds) to the
+// LockCosts gauge.
+func logLock(duration time.Duration, lockName string, source string, lockType string, opType string) {
+	if duration >= paramtable.Get().CommonCfg.LockSlowLogWarnThreshold.GetAsDuration(time.Millisecond) {
+		log.Warn("lock takes too long", zap.String("lockName", lockName), zap.String("lockType", lockType),
+			zap.String("source", source), zap.String("opType", opType),
+			zap.Duration("time_cost", duration))
+	} else if duration >= paramtable.Get().CommonCfg.LockSlowLogInfoThreshold.GetAsDuration(time.Millisecond) {
+		log.Info("lock takes too long", zap.String("lockName", lockName), zap.String("lockType", lockType),
+			zap.String("source", source), zap.String("opType", opType),
+			zap.Duration("time_cost", duration))
+	}
+	metrics.LockCosts.WithLabelValues(lockName, source, lockType, opType).Set(float64(duration.Milliseconds()))
+}
+
+// currently, we keep metricsLockManager as a communal gate for metrics lock
+// we may use this manager as a centralized statistical site to provide overall cost for locks
+func (mlManager *MetricsLockManager) applyRWLock(name string) *MetricsRWMutex {
+	return &MetricsRWMutex{
+		lockName:       name,
+		acquireTimeMap: make(map[string]time.Time),
+	}
+}
diff --git a/pkg/util/lock/metrics_mutex_test.go b/pkg/util/lock/metrics_mutex_test.go
new file mode 100644
index 0000000000..f1d691bae3
--- /dev/null
+++ b/pkg/util/lock/metrics_mutex_test.go
@@ -0,0 +1,71 @@
+package lock
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestMetricsLockLock(t *testing.T) {
+	lManager := &MetricsLockManager{
+		rwLocks: make(map[string]*MetricsRWMutex, 0),
+	}
+	params.Params.Init()
+	params.Params.Save(params.Params.CommonCfg.EnableLockMetrics.Key, "true")
+	params.Params.Save(params.Params.CommonCfg.LockSlowLogInfoThreshold.Key, "10")
+	lName := "testLock"
+	lockDuration := 10 * time.Millisecond
+
+	testRWLock := lManager.applyRWLock(lName)
+	wg := sync.WaitGroup{}
+	testRWLock.Lock("main_thread")
+	// Add must run before the goroutine starts, otherwise Wait may return early.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		before := time.Now()
+		testRWLock.Lock("sub_thread")
+		lkDuration := time.Since(before)
+		assert.True(t, lkDuration >= lockDuration)
+		testRWLock.UnLock("sub_threadXX")
+		testRWLock.UnLock("sub_thread")
+	}()
+	time.Sleep(lockDuration)
+	testRWLock.UnLock("main_thread")
+	wg.Wait()
+}
+
+func TestMetricsLockRLock(t *testing.T) {
+	lManager := &MetricsLockManager{
+		rwLocks: make(map[string]*MetricsRWMutex, 0),
+	}
+	params.Params.Init()
+	params.Params.Save(params.Params.CommonCfg.EnableLockMetrics.Key, "true")
+	params.Params.Save(params.Params.CommonCfg.LockSlowLogWarnThreshold.Key, "10")
+	lName := "testLock"
+	lockDuration := 10 * time.Millisecond
+
+	testRWLock := lManager.applyRWLock(lName)
+	wg := sync.WaitGroup{}
+	testRWLock.RLock("main_thread")
+	// Add must run before the goroutine starts, otherwise Wait may return early.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		before := time.Now()
+		testRWLock.Lock("sub_thread")
+		lkDuration := time.Since(before)
+		assert.True(t, lkDuration >= lockDuration)
+		testRWLock.UnLock("sub_thread")
+	}()
+	time.Sleep(lockDuration)
+	assert.Equal(t, 1, len(testRWLock.acquireTimeMap))
+	testRWLock.RUnLock("main_threadXXX")
+	assert.Equal(t, 1, len(testRWLock.acquireTimeMap))
+	testRWLock.RUnLock("main_thread")
+	wg.Wait()
+	assert.Equal(t, 0, len(testRWLock.acquireTimeMap))
+}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 7fd33e73da..bfb1548cd5 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -228,6 +228,11 @@ type commonConfig struct {
 	ImportMaxFileSize ParamItem `refreshable:"true"`
 
 	MetricsPort ParamItem `refreshable:"false"`
+
+	// lock related params
+	EnableLockMetrics        ParamItem `refreshable:"false"`
+	LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
+	LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
 }
 
 func (p *commonConfig) init(base *BaseTable) {
@@ -679,6 +684,33 @@ like the old password verification when updating the credential`,
 		DefaultValue: "9091",
 	}
 	p.MetricsPort.Init(base.mgr)
+
+	p.EnableLockMetrics = ParamItem{
+		Key:          "common.locks.metrics.enable",
+		Version:      "2.0.0",
+		DefaultValue: "false",
+		Doc:          "whether gather statistics for metrics locks",
+		Export:       true,
+	}
+	p.EnableLockMetrics.Init(base.mgr)
+
+	p.LockSlowLogInfoThreshold = ParamItem{
+		Key:          "common.locks.threshold.info",
+		Version:      "2.0.0",
+		DefaultValue: "500",
+		Doc:          "minimum milliseconds for printing durations in info level",
+		Export:       true,
+	}
+	p.LockSlowLogInfoThreshold.Init(base.mgr)
+
+	p.LockSlowLogWarnThreshold = ParamItem{
+		Key:          "common.locks.threshold.warn",
+		Version:      "2.0.0",
+		DefaultValue: "1000",
+		Doc:          "minimum milliseconds for printing durations in warn level",
+		Export:       true,
+	}
+	p.LockSlowLogWarnThreshold.Init(base.mgr)
 }
 
 type traceConfig struct {