fix: avoid memory leak in rendezvousFlushManager (#33112)

issue: https://github.com/milvus-io/milvus/issues/33110
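
The dispatcher map (segmentID -> orderFlushQueue) only ever grew: once a queue was created for a segment it was never removed, so a long-running DataNode accumulated queues for segments that had long since flushed. This change starts a periodic clean loop that drops queues with zero running tasks, guards queue lookup/removal with a new cleanLock (sync.RWMutex), and exposes the check period as a refreshable config item, datanode.flushMgrCleanInterval (default 300 s).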

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2024-05-20 22:19:40 +08:00 committed by GitHub
parent 7a617e2921
commit 23e7155a48
6 changed files with 102 additions and 3 deletions


@@ -1085,6 +1085,9 @@ type mockFlushManager struct {
}
}
func (mfm *mockFlushManager) start() {
}
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {


@@ -91,6 +91,9 @@ func (dsService *dataSyncService) start() {
log.Warn("dataSyncService starting flow graph is nil", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
}
if dsService.flushManager != nil {
dsService.flushManager.start()
}
}
func (dsService *dataSyncService) GracefullyClose() {


@@ -22,6 +22,7 @@ import (
"path"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@@ -63,6 +64,7 @@ type flushManager interface {
notifyAllFlushed()
// close handles resource clean up
close()
start()
}
// segmentFlushPack contains result to save into meta
@@ -221,6 +223,14 @@ func (q *orderFlushQueue) getTailChan() chan struct{} {
return q.tailCh
}
func (q *orderFlushQueue) checkEmpty() bool {
if q.taskMut.TryLock() {
defer q.taskMut.Unlock()
return q.runningTasks == 0
}
return false
}
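
checkEmpty intentionally uses TryLock (available since Go 1.18) rather than Lock: if the queue's mutex is contended, a flush task is in flight, so the cleaner reports "not empty" and retries on a later pass instead of blocking the whole clean cycle. A minimal, self-contained illustration of that non-blocking probe (not Milvus code):

```go
package main

import (
	"fmt"
	"sync"
)

// idle reports whether mu is currently free, without ever blocking.
// This mirrors checkEmpty: a failed TryLock means "someone is working,
// treat the queue as non-empty and let the next clean cycle retry".
func idle(mu *sync.Mutex) bool {
	if mu.TryLock() {
		defer mu.Unlock()
		return true
	}
	return false
}

func main() {
	var mu sync.Mutex
	fmt.Println(idle(&mu)) // true: nothing holds the lock

	mu.Lock()
	fmt.Println(idle(&mu)) // false: a holder exists, and we did not block
	mu.Unlock()
}
```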
// injectTask handles injection for empty flush queue
type injectTask struct {
startSignal, finishSignal chan struct{}
@@ -281,6 +291,16 @@ type rendezvousFlushManager struct {
dropping atomic.Bool
dropHandler dropHandler
ctx context.Context
cancel context.CancelFunc
cleanLock sync.RWMutex
wg sync.WaitGroup
}
// start spawns the cleanLoop goroutine
func (m *rendezvousFlushManager) start() {
m.wg.Add(1)
go m.cleanLoop()
}
// getFlushQueue gets or creates an orderFlushQueue for segment id if not found
@@ -315,6 +335,8 @@ func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flush
return
}
// normal mode
m.cleanLock.RLock()
defer m.cleanLock.RUnlock()
m.getFlushQueue(segmentID).enqueueInsertFlush(task, binlogs, statslogs, flushed, dropped, pos)
}
@@ -323,8 +345,10 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
// in dropping mode
if m.dropping.Load() {
// preventing separate delete, check position exists in queue first
m.cleanLock.RLock()
q := m.getFlushQueue(segmentID)
_, ok := q.working.Get(getSyncTaskID(pos))
m.cleanLock.RUnlock()
// if ok, means position insert data already in queue, just handle task in normal mode
// if not ok, means the insert buf should be handle in drop mode
if !ok {
@@ -343,6 +367,8 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
}
}
// normal mode
m.cleanLock.RLock()
defer m.cleanLock.RUnlock()
m.getFlushQueue(segmentID).enqueueDelFlush(task, deltaLogs, pos)
}
@@ -574,11 +600,48 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
// injectFlush inject process before task finishes
func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
go injection.waitForInjected()
m.cleanLock.RLock()
defer m.cleanLock.RUnlock()
for _, segmentID := range segments {
m.getFlushQueue(segmentID).inject(injection)
}
}
// tryRemoveFlushQueue removes flush queues that have no running tasks
func (m *rendezvousFlushManager) tryRemoveFlushQueue() {
m.cleanLock.Lock()
defer m.cleanLock.Unlock()
m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool {
if queue.checkEmpty() {
m.dispatcher.Remove(segmentID)
}
return true
})
}
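
Why this is safe: every path that fetches a queue (handleInsertTask, handleDeleteTask, injectFlush, startDropping) holds cleanLock.RLock across both the lookup and the enqueue, while tryRemoveFlushQueue takes the exclusive lock, so a queue can never be deleted between being looked up and being used. A runnable sketch of the same discipline, using sync.Map and hypothetical names in place of typeutil.ConcurrentMap and orderFlushQueue:

```go
package main

import (
	"sync"
	"sync/atomic"
)

type queue struct{ running atomic.Int64 }

// submit counts the task before the caller's read lock is released, so
// the cleaner can never observe this queue as idle while work is pending.
func (q *queue) submit(task func()) {
	q.running.Add(1)
	go func() {
		defer q.running.Add(-1)
		task()
	}()
}

type registry struct {
	cleanLock sync.RWMutex
	queues    sync.Map // segmentID (int64) -> *queue
}

// enqueue holds the read lock across fetch-and-submit, mirroring the
// RLock/RUnlock pairs added in this commit.
func (r *registry) enqueue(id int64, task func()) {
	r.cleanLock.RLock()
	defer r.cleanLock.RUnlock()
	q, _ := r.queues.LoadOrStore(id, &queue{})
	q.(*queue).submit(task)
}

// clean is the tryRemoveFlushQueue analogue: exclusive lock, then drop
// every queue with no running tasks.
func (r *registry) clean() {
	r.cleanLock.Lock()
	defer r.cleanLock.Unlock()
	r.queues.Range(func(k, v any) bool {
		if v.(*queue).running.Load() == 0 {
			r.queues.Delete(k)
		}
		return true
	})
}

func main() {
	r := &registry{}
	r.enqueue(1, func() {})
	r.clean()
}
```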
// segmentNum returns the number of segments in the dispatcher
func (m *rendezvousFlushManager) segmentNum() int {
return m.dispatcher.Len()
}
// cleanLoop calls tryRemoveFlushQueue periodically
func (m *rendezvousFlushManager) cleanLoop() {
defer m.wg.Done()
ticker := time.NewTicker(Params.DataNodeCfg.FlushMgrCleanInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
log.Info("rendezvousFlushManager quit clean loop")
return
case <-ticker.C:
m.tryRemoveFlushQueue()
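// Reset re-reads FlushMgrCleanInterval (a refreshable param) so a changed config value takes effect on the next tick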
ticker.Reset(Params.DataNodeCfg.FlushMgrCleanInterval.GetAsDuration(time.Second))
}
}
}
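
This is the standard Go background-worker lifecycle: a ticker drives periodic work, context cancellation ends the loop, and the WaitGroup lets close() block until the goroutine has fully exited. A stand-alone sketch of that lifecycle (names are illustrative, not Milvus APIs):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runPeriodic invokes fn every interval until ctx is cancelled.
// cleanLoop has the same shape, except it also re-reads the refreshable
// interval and resets the ticker on every tick.
func runPeriodic(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, fn func()) {
	defer wg.Done()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fn()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	wg.Add(1) // mirrors start()
	go runPeriodic(ctx, &wg, 10*time.Millisecond, func() { fmt.Println("clean pass") })

	time.Sleep(35 * time.Millisecond)
	cancel()  // mirrors close(): signal the loop to stop...
	wg.Wait() // ...and wait until it has actually exited
}
```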
// fetch meta info for segment
func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *msgpb.MsgPosition) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) {
if !m.hasSegment(segmentID, true) {
@@ -627,6 +690,8 @@ func (m *rendezvousFlushManager) startDropping() {
m.dropHandler.Lock()
defer m.dropHandler.Unlock()
// apply injection if any
m.cleanLock.RLock()
defer m.cleanLock.RUnlock()
for _, pack := range m.dropHandler.packs {
q := m.getFlushQueue(pack.segmentID)
// queue will never be nil, since getFlushQueue will initialize one if not found
@@ -651,6 +716,7 @@ func getSyncTaskID(pos *msgpb.MsgPosition) string {
// close cleans up all the left members
func (m *rendezvousFlushManager) close() {
m.cancel()
m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool {
// assertion ok
queue.taskMut.Lock()
@@ -659,6 +725,7 @@ func (m *rendezvousFlushManager) close() {
return true
})
m.waitForAllFlushQueue()
m.wg.Wait()
log.Ctx(context.Background()).Info("flush manager closed", zap.Int64("collectionID", m.Channel.getCollectionID()))
}
@@ -720,6 +787,7 @@ func (t *flushBufferDeleteTask) flushDeleteData() error {
// NewRendezvousFlushManager create rendezvousFlushManager with provided allocator and kv
func NewRendezvousFlushManager(allocator allocator.Allocator, cm storage.ChunkManager, channel Channel, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager {
ctx, cancel := context.WithCancel(context.Background())
fm := &rendezvousFlushManager{
Allocator: allocator,
ChunkManager: cm,
@@ -729,6 +797,8 @@ func NewRendezvousFlushManager(allocator allocator.Allocator, cm storage.ChunkMa
flushAndDrop: drop,
},
dispatcher: typeutil.NewConcurrentMap[int64, *orderFlushQueue](),
ctx: ctx,
cancel: cancel,
}
// start with normal mode
fm.dropping.Store(false)


@@ -209,7 +209,8 @@ func TestRendezvousFlushManager(t *testing.T) {
})
assert.NoError(t, err)
}
-assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond)
+m.waitForAllFlushQueue()
+assert.True(t, counter.Load() == int64(size))
_, _, err := m.serializePkStatsLog(0, false, nil, &storage.InsertCodec{Schema: &etcdpb.CollectionMeta{Schema: &schemapb.CollectionSchema{}, ID: 0}})
assert.Error(t, err)
@@ -251,6 +252,11 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
counter.Inc()
}, emptyFlushAndDropFunc)
oldIntervalStr := Params.DataNodeCfg.FlushMgrCleanInterval.GetValue()
Params.Save(Params.DataNodeCfg.FlushMgrCleanInterval.Key, "1")
defer Params.Save(Params.DataNodeCfg.FlushMgrCleanInterval.Key, oldIntervalStr)
m.start()
ti := newTaskInjection(1, func(*segmentFlushPack) {})
m.injectFlush(ti, 1)
<-ti.injected
@@ -274,7 +280,10 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
})
assert.NoError(t, err)
}
-assert.Eventually(t, func() bool { return counter.Load() == int64(size) }, 3*time.Second, 100*time.Millisecond)
+m.waitForAllFlushQueue()
+assert.True(t, counter.Load() == int64(size))
+assert.Eventually(t, func() bool { return m.segmentNum() == 0 }, 3*time.Second, 100*time.Millisecond)
id := make([]byte, 10)
rand.Read(id)
@@ -332,6 +341,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
})
assert.Eventually(t, func() bool { return counter.Load() == int64(size+3) }, 3*time.Second, 100*time.Millisecond)
assert.EqualValues(t, 4, packs[size+1].segmentID)
m.close()
}
func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {


@@ -35,6 +35,7 @@ const (
DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded
DefaultGracefulTime = 5000 // ms
DefaultGracefulStopTimeout = 1800 // s, for node
DefaultFlushMgrCleanInterval = 300 // s
DefaultProxyGracefulStopTimeout = 30 // s, for proxy
DefaultCoordGracefulStopTimeout = 5 // s, for coord
DefaultHighPriorityThreadCoreCoefficient = 10
@@ -2845,7 +2846,8 @@ type dataNodeConfig struct {
MaxChannelCheckpointsPerRPC ParamItem `refreshable:"true"`
ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
FlushMgrCleanInterval ParamItem `refreshable:"true"`
}
func (p *dataNodeConfig) init(base *BaseTable) {
@@ -3103,6 +3105,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
Export: true,
}
p.GracefulStopTimeout.Init(base.mgr)
p.FlushMgrCleanInterval = ParamItem{
Key: "datanode.flushMgrCleanInterval",
Version: "2.3.16",
DefaultValue: strconv.Itoa(DefaultFlushMgrCleanInterval),
Doc: "seconds. flush manager clean check interval",
Export: true,
}
p.FlushMgrCleanInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////


@@ -445,6 +445,8 @@ func TestComponentParam(t *testing.T) {
params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
params.Save("datanode.flushMgrCleanInterval", "200")
assert.Equal(t, 200*time.Second, Params.FlushMgrCleanInterval.GetAsDuration(time.Second))
})
t.Run("test indexNodeConfig", func(t *testing.T) {