diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index d224053ea4..5f4d0bc136 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -551,10 +551,8 @@ func (c *ChannelManagerImpl) Match(nodeID int64, channel string) bool { return false } - for _, ch := range info.Channels { - if ch.GetName() == channel { - return true - } + if _, ok := info.Channels[channel]; ok { + return true } return false } diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index ab458fc04b..b463b4aaa0 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -399,43 +399,35 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo { // GetBufferChannelInfo returns all unassigned channels. func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { - for id, info := range c.channelsInfo { - if id == bufferID { - return info - } + if info, ok := c.channelsInfo[bufferID]; ok { + return info } return nil } // GetNode returns the channel info of a given node. func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo { - for id, info := range c.channelsInfo { - if id == nodeID { - return info - } + if info, ok := c.channelsInfo[nodeID]; ok { + return info } return nil } func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int { - for id, info := range c.channelsInfo { - if id == nodeID { - return len(info.Channels) - } + if info, ok := c.channelsInfo[nodeID]; ok { + return len(info.Channels) } return 0 } // Delete removes the given node from the channel store and returns its channels. func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) { - for id, info := range c.channelsInfo { - if id == nodeID { - if err := c.remove(nodeID); err != nil { - return nil, err - } - delete(c.channelsInfo, id) - return lo.Values(info.Channels), nil + if info, ok := c.channelsInfo[nodeID]; ok { + if err := c.remove(nodeID); err != nil { + return nil, err } + delete(c.channelsInfo, nodeID) + return lo.Values(info.Channels), nil } return nil, nil } diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 6519ec5106..b3ad02de01 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -385,7 +385,7 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni return state } -func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 { +func (m *indexMeta) GetIndexedSegments(collectionID int64, segmentIDs, fieldIDs []UniqueID) []int64 { m.RLock() defer m.RUnlock() @@ -412,9 +412,11 @@ func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) } ret := make([]int64, 0) - for sid, indexes := range m.segmentIndexes { - if checkSegmentState(indexes) { - ret = append(ret, sid) + for _, sid := range segmentIDs { + if indexes, ok := m.segmentIndexes[sid]; ok { + if checkSegmentState(indexes) { + ret = append(ret, sid) + } } } diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index f6018c54de..806e841c94 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -614,17 +614,17 @@ func TestMeta_GetIndexedSegment(t *testing.T) { } t.Run("success", func(t *testing.T) { - segments := m.GetIndexedSegments(collID, []int64{fieldID}) + segments := m.GetIndexedSegments(collID, []int64{segID}, []int64{fieldID}) assert.Len(t, segments, 1) }) t.Run("no index on field", func(t *testing.T) { - segments := m.GetIndexedSegments(collID, []int64{fieldID + 1}) + segments := m.GetIndexedSegments(collID, []int64{segID}, []int64{fieldID + 1}) assert.Len(t, segments, 0) }) t.Run("no index", func(t *testing.T) { - segments := m.GetIndexedSegments(collID+1, []int64{fieldID}) + segments := m.GetIndexedSegments(collID+1, []int64{segID}, []int64{fieldID}) assert.Len(t, segments, 0) }) } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 5f781df27e..cb88bb8b6a 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -321,8 +321,11 @@ func (m *meta) GetAllCollectionNumRows() map[int64]int64 { m.RLock() defer m.RUnlock() ret := make(map[int64]int64, len(m.collections)) - for collectionID := range m.collections { - ret[collectionID] = m.getNumRowsOfCollectionUnsafe(collectionID) + segments := m.segments.GetSegments() + for _, segment := range segments { + if isSegmentHealthy(segment) { + ret[segment.GetCollectionID()] += segment.GetNumOfRows() + } } return ret } @@ -1053,14 +1056,7 @@ func (m *meta) GetFlushingSegments() []*SegmentInfo { func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { m.RLock() defer m.RUnlock() - var ret []*SegmentInfo - segments := m.segments.GetSegments() - for _, info := range segments { - if selector(info) { - ret = append(ret, info) - } - } - return ret + return m.segments.GetSegmentsBySelector(selector) } // AddAllocation add allocation in segment diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index efed75b744..9ae7d364ab 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -93,6 +93,16 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo { return segments } +func (s *SegmentsInfo) GetSegmentsBySelector(selector SegmentInfoSelector) []*SegmentInfo { + var segments []*SegmentInfo + for _, segment := range s.segments { + if selector(segment) { + segments = append(segments, segment) + } + } + return segments +} + // GetCompactionTo returns the segment that the provided segment is compacted to. // Return (nil, false) if given segmentID can not found in the meta. // Return (nil, true) if given segmentID can be found not no compaction to. diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index b369031074..e94da52c93 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -92,9 +92,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo vecFieldIDs = append(vecFieldIDs, field.GetFieldID()) } } + segmentIDs := lo.Map(segmentList, func(seg *SegmentInfo, _ int) UniqueID { + return seg.GetID() + }) // get indexed segments which finish build index on all vector field - indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs) + indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, vecFieldIDs) if len(indexed) > 0 { indexedSet := typeutil.NewUniqueSet(indexed...) for _, segment := range segmentList {