mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
fix: Fix import reader goroutine leak (#41869)
Close the chunk manager's reader after the import completes to prevent goroutine leaks. issues: https://github.com/milvus-io/milvus/issues/41868 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
ae43230703
commit
6c1a37fca1
@ -151,7 +151,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport() {
|
|||||||
cm := mocks.NewChunkManager(s.T())
|
cm := mocks.NewChunkManager(s.T())
|
||||||
ioReader := strings.NewReader(string(bytes))
|
ioReader := strings.NewReader(string(bytes))
|
||||||
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
|
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
|
||||||
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
|
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
|
||||||
s.cm = cm
|
s.cm = cm
|
||||||
|
|
||||||
preimportReq := &datapb.PreImportRequest{
|
preimportReq := &datapb.PreImportRequest{
|
||||||
@ -205,7 +205,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() {
|
|||||||
}
|
}
|
||||||
ioReader := strings.NewReader(string(bytes))
|
ioReader := strings.NewReader(string(bytes))
|
||||||
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
|
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
|
||||||
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
|
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
|
||||||
s.cm = cm
|
s.cm = cm
|
||||||
|
|
||||||
preimportReq := &datapb.PreImportRequest{
|
preimportReq := &datapb.PreImportRequest{
|
||||||
@ -244,7 +244,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
|
|||||||
|
|
||||||
cm := mocks.NewChunkManager(s.T())
|
cm := mocks.NewChunkManager(s.T())
|
||||||
ioReader := strings.NewReader(string(bytes))
|
ioReader := strings.NewReader(string(bytes))
|
||||||
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
|
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
|
||||||
s.cm = cm
|
s.cm = cm
|
||||||
|
|
||||||
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
|
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
|
||||||
@ -305,7 +305,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
|
|||||||
|
|
||||||
cm := mocks.NewChunkManager(s.T())
|
cm := mocks.NewChunkManager(s.T())
|
||||||
ioReader := strings.NewReader(string(bytes))
|
ioReader := strings.NewReader(string(bytes))
|
||||||
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
|
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
|
||||||
s.cm = cm
|
s.cm = cm
|
||||||
|
|
||||||
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
|
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
|
||||||
|
|||||||
@ -126,6 +126,7 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]
|
|||||||
// no need to read nulls in DeleteEventType
|
// no need to read nulls in DeleteEventType
|
||||||
rowsSet, _, err := readData(reader, storage.DeleteEventType)
|
rowsSet, _, err := readData(reader, storage.DeleteEventType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
reader.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, rows := range rowsSet {
|
for _, rows := range rowsSet {
|
||||||
@ -133,6 +134,7 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]
|
|||||||
dl := &storage.DeleteLog{}
|
dl := &storage.DeleteLog{}
|
||||||
err = dl.Parse(row)
|
err = dl.Parse(row)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
reader.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if dl.Ts >= tsStart && dl.Ts <= tsEnd {
|
if dl.Ts >= tsStart && dl.Ts <= tsEnd {
|
||||||
@ -143,6 +145,7 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
reader.Close()
|
||||||
}
|
}
|
||||||
return deleteData, nil
|
return deleteData, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,6 +37,7 @@ type Row = map[storage.FieldID]any
|
|||||||
type reader struct {
|
type reader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cm storage.ChunkManager
|
cm storage.ChunkManager
|
||||||
|
cmr storage.FileReader
|
||||||
schema *schemapb.CollectionSchema
|
schema *schemapb.CollectionSchema
|
||||||
|
|
||||||
cr *csv.Reader
|
cr *csv.Reader
|
||||||
@ -74,6 +75,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
|
|||||||
return &reader{
|
return &reader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cm: cm,
|
cm: cm,
|
||||||
|
cmr: cmReader,
|
||||||
schema: schema,
|
schema: schema,
|
||||||
cr: csvReader,
|
cr: csvReader,
|
||||||
parser: rowParser,
|
parser: rowParser,
|
||||||
@ -120,7 +122,11 @@ func (r *reader) Read() (*storage.InsertData, error) {
|
|||||||
return insertData, nil
|
return insertData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *reader) Close() {}
|
func (r *reader) Close() {
|
||||||
|
if r.cmr != nil {
|
||||||
|
r.cmr.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *reader) Size() (int64, error) {
|
func (r *reader) Size() (int64, error) {
|
||||||
if size := r.fileSize.Load(); size != 0 {
|
if size := r.fileSize.Load(); size != 0 {
|
||||||
|
|||||||
@ -40,6 +40,7 @@ type Row = map[storage.FieldID]any
|
|||||||
type reader struct {
|
type reader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cm storage.ChunkManager
|
cm storage.ChunkManager
|
||||||
|
cmr storage.FileReader
|
||||||
schema *schemapb.CollectionSchema
|
schema *schemapb.CollectionSchema
|
||||||
|
|
||||||
fileSize *atomic.Int64
|
fileSize *atomic.Int64
|
||||||
@ -65,6 +66,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
|
|||||||
reader := &reader{
|
reader := &reader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cm: cm,
|
cm: cm,
|
||||||
|
cmr: r,
|
||||||
schema: schema,
|
schema: schema,
|
||||||
fileSize: atomic.NewInt64(0),
|
fileSize: atomic.NewInt64(0),
|
||||||
filePath: path,
|
filePath: path,
|
||||||
@ -180,4 +182,8 @@ func (j *reader) Size() (int64, error) {
|
|||||||
return size, nil
|
return size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *reader) Close() {}
|
func (j *reader) Close() {
|
||||||
|
if j.cmr != nil {
|
||||||
|
j.cmr.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -248,8 +248,6 @@ func (c *FieldReader) Next(count int64) (any, error) {
|
|||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FieldReader) Close() {}
|
|
||||||
|
|
||||||
// setByteOrder sets BigEndian/LittleEndian, the logic of this method is copied from npyio lib
|
// setByteOrder sets BigEndian/LittleEndian, the logic of this method is copied from npyio lib
|
||||||
func (c *FieldReader) setByteOrder() {
|
func (c *FieldReader) setByteOrder() {
|
||||||
var nativeEndian binary.ByteOrder
|
var nativeEndian binary.ByteOrder
|
||||||
|
|||||||
@ -35,6 +35,7 @@ import (
|
|||||||
type reader struct {
|
type reader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cm storage.ChunkManager
|
cm storage.ChunkManager
|
||||||
|
cmrs map[int64]storage.FileReader
|
||||||
schema *schemapb.CollectionSchema
|
schema *schemapb.CollectionSchema
|
||||||
|
|
||||||
fileSize *atomic.Int64
|
fileSize *atomic.Int64
|
||||||
@ -72,6 +73,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
|
|||||||
return &reader{
|
return &reader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cm: cm,
|
cm: cm,
|
||||||
|
cmrs: readers,
|
||||||
schema: schema,
|
schema: schema,
|
||||||
fileSize: atomic.NewInt64(0),
|
fileSize: atomic.NewInt64(0),
|
||||||
paths: paths,
|
paths: paths,
|
||||||
@ -119,13 +121,13 @@ func (r *reader) Size() (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *reader) Close() {
|
func (r *reader) Close() {
|
||||||
for _, cr := range r.frs {
|
for _, cmr := range r.cmrs {
|
||||||
cr.Close()
|
cmr.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateReaders(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string) (map[int64]io.Reader, error) {
|
func CreateReaders(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string) (map[int64]storage.FileReader, error) {
|
||||||
readers := make(map[int64]io.Reader)
|
readers := make(map[int64]storage.FileReader)
|
||||||
nameToPath := lo.SliceToMap(paths, func(path string) (string, string) {
|
nameToPath := lo.SliceToMap(paths, func(path string) (string, string) {
|
||||||
nameWithExt := filepath.Base(path)
|
nameWithExt := filepath.Base(path)
|
||||||
name := strings.TrimSuffix(nameWithExt, filepath.Ext(nameWithExt))
|
name := strings.TrimSuffix(nameWithExt, filepath.Ext(nameWithExt))
|
||||||
|
|||||||
@ -212,8 +212,6 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FieldReader) Close() {}
|
|
||||||
|
|
||||||
func ReadBoolData(pcr *FieldReader, count int64) (any, error) {
|
func ReadBoolData(pcr *FieldReader, count int64) (any, error) {
|
||||||
chunked, err := pcr.columnReader.NextBatch(count)
|
chunked, err := pcr.columnReader.NextBatch(count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -38,6 +38,7 @@ import (
|
|||||||
type reader struct {
|
type reader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cm storage.ChunkManager
|
cm storage.ChunkManager
|
||||||
|
cmr storage.FileReader
|
||||||
schema *schemapb.CollectionSchema
|
schema *schemapb.CollectionSchema
|
||||||
|
|
||||||
path string
|
path string
|
||||||
@ -81,6 +82,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
|
|||||||
return &reader{
|
return &reader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cm: cm,
|
cm: cm,
|
||||||
|
cmr: cmReader,
|
||||||
schema: schema,
|
schema: schema,
|
||||||
fileSize: atomic.NewInt64(0),
|
fileSize: atomic.NewInt64(0),
|
||||||
path: path,
|
path: path,
|
||||||
@ -140,11 +142,11 @@ func (r *reader) Size() (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *reader) Close() {
|
func (r *reader) Close() {
|
||||||
for _, cr := range r.frs {
|
|
||||||
cr.Close()
|
|
||||||
}
|
|
||||||
err := r.r.Close()
|
err := r.r.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("close parquet reader failed", zap.Error(err))
|
log.Warn("close parquet reader failed", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
if r.cmr != nil {
|
||||||
|
r.cmr.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user