enhance: purge client infos periodically (#31037)

https://github.com/milvus-io/milvus/issues/31007

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2024-03-06 12:50:59 +08:00 committed by GitHub
parent 3c9ffdedf3
commit a88c896733
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 152 additions and 5 deletions

View File

@ -210,6 +210,7 @@ proxy:
maxTaskNum: 1024 # max task number of proxy task queue
connectionCheckIntervalSeconds: 120 # the interval time(in seconds) for connection manager to scan inactive client info
connectionClientInfoTTLSeconds: 86400 # inactive client info TTL duration, in seconds
maxConnectionNum: 10000 # the max client info numbers that proxy should manage, avoid too many client infos.
accessLog:
enable: false
# Log filename, set as "" to use stdout.

View File

@ -1,12 +1,13 @@
package connection
import (
"container/heap"
"context"
"strconv"
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/log"
@ -22,7 +23,6 @@ type connectionManager struct {
wg sync.WaitGroup
clientInfos *typeutil.ConcurrentMap[int64, clientInfo]
count atomic.Int64
}
func (s *connectionManager) init() {
@ -52,11 +52,49 @@ func (s *connectionManager) checkLoop() {
return
case <-t.C:
s.removeLongInactiveClients()
// not sure if we should purge them periodically.
s.purgeIfNumOfClientsExceed()
t.Reset(paramtable.Get().ProxyCfg.ConnectionCheckIntervalSeconds.GetAsDuration(time.Second))
}
}
}
func (s *connectionManager) purgeIfNumOfClientsExceed() {
diffNum := int64(s.clientInfos.Len()) - paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()
if diffNum <= 0 {
return
}
begin := time.Now()
log := log.With(
zap.Int64("num", int64(s.clientInfos.Len())),
zap.Int64("limit", paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()))
log.Info("number of client infos exceed limit, ready to purge the oldest")
q := newPriorityQueueWithCap(int(diffNum + 1))
s.clientInfos.Range(func(identifier int64, info clientInfo) bool {
heap.Push(&q, newQueryItem(info.identifier, info.lastActiveTime))
if int64(q.Len()) > diffNum {
// pop the newest.
heap.Pop(&q)
}
return true
})
// time order doesn't matter here.
for _, item := range q {
info, exist := s.clientInfos.GetAndRemove(item.identifier)
if exist {
log.Info("remove client info", info.GetLogger()...)
}
}
log.Info("purge client infos done",
zap.Duration("cost", time.Since(begin)),
zap.Int64("num after purge", int64(s.clientInfos.Len())))
}
func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
cli := clientInfo{
ClientInfo: info,
@ -64,7 +102,6 @@ func (s *connectionManager) Register(ctx context.Context, identifier int64, info
lastActiveTime: time.Now(),
}
s.count.Inc()
s.clientInfos.Insert(identifier, cli)
log.Ctx(ctx).Info("client register", cli.GetLogger()...)
}
@ -74,7 +111,7 @@ func (s *connectionManager) KeepActive(identifier int64) {
}
func (s *connectionManager) List() []*commonpb.ClientInfo {
clients := make([]*commonpb.ClientInfo, 0, s.count.Load())
clients := make([]*commonpb.ClientInfo, 0, s.clientInfos.Len())
s.clientInfos.Range(func(identifier int64, info clientInfo) bool {
if info.ClientInfo != nil {
@ -120,7 +157,6 @@ func (s *connectionManager) removeLongInactiveClients() {
if time.Since(info.lastActiveTime) > ttl {
log.Info("client deregister", info.GetLogger()...)
s.clientInfos.Remove(candidate)
s.count.Dec()
}
return true
})

View File

@ -44,3 +44,24 @@ func TestConnectionManager(t *testing.T) {
return len(s.List()) == 0
}, time.Second*5, time.Second)
}
func TestConnectionManager_Purge(t *testing.T) {
paramtable.Init()
pt := paramtable.Get()
pt.Save(pt.ProxyCfg.ConnectionCheckIntervalSeconds.Key, "2")
pt.Save(pt.ProxyCfg.MaxConnectionNum.Key, "2")
defer pt.Reset(pt.ProxyCfg.ConnectionCheckIntervalSeconds.Key)
defer pt.Reset(pt.ProxyCfg.MaxConnectionNum.Key)
s := newConnectionManager()
defer s.Stop()
repeat := 10
for i := 0; i < repeat; i++ {
s.Register(context.TODO(), int64(i), &commonpb.ClientInfo{})
}
assert.Eventually(t, func() bool {
return s.clientInfos.Len() <= 2
}, time.Second*5, time.Second)
}

View File

@ -0,0 +1,56 @@
package connection
import (
"container/heap"
"time"
)
type queueItem struct {
identifier int64
lastActiveTime time.Time
}
func newQueryItem(identifier int64, lastActiveTime time.Time) *queueItem {
return &queueItem{
identifier: identifier,
lastActiveTime: lastActiveTime,
}
}
type priorityQueue []*queueItem
func (pq priorityQueue) Len() int {
return len(pq)
}
func (pq priorityQueue) Less(i, j int) bool {
// we should purge the oldest, so the newest should be on the root.
return pq[i].lastActiveTime.After(pq[j].lastActiveTime)
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *priorityQueue) Push(x interface{}) {
item := x.(*queueItem)
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[:n-1]
return item
}
func newPriorityQueueWithCap(cap int) priorityQueue {
q := make(priorityQueue, 0, cap)
heap.Init(&q)
return q
}
func newPriorityQueue() priorityQueue {
return newPriorityQueueWithCap(0)
}

View File

@ -0,0 +1,23 @@
package connection
import (
"container/heap"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_priorityQueue(t *testing.T) {
q := newPriorityQueue()
repeat := 10
for i := 0; i < repeat; i++ {
heap.Push(&q, newQueryItem(int64(i), time.Now()))
}
counter := repeat - 1
for q.Len() > 0 {
item := heap.Pop(&q).(*queueItem)
assert.Equal(t, int64(counter), item.identifier)
counter--
}
}

View File

@ -1051,6 +1051,7 @@ type proxyConfig struct {
// connection manager
ConnectionCheckIntervalSeconds ParamItem `refreshable:"true"`
ConnectionClientInfoTTLSeconds ParamItem `refreshable:"true"`
MaxConnectionNum ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
}
@ -1386,6 +1387,15 @@ please adjust in embedded Milvus: false`,
Export: true,
}
p.ConnectionClientInfoTTLSeconds.Init(base.mgr)
p.MaxConnectionNum = ParamItem{
Key: "proxy.maxConnectionNum",
Version: "2.3.11",
Doc: "the max client info numbers that proxy should manage, avoid too many client infos",
DefaultValue: "10000",
Export: true,
}
p.MaxConnectionNum.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////