Fix memory leak when Put duplicated segments (#26693)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-08-30 11:37:00 +08:00 committed by GitHub
parent 9598a8b236
commit 95dcf7fa06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -149,9 +149,9 @@ func NewSegmentManager() *segmentManager {
} }
func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
var replacedSegment []Segment
mgr.mu.Lock() mgr.mu.Lock()
defer mgr.mu.Unlock() defer mgr.mu.Unlock()
targetMap := mgr.growingSegments targetMap := mgr.growingSegments
switch segmentType { switch segmentType {
case SegmentTypeGrowing: case SegmentTypeGrowing:
@ -165,37 +165,51 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
for _, segment := range segments { for _, segment := range segments {
oldSegment, ok := targetMap[segment.ID()] oldSegment, ok := targetMap[segment.ID()]
if ok && oldSegment.Version() >= segment.Version() { if ok {
log.Warn("Invalid segment distribution changed, skip it", if oldSegment.Version() >= segment.Version() {
zap.Int64("segmentID", segment.ID()), log.Warn("Invalid segment distribution changed, skip it",
zap.Int64("oldVersion", oldSegment.Version()), zap.Int64("segmentID", segment.ID()),
zap.Int64("newVersion", segment.Version()), zap.Int64("oldVersion", oldSegment.Version()),
) zap.Int64("newVersion", segment.Version()),
continue )
// delete redundant segment
if s, ok := segment.(*LocalSegment); ok {
DeleteSegment(s)
}
continue
}
replacedSegment = append(replacedSegment, oldSegment)
} }
targetMap[segment.ID()] = segment targetMap[segment.ID()] = segment
if !ok { eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection())))
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) metrics.QueryNodeNumSegments.WithLabelValues(
metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Inc()
if segment.RowNum() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()), fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()), fmt.Sprint(segment.Partition()),
segment.Type().String(), segment.Type().String(),
fmt.Sprint(len(segment.Indexes())), fmt.Sprint(len(segment.Indexes())),
).Inc() ).Add(float64(segment.RowNum()))
if segment.RowNum() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Add(float64(segment.RowNum()))
}
} }
} }
mgr.updateMetric() mgr.updateMetric()
// release replaced segment
if len(replacedSegment) > 0 {
go func() {
for _, segment := range replacedSegment {
remove(segment.(*LocalSegment))
}
}()
}
} }
func (mgr *segmentManager) UpdateSegmentBy(action SegmentAction, filters ...SegmentFilter) int { func (mgr *segmentManager) UpdateSegmentBy(action SegmentAction, filters ...SegmentFilter) int {