From 42f643e727ea3c98274eee16850380348c0532bd Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 2 Jun 2022 13:56:04 +0800 Subject: [PATCH] Make DataNode release rather than delete when reassign (#17293) 1. Reassign now will assign to the original Node if no other nodes are available 2. Make AddNode balance async: ToRelease + Reassign See also: #16114, #17270 Signed-off-by: yangxuan --- internal/datacoord/channel_manager.go | 138 +++++---- internal/datacoord/channel_manager_test.go | 331 ++++++++++++++++----- internal/datacoord/policy.go | 23 +- internal/datacoord/policy_test.go | 21 +- 4 files changed, 345 insertions(+), 168 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 5708348f3b..7b474d887e 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -172,12 +172,17 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { // ReleaseSuccess remove // ReleaseFail clean up and remove func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error { + // Load all the watch infos before processing + nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo) for _, nodeID := range nodes { watchInfos, err := c.stateTimer.loadAllChannels(nodeID) if err != nil { return err } + nodeWatchInfos[nodeID] = watchInfos + } + for nodeID, watchInfos := range nodeWatchInfos { for _, info := range watchInfos { channelName := info.GetVchan().GetChannelName() @@ -198,12 +203,12 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error { c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs()) case datapb.ChannelWatchState_ReleaseSuccess: - if err := c.toDelete(nodeID, channelName); err != nil { + if err := c.Reassign(nodeID, channelName); err != nil { return err } case datapb.ChannelWatchState_ReleaseFailure: - if err := c.cleanUpAndDelete(nodeID, channelName); err != nil { + if err := 
c.CleanupAndReassign(nodeID, channelName); err != nil { return err } } @@ -318,9 +323,9 @@ func (c *ChannelManager) AddNode(nodeID int64) error { c.store.Add(nodeID) - // the default registerPolicy doesn't reassgin channels already there updates := c.registerPolicy(c.store, nodeID) if len(updates) <= 0 { + log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) return nil } @@ -328,7 +333,7 @@ func (c *ChannelManager) AddNode(nodeID int64) error { zap.Int64("registered node", nodeID), zap.Array("updates", updates)) - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) + return c.updateWithTimer(updates, datapb.ChannelWatchState_ToRelease) } // DeleteNode deletes the node from the cluster. @@ -348,10 +353,30 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { log.Warn("deregister node", zap.Int64("unregistered node", nodeID), zap.Array("updates", updates)) + if len(updates) <= 0 { + return nil + } + + var channels []*channel + for _, op := range updates { + if op.Type == Delete { + channels = op.Channels + } + } + + chNames := make([]string, 0, len(channels)) + for _, ch := range channels { + chNames = append(chNames, ch.Name) + } + log.Debug("remove timers for channel of the deregistered node", + zap.Any("channels", chNames), zap.Int64("nodeID", nodeID)) + c.stateTimer.removeTimers(chNames) if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil { return err } + + // No channels will be return _, err := c.store.Delete(nodeID) return err } @@ -579,14 +604,16 @@ func (c *ChannelManager) processAck(e *ackEvent) { } case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease - err := c.cleanUpAndDelete(e.nodeID, e.channelName) + // Cleanup, Delete and Reassign + err := c.CleanupAndReassign(e.nodeID, e.channelName) if err != nil { - log.Warn("fail to clean and delete channels for release failure ACKs", + log.Warn("fail to clean and reassign channels for release failure ACKs", 
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err)) } case releaseSuccessAck: - err := c.toDelete(e.nodeID, e.channelName) + // Delete and Reassign + err := c.Reassign(e.nodeID, e.channelName) if err != nil { log.Warn("fail to response to release success ACK", zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err)) @@ -594,8 +621,8 @@ func (c *ChannelManager) processAck(e *ackEvent) { } } -// cleanUpAndDelete tries to clean up datanode's subscription, and then delete channel watch info. -func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) error { +// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info. +func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -612,35 +639,31 @@ func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) e msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } - if !c.isMarkedDrop(channelName) { - reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} - - // reassign policy won't choose the same Node for a ressignment of a channel - updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { - log.Warn("fail to reassign channel to other nodes, add channel to buffer", zap.String("channel name", channelName)) - updates.Add(bufferID, []*channel{chToCleanUp}) - } - - err := c.remove(nodeID, chToCleanUp) - if err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) - } - - log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates)) - - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) - } - err := c.remove(nodeID, chToCleanUp) if err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) } 
- log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) - c.h.FinishDropChannel(channelName) + if c.isMarkedDrop(channelName) { + log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) + c.h.FinishDropChannel(channelName) + return nil + } - return nil + reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} + + // reassign policy won't choose the same Node for a ressignment of a channel + updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) + if len(updates) <= 0 { + log.Warn("fail to reassign channel to other nodes, add channel to the original node", + zap.Int64("nodeID", nodeID), + zap.String("channel name", channelName)) + updates.Add(nodeID, []*channel{chToCleanUp}) + } + + log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates)) + + return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } type channelStateChecker func(context.Context) @@ -724,8 +747,8 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error { return err } -// toDelete removes channel assignment from a datanode -func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error { +// Reassign removes channel assignment from a datanode +func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -734,36 +757,33 @@ func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error { return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName) } - if !c.isMarkedDrop(channelName) { - reallocates := &NodeChannelInfo{nodeID, []*channel{ch}} - - // reassign policy won't choose the same Node for a ressignment of a channel - updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { - log.Warn("fail to reassign channel to other nodes, add to the buffer", zap.String("channel name", 
channelName)) - updates.Add(bufferID, []*channel{ch}) - } - - err := c.remove(nodeID, ch) - if err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) - } - - log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates)) - - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) + if err := c.remove(nodeID, ch); err != nil { + return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) } - err := c.remove(nodeID, ch) - if err != nil { - return err + if c.isMarkedDrop(channelName) { + log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) + c.h.FinishDropChannel(channelName) + + log.Info("removed channel assignment", zap.Any("channel", ch)) + return nil } - log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) - c.h.FinishDropChannel(channelName) + reallocates := &NodeChannelInfo{nodeID, []*channel{ch}} + + // reassign policy won't choose the same Node for a ressignment of a channel + updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) + if len(updates) <= 0 { + log.Warn("fail to reassign channel to other nodes, assign to the original Node", + zap.Int64("nodeID", nodeID), + zap.String("channel name", channelName)) + updates.Add(nodeID, []*channel{ch}) + } + + log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates)) + + return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) - log.Info("removed channel assignment", zap.Any("channel", ch)) - return nil } func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel { diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 20f1bf6c68..cdc1e10f23 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -33,9 +33,19 @@ import ( 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "stathat.com/c/consistent" ) +func waitForEctdDataReady(metakv kv.MetaKv, key string) { + for { + // make sure etcd has finished the operation + _, err := metakv.Load(key) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } +} + func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { prefix := Params.DataCoordCfg.ChannelWatchSubPath @@ -88,17 +98,6 @@ func TestChannelManager_StateTransfer(t *testing.T) { } } - makeSureEctdData := func(key string) { - for { - // make sure etcd has finished the operation - _, err := metakv.Load(key) - if err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - } - t.Run("toWatch-WatchSuccess", func(t *testing.T) { metakv.RemoveWithPrefix("") ctx, cancel := context.WithCancel(context.TODO()) @@ -119,7 +118,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() @@ -146,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) @@ -177,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.stateTimer.notifyTimeoutWatcher(e) chManager.stateTimer.stopIfExsit(e) - makeSureEctdData(path.Join(prefix, 
strconv.FormatInt(nodeID, 10), channel1)) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) cancel() wg.Wait() checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) @@ -216,7 +215,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) cancel() wg.Wait() @@ -263,7 +262,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) require.NoError(t, err) - makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) cancel() wg.Wait() @@ -374,18 +373,16 @@ func TestChannelManager(t *testing.T) { checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) }) - t.Run("test toDelete", func(t *testing.T) { + t.Run("test Reassign", func(t *testing.T) { defer metakv.RemoveWithPrefix("") var collectionID = UniqueID(5) tests := []struct { - isvalid bool - nodeID UniqueID - chName string + nodeID UniqueID + chName string }{ - {true, UniqueID(125), "normal-chan"}, - {true, UniqueID(115), "to-delete-chan"}, - {false, UniqueID(9), "invalid-chan"}, + {UniqueID(125), "normal-chan"}, + {UniqueID(115), "to-delete-chan"}, } chManager, err := NewChannelManager(metakv, newMockHandler()) @@ -393,20 +390,18 @@ func TestChannelManager(t *testing.T) { // prepare tests for _, test := range tests { - if test.isvalid { - chManager.store.Add(test.nodeID) - ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) - err = chManager.store.Update(ops) - 
require.NoError(t, err) + chManager.store.Add(test.nodeID) + ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) + err = chManager.store.Update(ops) + require.NoError(t, err) - info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) - require.NoError(t, err) - require.NotNil(t, info) - } + info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) + require.NoError(t, err) + require.NotNil(t, info) } remainTest, reassignTest := tests[0], tests[1] - err = chManager.toDelete(reassignTest.nodeID, reassignTest.chName) + err = chManager.Reassign(reassignTest.nodeID, reassignTest.chName) assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) @@ -423,29 +418,54 @@ func TestChannelManager(t *testing.T) { assert.Equal(t, 2, len(nodeChanInfo.Channels)) assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) - // Delete node of reassginTest and try to toDelete node in remainTest + // Delete node of reassginTest and try to Reassign node in remainTest err = chManager.DeleteNode(reassignTest.nodeID) require.NoError(t, err) - err = chManager.toDelete(remainTest.nodeID, remainTest.chName) + err = chManager.Reassign(remainTest.nodeID, remainTest.chName) assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) - // channel is added to bufferID because there's only one node left - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID) + // channel is added to remainTest because there's only one node left + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) }) - t.Run("test cleanUpAndDelete", func(t *testing.T) { + + t.Run("test DeleteNode", func(t 
*testing.T) { + defer metakv.RemoveWithPrefix("") + + var ( + collectionID = UniqueID(999) + ) + chManager, err := NewChannelManager(metakv, newMockHandler(), withStateChecker()) + require.NoError(t, err) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + 1: {1, []*channel{ + {"channel-1", collectionID}, + {"channel-2", collectionID}}}, + bufferID: {bufferID, []*channel{}}, + }, + } + chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, time.Now().Add(maxWatchDuration).UnixNano()) + + err = chManager.DeleteNode(1) + assert.NoError(t, err) + + chs := chManager.store.GetBufferChannelInfo() + assert.Equal(t, 2, len(chs.Channels)) + }) + + t.Run("test CleanupAndReassign", func(t *testing.T) { defer metakv.RemoveWithPrefix("") var collectionID = UniqueID(6) tests := []struct { - isvalid bool - nodeID UniqueID - chName string + nodeID UniqueID + chName string }{ - {true, UniqueID(126), "normal-chan"}, - {true, UniqueID(116), "to-delete-chan"}, - {false, UniqueID(9), "invalid-chan"}, + {UniqueID(126), "normal-chan"}, + {UniqueID(116), "to-delete-chan"}, } factory := dependency.NewDefaultFactory(true) @@ -457,20 +477,18 @@ func TestChannelManager(t *testing.T) { // prepare tests for _, test := range tests { - if test.isvalid { - chManager.store.Add(test.nodeID) - ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) - err = chManager.store.Update(ops) - require.NoError(t, err) + chManager.store.Add(test.nodeID) + ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) + err = chManager.store.Update(ops) + require.NoError(t, err) - info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) - require.NoError(t, err) - require.NotNil(t, info) - } + info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) + require.NoError(t, err) + require.NotNil(t, info) } remainTest, reassignTest := 
tests[0], tests[1] - err = chManager.cleanUpAndDelete(reassignTest.nodeID, reassignTest.chName) + err = chManager.CleanupAndReassign(reassignTest.nodeID, reassignTest.chName) assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) @@ -487,16 +505,16 @@ func TestChannelManager(t *testing.T) { assert.Equal(t, 2, len(nodeChanInfo.Channels)) assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) - // Delete node of reassginTest and try to cleanUpAndDelete node in remainTest + // Delete node of reassginTest and try to CleanupAndReassign node in remainTest err = chManager.DeleteNode(reassignTest.nodeID) require.NoError(t, err) - err = chManager.cleanUpAndDelete(remainTest.nodeID, remainTest.chName) + err = chManager.CleanupAndReassign(remainTest.nodeID, remainTest.chName) assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) - // channel is added to bufferID because there's only one node left - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID) + // channel is added to remainTest because there's only one node left + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) }) t.Run("test getChannelByNodeAndName", func(t *testing.T) { @@ -599,7 +617,7 @@ func TestChannelManager_Reload(t *testing.T) { ) prefix := Params.DataCoordCfg.ChannelWatchSubPath - getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { + getWatchInfoWithState := func(state datapb.ChannelWatchState, collectionID UniqueID, channelName string) *datapb.ChannelWatchInfo { return &datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ CollectionID: collectionID, @@ -615,7 +633,7 @@ func TestChannelManager_Reload(t *testing.T) { 
t.Run("ToWatch", func(t *testing.T) { defer metakv.RemoveWithPrefix("") - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch)) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch, collectionID, channelName)) require.NoError(t, err) chManager, err := NewChannelManager(metakv, newMockHandler()) require.NoError(t, err) @@ -630,7 +648,7 @@ func TestChannelManager_Reload(t *testing.T) { t.Run("ToRelease", func(t *testing.T) { defer metakv.RemoveWithPrefix("") - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease)) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease, collectionID, channelName)) require.NoError(t, err) chManager, err := NewChannelManager(metakv, newMockHandler()) require.NoError(t, err) @@ -654,7 +672,7 @@ func TestChannelManager_Reload(t *testing.T) { nodeID: {nodeID, []*channel{{channelName, collectionID}}}}, } - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure)) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure, collectionID, channelName)) require.NoError(t, err) err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) require.NoError(t, err) @@ -669,15 +687,15 @@ func TestChannelManager_Reload(t *testing.T) { defer metakv.RemoveWithPrefix("") chManager, err := NewChannelManager(metakv, newMockHandler()) require.NoError(t, err) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess, collectionID, channelName)) chManager.store = &ChannelStore{ store: metakv, channelsInfo: map[int64]*NodeChannelInfo{ nodeID: {nodeID, []*channel{{channelName, collectionID}}}}, } - chManager.AddNode(bufferID) require.NoError(t, err) + chManager.AddNode(UniqueID(111)) err = metakv.Save(path.Join(prefix, 
strconv.FormatInt(nodeID, 10), channelName), string(data)) require.NoError(t, err) err = chManager.checkOldNodes([]UniqueID{nodeID}) @@ -686,13 +704,16 @@ func TestChannelManager_Reload(t *testing.T) { v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) assert.Error(t, err) assert.Empty(t, v) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID) + chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID}) }) t.Run("ReleaseFail", func(t *testing.T) { defer metakv.RemoveWithPrefix("") chManager, err := NewChannelManager(metakv, newMockHandler()) require.NoError(t, err) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure, collectionID, channelName)) chManager.store = &ChannelStore{ store: metakv, channelsInfo: map[int64]*NodeChannelInfo{ @@ -703,11 +724,12 @@ func TestChannelManager_Reload(t *testing.T) { require.NoError(t, err) err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) require.NoError(t, err) - err = chManager.checkOldNodes([]UniqueID{nodeID, 999}) + err = chManager.checkOldNodes([]UniqueID{nodeID}) assert.NoError(t, err) - time.Sleep(time.Second) - v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(999, 10), channelName)) + + v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) assert.Error(t, err) assert.Empty(t, v) @@ -721,24 +743,153 @@ func TestChannelManager_Reload(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - hash := consistent.New() - cm, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash))) + cm, err := NewChannelManager(metakv, newMockHandler()) assert.Nil(t, err) 
assert.Nil(t, cm.AddNode(1)) assert.Nil(t, cm.AddNode(2)) - assert.Nil(t, cm.Watch(&channel{"channel1", 1})) - assert.Nil(t, cm.Watch(&channel{"channel2", 1})) + cm.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + 1: {1, []*channel{{"channel1", 1}}}, + 2: {2, []*channel{{"channel2", 1}}}, + }, + } - hash2 := consistent.New() - cm2, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2))) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess, 1, "channel1")) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(1, 10), "channel1"), string(data)) + require.NoError(t, err) + data, err = proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess, 1, "channel2")) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(2, 10), "channel2"), string(data)) + require.NoError(t, err) + + cm2, err := NewChannelManager(metakv, newMockHandler()) assert.Nil(t, err) - assert.Nil(t, cm2.Startup(ctx, []int64{1, 2})) - assert.Nil(t, cm2.AddNode(3)) + assert.Nil(t, cm2.Startup(ctx, []int64{3})) + + waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(3, 10), "channel2")) assert.True(t, cm2.Match(3, "channel1")) assert.True(t, cm2.Match(3, "channel2")) + + cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel1", 3}) + cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel2", 3}) }) } +func TestChannelManager_BalanceBehaviour(t *testing.T) { + metakv := getMetaKv(t) + defer func() { + metakv.RemoveWithPrefix("") + metakv.Close() + }() + + prefix := Params.DataCoordCfg.ChannelWatchSubPath + + t.Run("one node with two channels add a new node", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + + var ( + collectionID = UniqueID(999) + ) + + chManager, err := NewChannelManager(metakv, newMockHandler(), withStateChecker()) + require.NoError(t, err) + + ctx, cancel 
:= context.WithCancel(context.TODO()) + chManager.stopChecker = cancel + defer cancel() + go chManager.stateChecker(ctx) + + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + 1: {1, []*channel{ + {"channel-1", collectionID}, + {"channel-2", collectionID}, + {"channel-3", collectionID}}}}, + } + + var ( + channelBalanced string + ) + + chManager.AddNode(2) + channelBalanced = "channel-1" + + waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { + for { + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) + v, err := metakv.Load(key) + if err == nil && len(v) > 0 { + watchInfo, err := parseWatchInfo(key, []byte(v)) + require.NoError(t, err) + require.Equal(t, waitState, watchInfo.GetState()) + + watchInfo.State = storeState + data, err := proto.Marshal(watchInfo) + require.NoError(t, err) + + metakv.Save(key, string(data)) + break + } + time.Sleep(100 * time.Millisecond) + } + } + + waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced) + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced) + + infos := chManager.store.GetNode(1) + assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) + assert.True(t, chManager.Match(1, "channel-3")) + + infos = chManager.store.GetNode(2) + assert.Equal(t, 1, len(infos.Channels)) + assert.True(t, chManager.Match(2, "channel-1")) + + chManager.AddNode(3) + chManager.Watch(&channel{"channel-4", collectionID}) + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4") + infos = chManager.store.GetNode(1) + assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) + assert.True(t, chManager.Match(1, "channel-3")) + + infos = chManager.store.GetNode(2) + assert.Equal(t, 1, len(infos.Channels)) + 
assert.True(t, chManager.Match(2, "channel-1")) + + infos = chManager.store.GetNode(3) + assert.Equal(t, 1, len(infos.Channels)) + assert.True(t, chManager.Match(3, "channel-4")) + + chManager.DeleteNode(3) + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4") + infos = chManager.store.GetNode(1) + assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) + assert.True(t, chManager.Match(1, "channel-3")) + + infos = chManager.store.GetNode(2) + assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(2, "channel-1")) + assert.True(t, chManager.Match(2, "channel-4")) + + chManager.DeleteNode(2) + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4") + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1") + infos = chManager.store.GetNode(1) + assert.Equal(t, 4, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) + assert.True(t, chManager.Match(1, "channel-3")) + assert.True(t, chManager.Match(1, "channel-1")) + assert.True(t, chManager.Match(1, "channel-4")) + }) + +} + func TestChannelManager_RemoveChannel(t *testing.T) { metakv := getMetaKv(t) defer func() { @@ -818,4 +969,30 @@ func TestChannelManager_HelperFunc(t *testing.T) { }) } }) + + t.Run("test getNewOnLines", func(t *testing.T) { + tests := []struct { + nodes []int64 + oNodes []int64 + + expectedOut []int64 + desription string + }{ + {[]int64{}, []int64{}, []int64{}, "empty both"}, + {[]int64{1}, []int64{}, []int64{1}, "empty oNodes"}, + {[]int64{}, []int64{1}, []int64{}, "empty nodes"}, + {[]int64{1}, []int64{1}, []int64{}, "same one"}, + {[]int64{1, 2}, []int64{1}, []int64{2}, "same one 2"}, + {[]int64{1}, []int64{1, 2}, []int64{}, "same one 3"}, + {[]int64{1, 2}, []int64{1, 2}, []int64{}, "same two"}, + } + + for _, test := range tests { + t.Run(test.desription, func(t *testing.T) { + nodes 
:= c.getNewOnLines(test.nodes, test.oNodes) + assert.ElementsMatch(t, test.expectedOut, nodes) + }) + } + + }) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 6d93f2622a..1e7391bbd3 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -51,6 +51,7 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet } // AvgAssignRegisterPolicy assigns channels with average to new registered node +// Register will not directly delete the node-channel pair, channel manager handles the release itself func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { opSet := BufferChannelAssignPolicy(store, nodeID) if len(opSet) != 0 { @@ -74,24 +75,19 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { return len(infos[i].Channels) > len(infos[j].Channels) }) - deletes := make(map[int64][]*channel) - adds := make(map[int64][]*channel) + releases := make(map[int64][]*channel) for i := 0; i < avg; { t := infos[i%len(infos)] idx := i / len(infos) if idx >= len(t.Channels) { continue } - deletes[t.NodeID] = append(deletes[t.NodeID], t.Channels[idx]) - adds[nodeID] = append(adds[nodeID], t.Channels[idx]) + releases[t.NodeID] = append(releases[t.NodeID], t.Channels[idx]) i++ } opSet = ChannelOpSet{} - for k, v := range deletes { - opSet.Delete(k, v) - } - for k, v := range adds { + for k, v := range releases { opSet.Add(k, v) } return opSet @@ -114,8 +110,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic elems := formatNodeIDs(store.GetNodes()) hashRing.Set(elems) - removes := make(map[int64][]*channel) - adds := make(map[int64][]*channel) + releases := make(map[int64][]*channel) // If there are buffer channels, then nodeID is the first node. 
opSet := BufferChannelAssignPolicy(store, nodeID) @@ -141,16 +136,12 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic return nil } if did != c.NodeID { - removes[c.NodeID] = append(removes[c.NodeID], ch) - adds[did] = append(adds[did], ch) + releases[c.NodeID] = append(releases[c.NodeID], ch) } } } - for id, channels := range removes { - opSet.Delete(id, channels) - } - for id, channels := range adds { + for id, channels := range releases { opSet.Add(id, channels) } return opSet diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 02d514a8e8..507fcec879 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -97,11 +97,10 @@ func TestConsistentHashRegisterPolicy(t *testing.T) { updates := policy(store, 2) - // chan1 will be hash to 2, chan2 will be hash to 1 assert.NotNil(t, updates) - assert.Equal(t, 2, len(updates)) - assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: 1, Channels: []*channel{channels[0]}}, updates[0]) - assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 2, Channels: []*channel{channels[0]}}, updates[1]) + assert.Equal(t, 1, len(updates)) + // No Delete operation will be generated + assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 1, Channels: []*channel{channels[0]}}, updates[0]) }) } @@ -486,14 +485,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { 3, }, []*ChannelOp{ - { - Type: Delete, - NodeID: 1, - Channels: []*channel{{"ch1", 1}}, - }, { Type: Add, - NodeID: 3, + NodeID: 1, Channels: []*channel{{"ch1", 1}}, }, }, @@ -525,14 +519,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { 3, }, []*ChannelOp{ - { - Type: Delete, - NodeID: 1, - Channels: []*channel{{"ch1", 1}}, - }, { Type: Add, - NodeID: 3, + NodeID: 1, Channels: []*channel{{"ch1", 1}}, }, },