diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 6f5ad8071a..136e7d11f0 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -338,9 +338,6 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) } func (mgr *segmentManager) Unpin(segments []Segment) { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - for _, segment := range segments { segment.RUnlock() } @@ -388,64 +385,116 @@ func (mgr *segmentManager) Empty() bool { // false otherwise func (mgr *segmentManager) Remove(segmentID UniqueID, scope querypb.DataScope) (int, int) { mgr.mu.Lock() - defer mgr.mu.Unlock() var removeGrowing, removeSealed int + var growing, sealed *LocalSegment switch scope { case querypb.DataScope_Streaming: - if remove(segmentID, mgr.growingSegments) { + growing = mgr.removeSegmentWithType(SegmentTypeGrowing, segmentID) + if growing != nil { removeGrowing = 1 } case querypb.DataScope_Historical: - if remove(segmentID, mgr.sealedSegments) { + sealed = mgr.removeSegmentWithType(SegmentTypeSealed, segmentID) + if sealed != nil { removeSealed = 1 } case querypb.DataScope_All: - if remove(segmentID, mgr.growingSegments) { + growing = mgr.removeSegmentWithType(SegmentTypeGrowing, segmentID) + if growing != nil { removeGrowing = 1 } - if remove(segmentID, mgr.sealedSegments) { + + sealed = mgr.removeSegmentWithType(SegmentTypeSealed, segmentID) + if sealed != nil { removeSealed = 1 } } - mgr.updateMetric() + mgr.mu.Unlock() + + if growing != nil { + remove(growing) + } + + if sealed != nil { + remove(sealed) + } + return removeGrowing, removeSealed } +func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID UniqueID) *LocalSegment { + switch typ { + case SegmentTypeGrowing: + s, ok := mgr.growingSegments[segmentID] + if ok { + delete(mgr.growingSegments, segmentID) + return s.(*LocalSegment) + } + + case SegmentTypeSealed: + s, ok := mgr.sealedSegments[segmentID] + if ok { + delete(mgr.sealedSegments, segmentID) + return s.(*LocalSegment) + } + default: + return nil + } + + return nil +} + func (mgr *segmentManager) RemoveBy(filters ...SegmentFilter) (int, int) { mgr.mu.Lock() - defer mgr.mu.Unlock() - var removeGrowing, removeSealed int + var removeGrowing, removeSealed []*LocalSegment for id, segment := range mgr.growingSegments { - if filter(segment, filters...) && remove(id, mgr.growingSegments) { - removeGrowing++ + if filter(segment, filters...) { + s := mgr.removeSegmentWithType(SegmentTypeGrowing, id) + if s != nil { + removeGrowing = append(removeGrowing, s) + } } } for id, segment := range mgr.sealedSegments { - if filter(segment, filters...) && remove(id, mgr.sealedSegments) { - removeSealed++ + if filter(segment, filters...) { + s := mgr.removeSegmentWithType(SegmentTypeSealed, id) + if s != nil { + removeSealed = append(removeSealed, s) + } } } - mgr.updateMetric() - return removeGrowing, removeSealed + mgr.mu.Unlock() + + for _, s := range removeGrowing { + remove(s) + } + + for _, s := range removeSealed { + remove(s) + } + + return len(removeGrowing), len(removeSealed) } func (mgr *segmentManager) Clear() { mgr.mu.Lock() defer mgr.mu.Unlock() - for id := range mgr.growingSegments { - remove(id, mgr.growingSegments) + for id, segment := range mgr.growingSegments { + delete(mgr.growingSegments, id) + remove(segment.(*LocalSegment)) } - for id := range mgr.sealedSegments { - remove(id, mgr.sealedSegments) + for id, segment := range mgr.sealedSegments { + delete(mgr.sealedSegments, id) + remove(segment.(*LocalSegment)) } mgr.updateMetric() } @@ -465,17 +514,9 @@ func (mgr *segmentManager) updateMetric() { metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len())) } -// returns true if the segment exists, -// false otherwise -func remove(segmentID int64, container map[int64]Segment) bool { - segment, ok := container[segmentID] - if !ok { - return false - } - delete(container, segmentID) - +func remove(segment *LocalSegment) bool { rowNum := segment.RowNum() - DeleteSegment(segment.(*LocalSegment)) + DeleteSegment(segment) metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 68302d1c05..69d5f5251d 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -71,6 +71,12 @@ func (s *ManagerSuite) TestGetBy() { segments := s.mgr.GetBy(WithType(typ)) s.Contains(segments, s.segments[i]) } + s.mgr.Clear() + + for _, typ := range s.types { + segments := s.mgr.GetBy(WithType(typ)) + s.Len(segments, 0) + } } func (s *ManagerSuite) TestRemoveGrowing() {