diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index f7ce6ea949..4fa1927660 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -494,17 +494,9 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo { // GetNodeChannelsByCollectionID gets all node channels map of the collection func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { - nodeChs := make(map[UniqueID][]string) - for _, nodeChannels := range c.GetAssignedChannels() { - var channelNames []string - for name, ch := range nodeChannels.Channels { - if ch.GetCollectionID() == collectionID { - channelNames = append(channelNames, name) - } - } - nodeChs[nodeChannels.NodeID] = channelNames - } - return nodeChs + c.mu.RLock() + defer c.mu.RUnlock() + return c.store.GetNodeChannelsByCollectionID(collectionID) } // Get all channels belong to the collection @@ -891,15 +883,6 @@ func (c *ChannelManagerImpl) GetCollectionIDByChannel(channelName string) (bool, return false, 0 } -func (c *ChannelManagerImpl) GetNodeIDByChannelName(channelName string) (UniqueID, bool) { - for _, nodeChannel := range c.GetAssignedChannels() { - if _, ok := nodeChannel.Channels[channelName]; ok { - return nodeChannel.NodeID, true - } - } - return 0, false -} - func (c *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index ccaed65ea9..4bea2daf96 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -48,7 +48,6 @@ type ChannelManager interface { FindWatcher(channel string) (UniqueID, error) GetChannel(nodeID int64, channel string) (RWChannel, bool) - GetNodeIDByChannelName(channel string) (int64, bool) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string GetChannelsByCollectionID(collectionID int64) []RWChannel GetChannelNamesByCollectionID(collectionID int64) []string @@ -351,31 +350,10 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC return nil, false } -func (m *ChannelManagerImplV2) GetNodeIDByChannelName(channel string) (int64, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - nodeChannels := m.store.GetNodeChannelsBy( - WithoutBufferNode(), - WithChannelName(channel)) - - if len(nodeChannels) > 0 { - return nodeChannels[0].NodeID, true - } - - return 0, false -} - func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { m.mu.RLock() defer m.mu.RUnlock() - nodeChs := make(map[UniqueID][]string) - nodeChannels := m.store.GetNodeChannelsBy( - WithoutBufferNode(), - WithCollectionIDV2(collectionID)) - lo.ForEach(nodeChannels, func(info *NodeChannelInfo, _ int) { - nodeChs[info.NodeID] = lo.Keys(info.Channels) - }) - return nodeChs + return m.store.GetNodeChannelsByCollectionID(collectionID) } func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel { diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index c59e626a6f..76524df0a9 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -37,6 +37,8 @@ import ( ) // ROChannelStore is a read only channel store for channels and nodes. +// +//go:generate mockery --name=ROChannelStore --structname=ROChannelStore --output=./ --filename=mock_ro_channel_store.go --with-expecter type ROChannelStore interface { // GetNode returns the channel info of a specific node. // Returns nil if the node doesn't belong to the cluster @@ -52,12 +54,16 @@ type ROChannelStore interface { GetNodes() []int64 // GetNodeChannelCount GetNodeChannelCount(nodeID int64) int + // GetNodeChannels for given collection + GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string // GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo } // RWChannelStore is the read write channel store for channels and nodes. +// +//go:generate mockery --name=RWChannelStore --structname=RWChannelStore --output=./ --filename=mock_channel_store.go --with-expecter type RWChannelStore interface { ROChannelStore // Reload restores the buffer channels and node-channels mapping form kv. @@ -458,6 +464,23 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo { return ret } +func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { + nodeChs := make(map[UniqueID][]string) + for id, info := range c.channelsInfo { + if id == bufferID { + continue + } + var channelNames []string + for name, ch := range info.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } + nodeChs[id] = channelNames + } + return nodeChs +} + // GetBufferChannelInfo returns all unassigned channels. func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { if info, ok := c.channelsInfo[bufferID]; ok { diff --git a/internal/datacoord/channel_store_v2.go b/internal/datacoord/channel_store_v2.go index 82f0d14e9e..dcbeef2863 100644 --- a/internal/datacoord/channel_store_v2.go +++ b/internal/datacoord/channel_store_v2.go @@ -366,7 +366,7 @@ func WithChannelStates(states ...ChannelState) ChannelSelector { } func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { - nodeChannels := make(map[int64]*NodeChannelInfo) + var nodeChannels []*NodeChannelInfo for nodeID, cInfo := range c.channelsInfo { if nodeSelector(nodeID) { selected := make(map[string]RWChannel) @@ -382,13 +382,13 @@ func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channel selected[chName] = channel } } - nodeChannels[nodeID] = &NodeChannelInfo{ + nodeChannels = append(nodeChannels, &NodeChannelInfo{ NodeID: nodeID, Channels: selected, - } + }) } } - return lo.Values(nodeChannels) + return nodeChannels } func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { @@ -401,6 +401,23 @@ func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { return ret } +func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { + nodeChs := make(map[UniqueID][]string) + for id, info := range c.channelsInfo { + if id == bufferID { + continue + } + var channelNames []string + for name, ch := range info.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } + nodeChs[id] = channelNames + } + return nodeChs +} + func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo { return c.GetNode(bufferID) } diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index e0e469fba7..fc7cb51ef3 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -179,6 +179,50 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int return _c } +// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID +func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { + ret := _m.Called(collectionID) + + var r0 map[int64][]string + if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]string) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' +type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { + *mock.Call +} + +// GetNodeChannelsByCollectionID is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { _va := make([]interface{}, len(channelSelectors)) diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 5946934e88..40f0d46fe6 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -102,7 +102,7 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen filter.AddFilter(criterion) } var result []*SegmentInfo - var candidates []*SegmentInfo + var candidates map[int64]*SegmentInfo // apply criterion switch { case criterion.collectionID > 0: @@ -110,9 +110,9 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen if !ok { return nil } - candidates = lo.Values(collSegments.segments) + candidates = collSegments.segments default: - candidates = lo.Values(s.segments) + candidates = s.segments } for _, segment := range candidates { if criterion.Match(segment) {