From a3679d5540e8b9d53faf2ff5ea0ad23be4a380cd Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 9 Jun 2021 18:43:50 +0800 Subject: [PATCH] Add vchannel buffer for cluster (#5691) * Add channel buffer for cluster Signed-off-by: Congqi Xia * Change default register policy Signed-off-by: Congqi Xia --- internal/dataservice/cluster.go | 55 ++++++++++------ internal/dataservice/cluster_data_manager.go | 45 +++++++++---- internal/dataservice/cluster_test.go | 12 ++-- internal/dataservice/mock_test.go | 4 +- internal/dataservice/policy.go | 66 ++++++++++++++++++-- internal/dataservice/policy_test.go | 2 +- 6 files changed, 140 insertions(+), 44 deletions(-) diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index fa0ec00795..9334dd28a8 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -67,7 +67,7 @@ func defaultStartupPolicy() clusterStartupPolicy { } func defaultRegisterPolicy() dataNodeRegisterPolicy { - return newEmptyRegisterPolicy() + return newAssiggBufferRegisterPolicy() } func defaultUnregisterPolicy() dataNodeUnregisterPolicy { @@ -98,11 +98,12 @@ func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionMan func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error { deltaChange := c.dataManager.updateCluster(dataNodes) - nodes := c.dataManager.getDataNodes(false) - rets := c.startupPolicy.apply(nodes, deltaChange) - c.dataManager.updateDataNodes(rets) + nodes, chanBuffer := c.dataManager.getDataNodes(false) + var rets []*datapb.DataNodeInfo + rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer) + c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) - c.dataManager.updateDataNodes(rets) + c.dataManager.updateDataNodes(rets, chanBuffer) return nil } @@ -167,11 +168,12 @@ func (c *cluster) register(n *datapb.DataNodeInfo) { c.mu.Lock() defer c.mu.Unlock() c.dataManager.register(n) - cNodes := c.dataManager.getDataNodes(true) - rets := c.registerPolicy.apply(cNodes, n) - c.dataManager.updateDataNodes(rets) + cNodes, chanBuffer := c.dataManager.getDataNodes(true) + var rets []*datapb.DataNodeInfo + rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer) + c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) - c.dataManager.updateDataNodes(rets) + c.dataManager.updateDataNodes(rets, chanBuffer) } func (c *cluster) unregister(n *datapb.DataNodeInfo) { @@ -179,21 +181,38 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) { defer c.mu.Unlock() c.sessionManager.releaseSession(n.Address) c.dataManager.unregister(n) - cNodes := c.dataManager.getDataNodes(true) - rets := c.unregisterPolicy.apply(cNodes, n) - c.dataManager.updateDataNodes(rets) + cNodes, chanBuffer := c.dataManager.getDataNodes(true) + var rets []*datapb.DataNodeInfo + if len(cNodes) == 0 { + for _, chStat := range n.Channels { + chStat.State = datapb.ChannelWatchState_Uncomplete + chanBuffer = append(chanBuffer, chStat) + } + } else { + rets = c.unregisterPolicy.apply(cNodes, n) + } + c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) - c.dataManager.updateDataNodes(rets) + c.dataManager.updateDataNodes(rets, chanBuffer) } func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) { c.mu.Lock() defer c.mu.Unlock() - cNodes := c.dataManager.getDataNodes(true) - rets := c.assignPolicy.apply(cNodes, channel, collectionID) - c.dataManager.updateDataNodes(rets) + cNodes, chanBuffer := c.dataManager.getDataNodes(true) + var rets []*datapb.DataNodeInfo + if len(cNodes) == 0 { // no nodes to assign, put into buffer + chanBuffer = append(chanBuffer, &datapb.ChannelStatus{ + Name: channel, + CollectionID: collectionID, + State: datapb.ChannelWatchState_Uncomplete, + }) + } else { + rets = c.assignPolicy.apply(cNodes, channel, collectionID) + } + c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) - c.dataManager.updateDataNodes(rets) + c.dataManager.updateDataNodes(rets, chanBuffer) } func (c *cluster) flush(segments []*datapb.SegmentInfo) { @@ -210,7 +229,7 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) { m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID) } - dataNodes := c.dataManager.getDataNodes(true) + dataNodes, _ := c.dataManager.getDataNodes(true) channel2Node := make(map[string]string) for _, node := range dataNodes { diff --git a/internal/dataservice/cluster_data_manager.go b/internal/dataservice/cluster_data_manager.go index e29621cc08..8e34caef65 100644 --- a/internal/dataservice/cluster_data_manager.go +++ b/internal/dataservice/cluster_data_manager.go @@ -18,6 +18,7 @@ import ( ) const clusterPrefix = "cluster-prefix/" +const clusterBuffer = "cluster-buffer" type dataNodeStatus int8 @@ -32,14 +33,16 @@ type dataNodeInfo struct { } type clusterNodeManager struct { - kv kv.TxnKV - dataNodes map[string]*dataNodeInfo + kv kv.TxnKV + dataNodes map[string]*dataNodeInfo + chanBuffer []*datapb.ChannelStatus //Unwatched channels buffer } func newClusterNodeManager(kv kv.TxnKV) (*clusterNodeManager, error) { c := &clusterNodeManager{ - kv: kv, - dataNodes: make(map[string]*dataNodeInfo), + kv: kv, + dataNodes: make(map[string]*dataNodeInfo), + chanBuffer: []*datapb.ChannelStatus{}, } return c, c.loadFromKv() } @@ -62,6 +65,15 @@ func (c *clusterNodeManager) loadFromKv() error { } c.dataNodes[info.Address] = node } + dn, _ := c.kv.Load(clusterBuffer) + //TODO add not value error check + if dn != "" { + info := &datapb.DataNodeInfo{} + if err := proto.UnmarshalText(dn, info); err != nil { + return err + } + c.chanBuffer = info.Channels + } return nil } @@ -71,7 +83,9 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl offlines := make([]string, 0) restarts := make([]string, 0) var onCnt, offCnt float64 + currentOnline := make(map[string]struct{}) for _, n := range dataNodes { + currentOnline[n.Address] = struct{}{} onCnt++ node, ok := c.dataNodes[n.Address] @@ -96,7 +110,9 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl } for nAddr, node := range c.dataNodes { - if node.status == offline { + _, has := currentOnline[nAddr] + if !has && node.status == online { + node.status = offline offCnt++ offlines = append(offlines, nAddr) } @@ -110,22 +126,22 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl } } -func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo) error { +func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error { for _, node := range dataNodes { c.dataNodes[node.Address].info = node } - return c.txnSaveNodes(dataNodes) + return c.txnSaveNodes(dataNodes, buffer) } -func (c *clusterNodeManager) getDataNodes(onlyOnline bool) map[string]*datapb.DataNodeInfo { +func (c *clusterNodeManager) getDataNodes(onlyOnline bool) (map[string]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { ret := make(map[string]*datapb.DataNodeInfo) for k, v := range c.dataNodes { if !onlyOnline || v.status == online { ret[k] = proto.Clone(v.info).(*datapb.DataNodeInfo) } } - return ret + return ret, c.chanBuffer } func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) { @@ -164,8 +180,8 @@ func (c *clusterNodeManager) updateMetrics() { metrics.DataServiceDataNodeList.WithLabelValues("offline").Set(offCnt) } -func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo) error { - if len(nodes) == 0 { +func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error { + if len(nodes) == 0 && len(buffer) == 0 { return nil } data := make(map[string]string) @@ -175,5 +191,12 @@ func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo) error { value := proto.MarshalTextString(n) data[key] = value } + c.chanBuffer = buffer + + // short cut, reusing datainfo to store array of channel status + bufNode := &datapb.DataNodeInfo{ + Channels: buffer, + } + data[clusterBuffer] = proto.MarshalTextString(bufNode) return c.kv.MultiSave(data) } diff --git a/internal/dataservice/cluster_test.go b/internal/dataservice/cluster_test.go index 2112bf9741..17e6a1fa1f 100644 --- a/internal/dataservice/cluster_test.go +++ b/internal/dataservice/cluster_test.go @@ -32,7 +32,7 @@ func TestClusterCreate(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) - dataNodes := cluster.dataManager.getDataNodes(true) + dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) } @@ -50,7 +50,7 @@ func TestRegister(t *testing.T) { Version: 1, Channels: []*datapb.ChannelStatus{}, }) - dataNodes := cluster.dataManager.getDataNodes(true) + dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) } @@ -69,7 +69,7 @@ func TestUnregister(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) - dataNodes := cluster.dataManager.getDataNodes(true) + dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) cluster.unregister(&datapb.DataNodeInfo{ @@ -77,7 +77,7 @@ func TestUnregister(t *testing.T) { Version: 1, Channels: []*datapb.ChannelStatus{}, }) - dataNodes = cluster.dataManager.getDataNodes(false) + dataNodes, _ = cluster.dataManager.getDataNodes(false) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, offline, cluster.dataManager.dataNodes[addr].status) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) @@ -96,13 +96,13 @@ func TestWatchIfNeeded(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) - dataNodes := cluster.dataManager.getDataNodes(true) + dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) chName := "ch1" cluster.watchIfNeeded(chName, 0) - dataNodes = cluster.dataManager.getDataNodes(true) + dataNodes, _ = cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes[addr].Channels)) assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name) cluster.watchIfNeeded(chName, 0) diff --git a/internal/dataservice/mock_test.go b/internal/dataservice/mock_test.go index c2b712235f..4d7d8b72ab 100644 --- a/internal/dataservice/mock_test.go +++ b/internal/dataservice/mock_test.go @@ -298,8 +298,8 @@ func newMockStartupPolicy() clusterStartupPolicy { return &mockStartupPolicy{} } -func (p *mockStartupPolicy) apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo { - return nil +func (p *mockStartupPolicy) apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { + return nil, nil } type mockSessionManager struct { diff --git a/internal/dataservice/policy.go b/internal/dataservice/policy.go index bf241bc2c0..9cf8893473 100644 --- a/internal/dataservice/policy.go +++ b/internal/dataservice/policy.go @@ -27,7 +27,7 @@ type clusterDeltaChange struct { } type clusterStartupPolicy interface { // apply accept all nodes and new/offline/restarts nodes and returns datanodes whose status need to be changed - apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo + apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) } type watchRestartsStartupPolicy struct { @@ -37,7 +37,8 @@ func newWatchRestartsStartupPolicy() clusterStartupPolicy { return &watchRestartsStartupPolicy{} } -func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo { +func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { ret := make([]*datapb.DataNodeInfo, 0) for _, addr := range delta.restarts { node := cluster[addr] @@ -46,12 +47,52 @@ func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeIn } ret = append(ret, node) } - return ret + // put all channels from offline into buffer first + for _, addr := range delta.offlines { + node := cluster[addr] + for _, ch := range node.Channels { + ch.State = datapb.ChannelWatchState_Uncomplete + buffer = append(buffer, ch) + } + } + // try new nodes first + if len(delta.newNodes) > 0 && len(buffer) > 0 { + idx := 0 + for len(buffer) > 0 { + node := cluster[delta.newNodes[idx%len(delta.newNodes)]] + node.Channels = append(node.Channels, buffer[0]) + buffer = buffer[1:] + if idx < len(delta.newNodes) { + ret = append(ret, node) + } + idx++ + } + } + // try online nodes if buffer is not empty + if len(buffer) > 0 { + online := make([]*datapb.DataNodeInfo, 0, len(cluster)) + for _, node := range cluster { + online = append(online, node) + } + if len(online) > 0 { + idx := 0 + for len(buffer) > 0 { + node := online[idx%len(online)] + node.Channels = append(node.Channels, buffer[0]) + buffer = buffer[1:] + if idx < len(online) { + ret = append(ret, node) + } + idx++ + } + } + } + return ret, buffer } type dataNodeRegisterPolicy interface { // apply accept all online nodes and new created node, returns nodes needed to be changed - apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo + apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) } type emptyRegisterPolicy struct { @@ -61,8 +102,21 @@ func newEmptyRegisterPolicy() dataNodeRegisterPolicy { return &emptyRegisterPolicy{} } -func (p *emptyRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { - return []*datapb.DataNodeInfo{session} +func (p *emptyRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { + return []*datapb.DataNodeInfo{session}, buffer +} + +type assignBufferRegisterPolicy struct{} + +func newAssiggBufferRegisterPolicy() dataNodeRegisterPolicy { + return &assignBufferRegisterPolicy{} +} + +func (p *assignBufferRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { + session.Channels = append(session.Channels, buffer...) + return []*datapb.DataNodeInfo{session}, []*datapb.ChannelStatus{} } type dataNodeUnregisterPolicy interface { diff --git a/internal/dataservice/policy_test.go b/internal/dataservice/policy_test.go index 61dfbc586d..3758515aaa 100644 --- a/internal/dataservice/policy_test.go +++ b/internal/dataservice/policy_test.go @@ -51,7 +51,7 @@ func TestWatchRestartsPolicy(t *testing.T) { restarts: []string{"localhost:2222"}, } - nodes := p.apply(c, dchange) + nodes, _ := p.apply(c, dchange, []*datapb.ChannelStatus{}) assert.EqualValues(t, 1, len(nodes)) assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State) }