mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
fix: add GetSegments optimization to avoid meta mutex competition (#31025)
#30835 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
4bda6c33ad
commit
b9775a1816
@ -328,6 +328,20 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get segments By filter function
|
||||||
|
func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
var result []UniqueID
|
||||||
|
for _, id := range segIDs {
|
||||||
|
segment := m.segments.GetSegment(id)
|
||||||
|
if segment != nil && filterFunc(segment) {
|
||||||
|
result = append(result, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// GetSegment returns segment info with provided id
|
// GetSegment returns segment info with provided id
|
||||||
// include the unhealthy segment
|
// include the unhealthy segment
|
||||||
// if not segment is found, nil will be returned
|
// if not segment is found, nil will be returned
|
||||||
|
|||||||
@ -557,24 +557,16 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
|
|||||||
if len(segIDs) != 0 {
|
if len(segIDs) != 0 {
|
||||||
segCandidates = segIDs
|
segCandidates = segIDs
|
||||||
}
|
}
|
||||||
for _, id := range segCandidates {
|
|
||||||
info := s.meta.GetHealthySegment(id)
|
sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
|
||||||
if info == nil {
|
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
|
||||||
log.Warn("failed to get seg info from meta", zap.Int64("segmentID", id))
|
})
|
||||||
continue
|
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
|
||||||
}
|
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
|
||||||
if info.CollectionID != collectionID {
|
})
|
||||||
continue
|
ret = append(ret, sealedSegments...)
|
||||||
}
|
|
||||||
// idempotent sealed
|
for _, id := range growingSegments {
|
||||||
if info.State == commonpb.SegmentState_Sealed {
|
|
||||||
ret = append(ret, id)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// segment can be sealed only if it is growing.
|
|
||||||
if info.State != commonpb.SegmentState_Growing {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
|
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user