diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 047f593907..d224053ea4 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -508,13 +508,12 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo { func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { nodeChs := make(map[UniqueID][]string) for _, nodeChannels := range c.GetAssignedChannels() { - filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { - return channel.GetCollectionID() == collectionID - }) - channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string { - return channel.GetName() - }) - + var channelNames []string + for name, ch := range nodeChannels.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } nodeChs[nodeChannels.NodeID] = channelNames } return nodeChs @@ -524,11 +523,11 @@ func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel { channels := make([]RWChannel, 0) for _, nodeChannels := range c.GetAssignedChannels() { - filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { - return channel.GetCollectionID() == collectionID - }) - - channels = append(channels, filtered...) + for _, ch := range nodeChannels.Channels { + if ch.GetCollectionID() == collectionID { + channels = append(channels, ch) + } + } } return channels } @@ -807,7 +806,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string) } c.mu.RUnlock() - reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}} + reallocates := NewNodeChannelInfo(originNodeID, ch) isDropped := c.isMarkedDrop(channelName) c.mu.Lock() @@ -862,7 +861,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } - reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}} + reallocates := NewNodeChannelInfo(nodeID, chToCleanUp) isDropped := c.isMarkedDrop(channelName) c.mu.Lock() diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 536f65b1a1..7e890b55d3 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -56,6 +56,31 @@ func waitAndStore(t *testing.T, watchkv kv.MetaKv, key string, waitState, storeS } } +func waitPrefixAndStore(t *testing.T, watchkv kv.MetaKv, prefix string, waitState, storeState datapb.ChannelWatchState) string { + channelName := "" + for { + keys, values, err := watchkv.LoadWithPrefix(prefix) + if err == nil && len(values) > 0 { + for idx, value := range values { + watchInfo, err := parseWatchInfo(keys[idx], []byte(value)) + require.NoError(t, err) + require.Equal(t, waitState, watchInfo.GetState()) + + channelName = watchInfo.GetVchan().GetChannelName() + + watchInfo.State = storeState + data, err := proto.Marshal(watchInfo) + require.NoError(t, err) + + watchkv.Save(path.Join(prefix, watchInfo.GetVchan().GetChannelName()), string(data)) + } + break + } + time.Sleep(100 * time.Millisecond) + } + return channelName +} + // waitAndCheckState checks if the DataCoord writes expected state into Etcd func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { for { @@ -217,10 +242,8 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, - oldNode: {oldNode, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), + oldNode: NewNodeChannelInfo(oldNode), }, } @@ -260,9 +283,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), }, } @@ -306,10 +327,8 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, - oldNode: {oldNode, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), + oldNode: NewNodeChannelInfo(oldNode), }, } @@ -352,9 +371,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), }, } @@ -400,10 +417,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: channel1, CollectionID: collectionID}, - &channelMeta{Name: channel2, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}), }, } @@ -438,10 +452,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - bufferID: {bufferID, []RWChannel{ - &channelMeta{Name: channel1, CollectionID: collectionID}, - &channelMeta{Name: channel2, CollectionID: collectionID}, - }}, + bufferID: NewNodeChannelInfo(bufferID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}), }, } @@ -502,7 +513,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -682,11 +693,9 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - &channelMeta{Name: "channel-1", CollectionID: collectionID}, - &channelMeta{Name: "channel-2", CollectionID: collectionID}, - }}, - bufferID: {bufferID, []RWChannel{}}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID}, + &channelMeta{Name: "channel-2", CollectionID: collectionID}), + bufferID: NewNodeChannelInfo(bufferID), }, } chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)) @@ -774,7 +783,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } ch = chManager.getChannelByNodeAndName(nodeID, channelName) @@ -943,7 +952,7 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -966,7 +975,7 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -993,8 +1002,8 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, - 999: {999, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), + 999: NewNodeChannelInfo(999), }, } require.NoError(t, err) @@ -1024,8 +1033,8 @@ func TestChannelManager_Reload(t *testing.T) { cm.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{&channelMeta{Name: "channel1", CollectionID: 1}}}, - 2: {2, []RWChannel{&channelMeta{Name: "channel2", CollectionID: 1}}}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel1", CollectionID: 1}), + 2: NewNodeChannelInfo(2, &channelMeta{Name: "channel2", CollectionID: 1}), }, } @@ -1077,58 +1086,69 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - &channelMeta{Name: "channel-1", CollectionID: collectionID}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID}, &channelMeta{Name: "channel-2", CollectionID: collectionID}, - &channelMeta{Name: "channel-3", CollectionID: collectionID}, - }}, + &channelMeta{Name: "channel-3", CollectionID: collectionID}), }, } var channelBalanced string chManager.AddNode(2) - channelBalanced = "channel-1" - key := path.Join(prefix, "1", channelBalanced) - waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) + watchPrefix := path.Join(prefix, "1") + channelBalanced = waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) - key = path.Join(prefix, "2", channelBalanced) + key := path.Join(prefix, "2", channelBalanced) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.AddNode(3) chManager.Watch(ctx, &channelMeta{Name: "channel-4", CollectionID: collectionID}) - key = path.Join(prefix, "3", "channel-4") - waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) + // key = path.Join(prefix, "3", "channel-4") + watchPrefix = path.Join(prefix, "3") + channelBalanced2 := waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) - assert.True(t, chManager.Match(3, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else if channel == channelBalanced2 { + assert.True(t, chManager.Match(3, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.DeleteNode(3) - key = path.Join(prefix, "2", "channel-4") + key = path.Join(prefix, "2", channelBalanced2) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) - assert.True(t, chManager.Match(2, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else if channel == channelBalanced2 { + assert.True(t, chManager.Match(2, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.DeleteNode(2) - key = path.Join(prefix, "1", "channel-4") + key = path.Join(prefix, "1", channelBalanced) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - key = path.Join(prefix, "1", "channel-1") + key = path.Join(prefix, "1", channelBalanced2) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(1, "channel-1")) - assert.True(t, chManager.Match(1, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + assert.True(t, chManager.Match(1, channel)) + } }) } @@ -1157,12 +1177,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) { store: &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: { - NodeID: 1, - Channels: []RWChannel{ - &channelMeta{Name: "ch1", CollectionID: 1}, - }, - }, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "ch1", CollectionID: 1}), }, }, }, @@ -1257,14 +1272,14 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) { mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{ { NodeID: 1, - Channels: []RWChannel{ - &channelMeta{ + Channels: map[string]RWChannel{ + "channel-1": &channelMeta{ Name: "channel-1", }, - &channelMeta{ + "channel-2": &channelMeta{ Name: "channel-2", }, - &channelMeta{ + "channel-3": &channelMeta{ Name: "channel-3", }, }, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 7a94c4df0a..ab458fc04b 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/timerecord" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -220,7 +219,31 @@ type ChannelStore struct { // NodeChannelInfo stores the nodeID and its channels. type NodeChannelInfo struct { NodeID int64 - Channels []RWChannel + Channels map[string]RWChannel + // ChannelsSet typeutil.Set[string] // map for fast channel check +} + +// AddChannel appends channel info node channel list. +func (info *NodeChannelInfo) AddChannel(ch RWChannel) { + info.Channels[ch.GetName()] = ch +} + +// RemoveChannel removes channel from Channels. +func (info *NodeChannelInfo) RemoveChannel(channelName string) { + delete(info.Channels, channelName) +} + +func NewNodeChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo { + info := &NodeChannelInfo{ + NodeID: nodeID, + Channels: make(map[string]RWChannel), + } + + for _, channel := range channels { + info.Channels[channel.GetName()] = channel + } + + return info } // NewChannelStore creates and returns a new ChannelStore. @@ -231,7 +254,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore { } c.channelsInfo[bufferID] = &NodeChannelInfo{ NodeID: bufferID, - Channels: make([]RWChannel, 0), + Channels: make(map[string]RWChannel), } return c } @@ -264,7 +287,8 @@ func (c *ChannelStore) Reload() error { Schema: cw.GetSchema(), WatchInfo: cw, } - c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel) + c.channelsInfo[nodeID].AddChannel(channel) + log.Info("channel store reload channel", zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) @@ -280,10 +304,7 @@ func (c *ChannelStore) Add(nodeID int64) { return } - c.channelsInfo[nodeID] = &NodeChannelInfo{ - NodeID: nodeID, - Channels: make([]RWChannel, 0), - } + c.channelsInfo[nodeID] = NewNodeChannelInfo(nodeID) } // Update applies the channel operations in opSet. @@ -317,11 +338,9 @@ func (c *ChannelStore) Update(opSet *ChannelOpSet) error { } func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool { - if _, ok := c.channelsInfo[nodeID]; ok { - for _, ch := range c.channelsInfo[nodeID].Channels { - if channel.GetName() == ch.GetName() && channel.GetCollectionID() == ch.GetCollectionID() { - return true - } + if info, ok := c.channelsInfo[nodeID]; ok { + if ch, ok := info.Channels[channel.GetName()]; ok { + return ch.GetCollectionID() == channel.GetCollectionID() } } return false @@ -343,19 +362,13 @@ func (c *ChannelStore) update(opSet *ChannelOpSet) error { continue // prevent adding duplicated channel info } // Append target channels to channel store. - c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, ch) + c.channelsInfo[op.NodeID].AddChannel(ch) } case Delete: - del := typeutil.NewSet(op.GetChannelNames()...) - - prev := c.channelsInfo[op.NodeID].Channels - curr := make([]RWChannel, 0, len(prev)) - for _, ch := range prev { - if !del.Contain(ch.GetName()) { - curr = append(curr, ch) - } + info := c.channelsInfo[op.NodeID] + for _, channelName := range op.GetChannelNames() { + info.RemoveChannel(channelName) } - c.channelsInfo[op.NodeID].Channels = curr default: return errUnknownOpType } @@ -421,7 +434,7 @@ func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) { return nil, err } delete(c.channelsInfo, id) - return info.Channels, nil + return lo.Values(info.Channels), nil } } return nil, nil diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index 49cc758515..47413961fb 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -40,10 +40,7 @@ func genNodeChannelInfos(id int64, num int) *NodeChannelInfo { name := fmt.Sprintf("ch%d", i) channels = append(channels, &channelMeta{Name: name, CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}}) } - return &NodeChannelInfo{ - NodeID: id, - Channels: channels, - } + return NewNodeChannelInfo(id, channels...) } func genChannelOperations(from, to int64, num int) *ChannelOpSet { @@ -85,7 +82,7 @@ func TestChannelStore_Update(t *testing.T) { txnKv, map[int64]*NodeChannelInfo{ 1: genNodeChannelInfos(1, 500), - 2: {NodeID: 2}, + 2: NewNodeChannelInfo(2), }, }, args{ diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 80f2f38b08..fc943c9503 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -47,8 +47,8 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet } opSet := NewChannelOpSet( - NewDeleteOp(bufferID, info.Channels...), - NewAddOp(nodeID, info.Channels...)) + NewDeleteOp(bufferID, lo.Values(info.Channels)...), + NewAddOp(nodeID, lo.Values(info.Channels)...)) return opSet } @@ -89,7 +89,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (*ChannelOpSet, // TODO: Consider re-picking in case assignment is extremely uneven? continue } - releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx]) + releases[toRelease.NodeID] = append(releases[toRelease.NodeID], lo.Values(toRelease.Channels)[chIdx]) } // Channels in `releases` are reassigned eventually by channel manager. @@ -197,8 +197,8 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) *ChannelO for _, c := range allNodes { if c.NodeID == nodeID { - opSet.Delete(nodeID, c.Channels...) - unregisteredChannels = append(unregisteredChannels, c.Channels...) + opSet.Delete(nodeID, lo.Values(c.Channels)...) + unregisteredChannels = append(unregisteredChannels, lo.Values(c.Channels)...) continue } avaNodes = append(avaNodes, c) @@ -236,7 +236,7 @@ func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) *ChannelOpSet { return opSet } for _, reAlloc := range reAllocates { - opSet.Add(reAlloc.NodeID, reAlloc.Channels...) + opSet.Add(reAlloc.NodeID, lo.Values(reAlloc.Channels)...) } return opSet @@ -295,7 +295,7 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) * // reassign channels to remaining nodes addUpdates := make(map[int64]*ChannelOp) for _, reassign := range reassigns { - opSet.Delete(reassign.NodeID, reassign.Channels...) + opSet.Delete(reassign.NodeID, lo.Values(reassign.Channels)...) for _, ch := range reassign.Channels { nodeIdx := 0 for { @@ -381,13 +381,10 @@ func BgBalanceCheck(nodeChannels []*NodeChannelInfo, ts time.Time) ([]*NodeChann zap.Int("channelCountPerNode", channelCountPerNode)) continue } - reallocate := &NodeChannelInfo{ - NodeID: nChannels.NodeID, - Channels: make([]RWChannel, 0), - } + reallocate := NewNodeChannelInfo(nChannels.NodeID) toReleaseCount := chCount - channelCountPerNode - 1 for _, ch := range nChannels.Channels { - reallocate.Channels = append(reallocate.Channels, ch) + reallocate.AddChannel(ch) toReleaseCount-- if toReleaseCount <= 0 { break diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 41b5d44f31..2ccbdfa6eb 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -33,8 +33,8 @@ func TestBufferChannelAssignPolicy(t *testing.T) { store := &ChannelStore{ store: kv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{}}, - bufferID: {bufferID, channels}, + 1: NewNodeChannelInfo(1), + bufferID: NewNodeChannelInfo(bufferID, channels...), }, } @@ -86,7 +86,7 @@ func TestAverageAssignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, []RWChannel{getChannel("chan1", 1)}, @@ -99,8 +99,8 @@ func TestAverageAssignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - 2: {2, []RWChannel{getChannel("chan3", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan", 1), getChannel("chan2", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan3", 1)), }, }, []RWChannel{getChannel("chan4", 1)}, @@ -132,7 +132,7 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, 1, @@ -148,9 +148,9 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, - 2: {2, []RWChannel{getChannel("chan2", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan2", 1)), + 3: NewNodeChannelInfo(3), }, }, 2, @@ -176,53 +176,50 @@ func TestBgCheckForChannelBalance(t *testing.T) { } tests := []struct { - name string - args args - want []*NodeChannelInfo + name string + args args + // want []*NodeChannelInfo + want int wantErr error }{ { "test even distribution", args{ []*NodeChannelInfo{ - {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - {2, []RWChannel{getChannel("chan1", 2), getChannel("chan2", 2)}}, - {3, []RWChannel{getChannel("chan1", 3), getChannel("chan2", 3)}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + NewNodeChannelInfo(2, getChannel("chan1", 2), getChannel("chan2", 2)), + NewNodeChannelInfo(3, getChannel("chan1", 3), getChannel("chan2", 3)), }, time.Now(), }, // there should be no reallocate - []*NodeChannelInfo{}, + 0, nil, }, { "test uneven with conservative effect", args{ []*NodeChannelInfo{ - {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - {2, []RWChannel{}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + NewNodeChannelInfo(2), }, time.Now(), }, // as we deem that the node having only one channel more than average as even, so there's no reallocation // for this test case - []*NodeChannelInfo{}, + 0, nil, }, { "test uneven with zero", args{ []*NodeChannelInfo{ - {1, []RWChannel{ - getChannel("chan1", 1), - getChannel("chan2", 1), - getChannel("chan3", 1), - }}, - {2, []RWChannel{}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1)), + NewNodeChannelInfo(2), }, time.Now(), }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + 1, nil, }, } @@ -231,7 +228,7 @@ func TestBgCheckForChannelBalance(t *testing.T) { policy := BgBalanceCheck got, err := policy(tt.args.channels, tt.args.timestamp) assert.Equal(t, tt.wantErr, err) - assert.EqualValues(t, tt.want, got) + assert.EqualValues(t, tt.want, len(got)) }) } } @@ -252,10 +249,10 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))}, }, // as there's no available nodes except the input node, there's no reassign plan generated NewChannelOpSet(), @@ -266,13 +263,13 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, - 2: {2, []RWChannel{}}, - 3: {2, []RWChannel{}}, - 4: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))}, }, // as we use ceil to calculate the wanted average number, there should be one reassign // though the average num less than 1 @@ -287,11 +284,11 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - 2: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + 2: NewNodeChannelInfo(2), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1))}, }, NewChannelOpSet( NewDeleteOp(1, getChannel("chan1", 1), getChannel("chan2", 1)), @@ -304,22 +301,19 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - getChannel("chan1", 1), + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), - getChannel("chan4", 1), - }}, - 2: {2, []RWChannel{}}, - 3: {3, []RWChannel{}}, - 4: {4, []RWChannel{}}, + getChannel("chan4", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, []RWChannel{ - getChannel("chan1", 1), + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), - }}}, + getChannel("chan4", 1))}, }, NewChannelOpSet( NewDeleteOp(1, []RWChannel{ @@ -338,7 +332,7 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), @@ -350,17 +344,15 @@ func TestAvgReassignPolicy(t *testing.T) { getChannel("chan9", 1), getChannel("chan10", 1), getChannel("chan11", 1), - getChannel("chan12", 1), - }}, - 2: {2, []RWChannel{ + getChannel("chan12", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan13", 1), - getChannel("chan14", 1), - }}, - 3: {3, []RWChannel{getChannel("chan15", 1)}}, - 4: {4, []RWChannel{}}, + getChannel("chan14", 1)), + 3: NewNodeChannelInfo(3, getChannel("chan15", 1)), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, []RWChannel{ + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), @@ -372,8 +364,7 @@ func TestAvgReassignPolicy(t *testing.T) { getChannel("chan9", 1), getChannel("chan10", 1), getChannel("chan11", 1), - getChannel("chan12", 1), - }}}, + getChannel("chan12", 1))}, }, NewChannelOpSet( NewDeleteOp(1, []RWChannel{ @@ -430,7 +421,7 @@ func TestAvgBalanceChannelPolicy(t *testing.T) { tests := []struct { name string args args - want *ChannelOpSet + want int }{ { "test_only_one_node", @@ -438,26 +429,24 @@ func TestAvgBalanceChannelPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: { - 1, []RWChannel{ - getChannel("chan1", 1), - getChannel("chan2", 1), - getChannel("chan3", 1), - getChannel("chan4", 1), - }, - }, - 2: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, + getChannel("chan1", 1), + getChannel("chan2", 1), + getChannel("chan3", 1), + getChannel("chan4", 1), + ), + 2: NewNodeChannelInfo(2), }, }, }, - NewChannelOpSet(NewAddOp(1, getChannel("chan1", 1))), + 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := AvgBalanceChannelPolicy(tt.args.store, time.Now()) - assert.EqualValues(t, tt.want.Collect(), got.Collect()) + assert.EqualValues(t, tt.want, len(got.Collect())) }) } } @@ -468,10 +457,13 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { nodeID int64 } tests := []struct { - name string - args args - bufferedUpdates *ChannelOpSet - balanceUpdates *ChannelOpSet + name string + args args + bufferedUpdates *ChannelOpSet + balanceUpdates *ChannelOpSet + exact bool + bufferedUpdatesNum int + balanceUpdatesNum int }{ { "test empty", @@ -479,13 +471,16 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: make([]RWChannel, 0)}, + 1: NewNodeChannelInfo(1), }, }, 1, }, NewChannelOpSet(), NewChannelOpSet(), + true, + 0, + 0, }, { "test with buffer channel", @@ -493,8 +488,8 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - bufferID: {bufferID, []RWChannel{getChannel("ch1", 1)}}, - 1: {NodeID: 1, Channels: []RWChannel{}}, + bufferID: NewNodeChannelInfo(bufferID, getChannel("ch1", 1)), + 1: NewNodeChannelInfo(1), }, }, 1, @@ -504,6 +499,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { NewAddOp(1, getChannel("ch1", 1)), ), NewChannelOpSet(), + true, + 0, + 0, }, { "test with avg assign", @@ -511,14 +509,17 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1)), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))), + false, + 0, + 1, }, { "test with avg equals to zero", @@ -526,38 +527,49 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1)}}, - 2: {2, []RWChannel{getChannel("ch3", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1)), + 2: NewNodeChannelInfo(2, getChannel("ch3", 1)), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(), + true, + 0, + 0, }, { - "test node with empty channel", + "test_node_with_empty_channel", args{ &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)}}, - 2: {2, []RWChannel{}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))), + false, + 0, + 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID) - assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect()) - assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect()) + if tt.exact { + assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect()) + assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect()) + } else { + assert.Equal(t, tt.bufferedUpdatesNum, len(bufferedUpdates.Collect())) + assert.Equal(t, tt.balanceUpdatesNum, len(balanceUpdates.Collect())) + } }) } }