From ed55b14484cc6696027631bda557faa95053f43c Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 10 Jun 2025 16:28:34 +0800 Subject: [PATCH] fix: Release data memory after sync task completes (#42627) Release data memory after sync task completes to prevent datanode oom during import. issue: https://github.com/milvus-io/milvus/issues/42608 Signed-off-by: bigsheeper --- internal/flushcommon/syncmgr/serializer.go | 6 ++++++ internal/flushcommon/syncmgr/task.go | 2 ++ internal/flushcommon/syncmgr/task_test.go | 10 ++++++++++ 3 files changed, 18 insertions(+) diff --git a/internal/flushcommon/syncmgr/serializer.go b/internal/flushcommon/syncmgr/serializer.go index 9f5386133e..23d361d230 100644 --- a/internal/flushcommon/syncmgr/serializer.go +++ b/internal/flushcommon/syncmgr/serializer.go @@ -143,3 +143,9 @@ func (p *SyncPack) WithDataSource(source string) *SyncPack { p.dataSource = source return p } + +func (p *SyncPack) ReleaseData() { + p.insertData = nil + p.deltaData = nil + p.bm25Stats = nil +} diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 6f1231d8dc..985631be3a 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -168,6 +168,8 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } } + t.pack.ReleaseData() + actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)} if t.pack.isFlush { actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed)) diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go index 6ed5cda000..46713897d4 100644 --- a/internal/flushcommon/syncmgr/task_test.go +++ b/internal/flushcommon/syncmgr/task_test.go @@ -210,6 +210,12 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) { action(seg) }).Return() + isDataReleased := func(task *SyncTask) bool { + return task.pack.insertData == nil && + task.pack.deltaData == nil && + task.pack.bm25Stats == nil + } + s.Run("without_data", func() { task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint( &msgpb.MsgPosition{ @@ -225,6 +231,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) { err := task.Run(ctx) s.NoError(err) + s.True(isDataReleased(task)) // data should be released after task finished }) s.Run("with_insert_delete_cp", func() { @@ -244,6 +251,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) { err := task.Run(ctx) s.NoError(err) + s.True(isDataReleased(task)) // data should be released after task finished }) s.Run("with_flush", func() { @@ -263,6 +271,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) { } err := task.Run(ctx) s.NoError(err) + s.True(isDataReleased(task)) // data should be released after task finished }) s.Run("with_drop", func() { @@ -282,6 +291,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) { } err := task.Run(ctx) s.NoError(err) + s.True(isDataReleased(task)) // data should be released after task finished }) }