mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Release data memory after sync task completes (#42627)
Release data memory after sync task completes to prevent datanode oom during import. issue: https://github.com/milvus-io/milvus/issues/42608 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
c9680a5b56
commit
ed55b14484
@ -143,3 +143,9 @@ func (p *SyncPack) WithDataSource(source string) *SyncPack {
|
|||||||
p.dataSource = source
|
p.dataSource = source
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SyncPack) ReleaseData() {
|
||||||
|
p.insertData = nil
|
||||||
|
p.deltaData = nil
|
||||||
|
p.bm25Stats = nil
|
||||||
|
}
|
||||||
|
|||||||
@ -168,6 +168,8 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.pack.ReleaseData()
|
||||||
|
|
||||||
actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
|
actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
|
||||||
if t.pack.isFlush {
|
if t.pack.isFlush {
|
||||||
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
|
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
|
||||||
|
|||||||
@ -210,6 +210,12 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) {
|
|||||||
action(seg)
|
action(seg)
|
||||||
}).Return()
|
}).Return()
|
||||||
|
|
||||||
|
isDataReleased := func(task *SyncTask) bool {
|
||||||
|
return task.pack.insertData == nil &&
|
||||||
|
task.pack.deltaData == nil &&
|
||||||
|
task.pack.bm25Stats == nil
|
||||||
|
}
|
||||||
|
|
||||||
s.Run("without_data", func() {
|
s.Run("without_data", func() {
|
||||||
task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint(
|
task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint(
|
||||||
&msgpb.MsgPosition{
|
&msgpb.MsgPosition{
|
||||||
@ -225,6 +231,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) {
|
|||||||
|
|
||||||
err := task.Run(ctx)
|
err := task.Run(ctx)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
s.True(isDataReleased(task)) // data should be released after task finished
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Run("with_insert_delete_cp", func() {
|
s.Run("with_insert_delete_cp", func() {
|
||||||
@ -244,6 +251,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) {
|
|||||||
|
|
||||||
err := task.Run(ctx)
|
err := task.Run(ctx)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
s.True(isDataReleased(task)) // data should be released after task finished
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Run("with_flush", func() {
|
s.Run("with_flush", func() {
|
||||||
@ -263,6 +271,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) {
|
|||||||
}
|
}
|
||||||
err := task.Run(ctx)
|
err := task.Run(ctx)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
s.True(isDataReleased(task)) // data should be released after task finished
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Run("with_drop", func() {
|
s.Run("with_drop", func() {
|
||||||
@ -282,6 +291,7 @@ func (s *SyncTaskSuite) runTestRunNormal(storageVersion int64) {
|
|||||||
}
|
}
|
||||||
err := task.Run(ctx)
|
err := task.Run(ctx)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
s.True(isDataReleased(task)) // data should be released after task finished
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user