From 83da08c3889717617ef92363b8aae66c03d0467c Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 16 Apr 2024 15:57:19 +0800 Subject: [PATCH] enhance: Use map instead of slice to maintain channel info (#32273) See also #32165 `ChannelManager.Match` is a frequent operation for datacoord. When the number of collections is large, iterating over all channels consumes a lot of CPU time. This PR changes the data structure storing datanode-channel info from a slice to a map, avoiding this iteration when checking channel existence. --------- Signed-off-by: Congqi Xia --- internal/datacoord/channel_manager.go | 27 ++- internal/datacoord/channel_manager_test.go | 161 ++++++++++-------- internal/datacoord/channel_store.go | 61 ++++--- internal/datacoord/channel_store_test.go | 7 +- internal/datacoord/policy.go | 21 +-- internal/datacoord/policy_test.go | 188 +++++++++++---------- 6 files changed, 249 insertions(+), 216 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 047f593907..d224053ea4 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -508,13 +508,12 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo { func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { nodeChs := make(map[UniqueID][]string) for _, nodeChannels := range c.GetAssignedChannels() { - filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { - return channel.GetCollectionID() == collectionID - }) - channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string { - return channel.GetName() - }) - + var channelNames []string + for name, ch := range nodeChannels.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } nodeChs[nodeChannels.NodeID] = channelNames } return nodeChs @@ -524,11 +523,11 @@ func (c *ChannelManagerImpl) 
GetNodeChannelsByCollectionID(collectionID UniqueID func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel { channels := make([]RWChannel, 0) for _, nodeChannels := range c.GetAssignedChannels() { - filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { - return channel.GetCollectionID() == collectionID - }) - - channels = append(channels, filtered...) + for _, ch := range nodeChannels.Channels { + if ch.GetCollectionID() == collectionID { + channels = append(channels, ch) + } + } } return channels } @@ -807,7 +806,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string) } c.mu.RUnlock() - reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}} + reallocates := NewNodeChannelInfo(originNodeID, ch) isDropped := c.isMarkedDrop(channelName) c.mu.Lock() @@ -862,7 +861,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } - reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}} + reallocates := NewNodeChannelInfo(nodeID, chToCleanUp) isDropped := c.isMarkedDrop(channelName) c.mu.Lock() diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 536f65b1a1..7e890b55d3 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -56,6 +56,31 @@ func waitAndStore(t *testing.T, watchkv kv.MetaKv, key string, waitState, storeS } } +func waitPrefixAndStore(t *testing.T, watchkv kv.MetaKv, prefix string, waitState, storeState datapb.ChannelWatchState) string { + channelName := "" + for { + keys, values, err := watchkv.LoadWithPrefix(prefix) + if err == nil && len(values) > 0 { + for idx, value := range values { + watchInfo, err := parseWatchInfo(keys[idx], []byte(value)) + require.NoError(t, err) + require.Equal(t, waitState, watchInfo.GetState()) + + 
channelName = watchInfo.GetVchan().GetChannelName() + + watchInfo.State = storeState + data, err := proto.Marshal(watchInfo) + require.NoError(t, err) + + watchkv.Save(path.Join(prefix, watchInfo.GetVchan().GetChannelName()), string(data)) + } + break + } + time.Sleep(100 * time.Millisecond) + } + return channelName +} + // waitAndCheckState checks if the DataCoord writes expected state into Etcd func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { for { @@ -217,10 +242,8 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, - oldNode: {oldNode, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), + oldNode: NewNodeChannelInfo(oldNode), }, } @@ -260,9 +283,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), }, } @@ -306,10 +327,8 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: cName, CollectionID: collectionID}, - }}, - oldNode: {oldNode, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), + oldNode: NewNodeChannelInfo(oldNode), }, } @@ -352,9 +371,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - 
&channelMeta{Name: cName, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}), }, } @@ -400,10 +417,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{ - &channelMeta{Name: channel1, CollectionID: collectionID}, - &channelMeta{Name: channel2, CollectionID: collectionID}, - }}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}), }, } @@ -438,10 +452,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - bufferID: {bufferID, []RWChannel{ - &channelMeta{Name: channel1, CollectionID: collectionID}, - &channelMeta{Name: channel2, CollectionID: collectionID}, - }}, + bufferID: NewNodeChannelInfo(bufferID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}), }, } @@ -502,7 +513,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -682,11 +693,9 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - &channelMeta{Name: "channel-1", CollectionID: collectionID}, - &channelMeta{Name: "channel-2", CollectionID: collectionID}, - }}, - bufferID: {bufferID, []RWChannel{}}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID}, + &channelMeta{Name: "channel-2", CollectionID: collectionID}), + bufferID: NewNodeChannelInfo(bufferID), }, } 
chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)) @@ -774,7 +783,7 @@ func TestChannelManager(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } ch = chManager.getChannelByNodeAndName(nodeID, channelName) @@ -943,7 +952,7 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -966,7 +975,7 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), }, } @@ -993,8 +1002,8 @@ func TestChannelManager_Reload(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}}, - 999: {999, []RWChannel{}}, + nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}), + 999: NewNodeChannelInfo(999), }, } require.NoError(t, err) @@ -1024,8 +1033,8 @@ func TestChannelManager_Reload(t *testing.T) { cm.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{&channelMeta{Name: "channel1", CollectionID: 1}}}, - 2: {2, []RWChannel{&channelMeta{Name: "channel2", 
CollectionID: 1}}}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel1", CollectionID: 1}), + 2: NewNodeChannelInfo(2, &channelMeta{Name: "channel2", CollectionID: 1}), }, } @@ -1077,58 +1086,69 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { chManager.store = &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - &channelMeta{Name: "channel-1", CollectionID: collectionID}, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID}, &channelMeta{Name: "channel-2", CollectionID: collectionID}, - &channelMeta{Name: "channel-3", CollectionID: collectionID}, - }}, + &channelMeta{Name: "channel-3", CollectionID: collectionID}), }, } var channelBalanced string chManager.AddNode(2) - channelBalanced = "channel-1" - key := path.Join(prefix, "1", channelBalanced) - waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) + watchPrefix := path.Join(prefix, "1") + channelBalanced = waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) - key = path.Join(prefix, "2", channelBalanced) + key := path.Join(prefix, "2", channelBalanced) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.AddNode(3) chManager.Watch(ctx, &channelMeta{Name: "channel-4", CollectionID: collectionID}) - key = path.Join(prefix, "3", "channel-4") - waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) + // key = path.Join(prefix, "3", 
"channel-4") + watchPrefix = path.Join(prefix, "3") + channelBalanced2 := waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) - assert.True(t, chManager.Match(3, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else if channel == channelBalanced2 { + assert.True(t, chManager.Match(3, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.DeleteNode(3) - key = path.Join(prefix, "2", "channel-4") + key = path.Join(prefix, "2", channelBalanced2) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) - assert.True(t, chManager.Match(2, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + if channel == channelBalanced { + assert.True(t, chManager.Match(2, channel)) + } else if channel == channelBalanced2 { + assert.True(t, chManager.Match(2, channel)) + } else { + assert.True(t, chManager.Match(1, channel)) + } + } chManager.DeleteNode(2) - key = path.Join(prefix, "1", "channel-4") + key = path.Join(prefix, "1", channelBalanced) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - key = path.Join(prefix, "1", "channel-1") + key = path.Join(prefix, "1", channelBalanced2) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) - assert.True(t, chManager.Match(1, "channel-2")) - assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, 
chManager.Match(1, "channel-1")) - assert.True(t, chManager.Match(1, "channel-4")) + for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} { + assert.True(t, chManager.Match(1, channel)) + } }) } @@ -1157,12 +1177,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) { store: &ChannelStore{ store: watchkv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: { - NodeID: 1, - Channels: []RWChannel{ - &channelMeta{Name: "ch1", CollectionID: 1}, - }, - }, + 1: NewNodeChannelInfo(1, &channelMeta{Name: "ch1", CollectionID: 1}), }, }, }, @@ -1257,14 +1272,14 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) { mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{ { NodeID: 1, - Channels: []RWChannel{ - &channelMeta{ + Channels: map[string]RWChannel{ + "channel-1": &channelMeta{ Name: "channel-1", }, - &channelMeta{ + "channel-2": &channelMeta{ Name: "channel-2", }, - &channelMeta{ + "channel-3": &channelMeta{ Name: "channel-3", }, }, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 7a94c4df0a..ab458fc04b 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/timerecord" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -220,7 +219,31 @@ type ChannelStore struct { // NodeChannelInfo stores the nodeID and its channels. type NodeChannelInfo struct { NodeID int64 - Channels []RWChannel + Channels map[string]RWChannel + // ChannelsSet typeutil.Set[string] // map for fast channel check +} + +// AddChannel appends channel info node channel list. +func (info *NodeChannelInfo) AddChannel(ch RWChannel) { + info.Channels[ch.GetName()] = ch +} + +// RemoveChannel removes channel from Channels. 
+func (info *NodeChannelInfo) RemoveChannel(channelName string) { + delete(info.Channels, channelName) +} + +func NewNodeChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo { + info := &NodeChannelInfo{ + NodeID: nodeID, + Channels: make(map[string]RWChannel), + } + + for _, channel := range channels { + info.Channels[channel.GetName()] = channel + } + + return info } // NewChannelStore creates and returns a new ChannelStore. @@ -231,7 +254,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore { } c.channelsInfo[bufferID] = &NodeChannelInfo{ NodeID: bufferID, - Channels: make([]RWChannel, 0), + Channels: make(map[string]RWChannel), } return c } @@ -264,7 +287,8 @@ func (c *ChannelStore) Reload() error { Schema: cw.GetSchema(), WatchInfo: cw, } - c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel) + c.channelsInfo[nodeID].AddChannel(channel) + log.Info("channel store reload channel", zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) @@ -280,10 +304,7 @@ func (c *ChannelStore) Add(nodeID int64) { return } - c.channelsInfo[nodeID] = &NodeChannelInfo{ - NodeID: nodeID, - Channels: make([]RWChannel, 0), - } + c.channelsInfo[nodeID] = NewNodeChannelInfo(nodeID) } // Update applies the channel operations in opSet. 
@@ -317,11 +338,9 @@ func (c *ChannelStore) Update(opSet *ChannelOpSet) error { } func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool { - if _, ok := c.channelsInfo[nodeID]; ok { - for _, ch := range c.channelsInfo[nodeID].Channels { - if channel.GetName() == ch.GetName() && channel.GetCollectionID() == ch.GetCollectionID() { - return true - } + if info, ok := c.channelsInfo[nodeID]; ok { + if ch, ok := info.Channels[channel.GetName()]; ok { + return ch.GetCollectionID() == channel.GetCollectionID() } } return false @@ -343,19 +362,13 @@ func (c *ChannelStore) update(opSet *ChannelOpSet) error { continue // prevent adding duplicated channel info } // Append target channels to channel store. - c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, ch) + c.channelsInfo[op.NodeID].AddChannel(ch) } case Delete: - del := typeutil.NewSet(op.GetChannelNames()...) - - prev := c.channelsInfo[op.NodeID].Channels - curr := make([]RWChannel, 0, len(prev)) - for _, ch := range prev { - if !del.Contain(ch.GetName()) { - curr = append(curr, ch) - } + info := c.channelsInfo[op.NodeID] + for _, channelName := range op.GetChannelNames() { + info.RemoveChannel(channelName) } - c.channelsInfo[op.NodeID].Channels = curr default: return errUnknownOpType } @@ -421,7 +434,7 @@ func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) { return nil, err } delete(c.channelsInfo, id) - return info.Channels, nil + return lo.Values(info.Channels), nil } } return nil, nil diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index 49cc758515..47413961fb 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -40,10 +40,7 @@ func genNodeChannelInfos(id int64, num int) *NodeChannelInfo { name := fmt.Sprintf("ch%d", i) channels = append(channels, &channelMeta{Name: name, CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}}) } - return &NodeChannelInfo{ 
- NodeID: id, - Channels: channels, - } + return NewNodeChannelInfo(id, channels...) } func genChannelOperations(from, to int64, num int) *ChannelOpSet { @@ -85,7 +82,7 @@ func TestChannelStore_Update(t *testing.T) { txnKv, map[int64]*NodeChannelInfo{ 1: genNodeChannelInfos(1, 500), - 2: {NodeID: 2}, + 2: NewNodeChannelInfo(2), }, }, args{ diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 80f2f38b08..fc943c9503 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -47,8 +47,8 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet } opSet := NewChannelOpSet( - NewDeleteOp(bufferID, info.Channels...), - NewAddOp(nodeID, info.Channels...)) + NewDeleteOp(bufferID, lo.Values(info.Channels)...), + NewAddOp(nodeID, lo.Values(info.Channels)...)) return opSet } @@ -89,7 +89,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (*ChannelOpSet, // TODO: Consider re-picking in case assignment is extremely uneven? continue } - releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx]) + releases[toRelease.NodeID] = append(releases[toRelease.NodeID], lo.Values(toRelease.Channels)[chIdx]) } // Channels in `releases` are reassigned eventually by channel manager. @@ -197,8 +197,8 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) *ChannelO for _, c := range allNodes { if c.NodeID == nodeID { - opSet.Delete(nodeID, c.Channels...) - unregisteredChannels = append(unregisteredChannels, c.Channels...) + opSet.Delete(nodeID, lo.Values(c.Channels)...) + unregisteredChannels = append(unregisteredChannels, lo.Values(c.Channels)...) continue } avaNodes = append(avaNodes, c) @@ -236,7 +236,7 @@ func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) *ChannelOpSet { return opSet } for _, reAlloc := range reAllocates { - opSet.Add(reAlloc.NodeID, reAlloc.Channels...) + opSet.Add(reAlloc.NodeID, lo.Values(reAlloc.Channels)...) 
} return opSet @@ -295,7 +295,7 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) * // reassign channels to remaining nodes addUpdates := make(map[int64]*ChannelOp) for _, reassign := range reassigns { - opSet.Delete(reassign.NodeID, reassign.Channels...) + opSet.Delete(reassign.NodeID, lo.Values(reassign.Channels)...) for _, ch := range reassign.Channels { nodeIdx := 0 for { @@ -381,13 +381,10 @@ func BgBalanceCheck(nodeChannels []*NodeChannelInfo, ts time.Time) ([]*NodeChann zap.Int("channelCountPerNode", channelCountPerNode)) continue } - reallocate := &NodeChannelInfo{ - NodeID: nChannels.NodeID, - Channels: make([]RWChannel, 0), - } + reallocate := NewNodeChannelInfo(nChannels.NodeID) toReleaseCount := chCount - channelCountPerNode - 1 for _, ch := range nChannels.Channels { - reallocate.Channels = append(reallocate.Channels, ch) + reallocate.AddChannel(ch) toReleaseCount-- if toReleaseCount <= 0 { break diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 41b5d44f31..2ccbdfa6eb 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -33,8 +33,8 @@ func TestBufferChannelAssignPolicy(t *testing.T) { store := &ChannelStore{ store: kv, channelsInfo: map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{}}, - bufferID: {bufferID, channels}, + 1: NewNodeChannelInfo(1), + bufferID: NewNodeChannelInfo(bufferID, channels...), }, } @@ -86,7 +86,7 @@ func TestAverageAssignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, []RWChannel{getChannel("chan1", 1)}, @@ -99,8 +99,8 @@ func TestAverageAssignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - 2: {2, []RWChannel{getChannel("chan3", 1)}}, + 1: NewNodeChannelInfo(1, 
getChannel("chan", 1), getChannel("chan2", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan3", 1)), }, }, []RWChannel{getChannel("chan4", 1)}, @@ -132,7 +132,7 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, 1, @@ -148,9 +148,9 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, - 2: {2, []RWChannel{getChannel("chan2", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan2", 1)), + 3: NewNodeChannelInfo(3), }, }, 2, @@ -176,53 +176,50 @@ func TestBgCheckForChannelBalance(t *testing.T) { } tests := []struct { - name string - args args - want []*NodeChannelInfo + name string + args args + // want []*NodeChannelInfo + want int wantErr error }{ { "test even distribution", args{ []*NodeChannelInfo{ - {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - {2, []RWChannel{getChannel("chan1", 2), getChannel("chan2", 2)}}, - {3, []RWChannel{getChannel("chan1", 3), getChannel("chan2", 3)}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + NewNodeChannelInfo(2, getChannel("chan1", 2), getChannel("chan2", 2)), + NewNodeChannelInfo(3, getChannel("chan1", 3), getChannel("chan2", 3)), }, time.Now(), }, // there should be no reallocate - []*NodeChannelInfo{}, + 0, nil, }, { "test uneven with conservative effect", args{ []*NodeChannelInfo{ - {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - {2, []RWChannel{}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + NewNodeChannelInfo(2), }, time.Now(), }, // as we deem that the node having only one channel more than average as even, so there's no reallocation // for this test case - 
[]*NodeChannelInfo{}, + 0, nil, }, { "test uneven with zero", args{ []*NodeChannelInfo{ - {1, []RWChannel{ - getChannel("chan1", 1), - getChannel("chan2", 1), - getChannel("chan3", 1), - }}, - {2, []RWChannel{}}, + NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1)), + NewNodeChannelInfo(2), }, time.Now(), }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + 1, nil, }, } @@ -231,7 +228,7 @@ func TestBgCheckForChannelBalance(t *testing.T) { policy := BgBalanceCheck got, err := policy(tt.args.channels, tt.args.timestamp) assert.Equal(t, tt.wantErr, err) - assert.EqualValues(t, tt.want, got) + assert.EqualValues(t, tt.want, len(got)) }) } } @@ -252,10 +249,10 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))}, }, // as there's no available nodes except the input node, there's no reassign plan generated NewChannelOpSet(), @@ -266,13 +263,13 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("chan1", 1)}}, - 2: {2, []RWChannel{}}, - 3: {2, []RWChannel{}}, - 4: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))}, }, // as we use ceil to calculate the wanted average number, there should be one reassign // though the average num less than 1 @@ -287,11 +284,11 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, 
[]RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}, - 2: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)), + 2: NewNodeChannelInfo(2), }, }, - []*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}}, + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1))}, }, NewChannelOpSet( NewDeleteOp(1, getChannel("chan1", 1), getChannel("chan2", 1)), @@ -304,22 +301,19 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ - getChannel("chan1", 1), + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), - getChannel("chan4", 1), - }}, - 2: {2, []RWChannel{}}, - 3: {3, []RWChannel{}}, - 4: {4, []RWChannel{}}, + getChannel("chan4", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, []RWChannel{ - getChannel("chan1", 1), + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), - }}}, + getChannel("chan4", 1))}, }, NewChannelOpSet( NewDeleteOp(1, []RWChannel{ @@ -338,7 +332,7 @@ func TestAvgReassignPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{ + 1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), @@ -350,17 +344,15 @@ func TestAvgReassignPolicy(t *testing.T) { getChannel("chan9", 1), getChannel("chan10", 1), getChannel("chan11", 1), - getChannel("chan12", 1), - }}, - 2: {2, []RWChannel{ + getChannel("chan12", 1)), + 2: NewNodeChannelInfo(2, getChannel("chan13", 1), - getChannel("chan14", 1), - }}, - 3: {3, []RWChannel{getChannel("chan15", 1)}}, - 4: {4, []RWChannel{}}, + getChannel("chan14", 1)), + 3: NewNodeChannelInfo(3, getChannel("chan15", 1)), + 4: NewNodeChannelInfo(4), }, }, - []*NodeChannelInfo{{1, 
[]RWChannel{ + []*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1), @@ -372,8 +364,7 @@ func TestAvgReassignPolicy(t *testing.T) { getChannel("chan9", 1), getChannel("chan10", 1), getChannel("chan11", 1), - getChannel("chan12", 1), - }}}, + getChannel("chan12", 1))}, }, NewChannelOpSet( NewDeleteOp(1, []RWChannel{ @@ -430,7 +421,7 @@ func TestAvgBalanceChannelPolicy(t *testing.T) { tests := []struct { name string args args - want *ChannelOpSet + want int }{ { "test_only_one_node", @@ -438,26 +429,24 @@ func TestAvgBalanceChannelPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: { - 1, []RWChannel{ - getChannel("chan1", 1), - getChannel("chan2", 1), - getChannel("chan3", 1), - getChannel("chan4", 1), - }, - }, - 2: {2, []RWChannel{}}, + 1: NewNodeChannelInfo(1, + getChannel("chan1", 1), + getChannel("chan2", 1), + getChannel("chan3", 1), + getChannel("chan4", 1), + ), + 2: NewNodeChannelInfo(2), }, }, }, - NewChannelOpSet(NewAddOp(1, getChannel("chan1", 1))), + 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := AvgBalanceChannelPolicy(tt.args.store, time.Now()) - assert.EqualValues(t, tt.want.Collect(), got.Collect()) + assert.EqualValues(t, tt.want, len(got.Collect())) }) } } @@ -468,10 +457,13 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { nodeID int64 } tests := []struct { - name string - args args - bufferedUpdates *ChannelOpSet - balanceUpdates *ChannelOpSet + name string + args args + bufferedUpdates *ChannelOpSet + balanceUpdates *ChannelOpSet + exact bool + bufferedUpdatesNum int + balanceUpdatesNum int }{ { "test empty", @@ -479,13 +471,16 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: make([]RWChannel, 0)}, + 1: NewNodeChannelInfo(1), }, }, 1, }, NewChannelOpSet(), NewChannelOpSet(), + true, + 0, + 0, }, { "test with buffer 
channel", @@ -493,8 +488,8 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - bufferID: {bufferID, []RWChannel{getChannel("ch1", 1)}}, - 1: {NodeID: 1, Channels: []RWChannel{}}, + bufferID: NewNodeChannelInfo(bufferID, getChannel("ch1", 1)), + 1: NewNodeChannelInfo(1), }, }, 1, @@ -504,6 +499,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { NewAddOp(1, getChannel("ch1", 1)), ), NewChannelOpSet(), + true, + 0, + 0, }, { "test with avg assign", @@ -511,14 +509,17 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1)), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))), + false, + 0, + 1, }, { "test with avg equals to zero", @@ -526,38 +527,49 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1)}}, - 2: {2, []RWChannel{getChannel("ch3", 1)}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1)), + 2: NewNodeChannelInfo(2, getChannel("ch3", 1)), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(), + true, + 0, + 0, }, { - "test node with empty channel", + "test_node_with_empty_channel", args{ &ChannelStore{ memkv.NewMemoryKV(), map[int64]*NodeChannelInfo{ - 1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)}}, - 2: {2, []RWChannel{}}, - 3: {3, []RWChannel{}}, + 1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)), + 2: NewNodeChannelInfo(2), + 3: NewNodeChannelInfo(3), }, }, 3, }, NewChannelOpSet(), NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))), + false, + 0, + 1, }, } for _, tt := range tests { 
t.Run(tt.name, func(t *testing.T) { bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID) - assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect()) - assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect()) + if tt.exact { + assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect()) + assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect()) + } else { + assert.Equal(t, tt.bufferedUpdatesNum, len(bufferedUpdates.Collect())) + assert.Equal(t, tt.balanceUpdatesNum, len(balanceUpdates.Collect())) + } }) } }