diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 3c8af986f4..cdc20d2baa 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -71,7 +71,7 @@ func defaultRegisterPolicy() dataNodeRegisterPolicy { } func defaultUnregisterPolicy() dataNodeUnregisterPolicy { - return &randomAssignUnregisterPolicy{} + return randomAssignRegisterFunc } func defaultAssignPolicy() channelAssignPolicy { diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index b27451e0f4..0a2571eaeb 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -8,6 +8,7 @@ // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. + package datacoord import ( @@ -25,6 +26,8 @@ type clusterDeltaChange struct { offlines []string restarts []string } + +// clusterStartupPolicy defines the behavior when datacoord starts/restarts type clusterStartupPolicy interface { // apply accept all nodes and new/offline/restarts nodes and returns datanodes whose status need to be changed apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) @@ -34,10 +37,20 @@ type watchRestartsStartupPolicy struct { } func newWatchRestartsStartupPolicy() clusterStartupPolicy { - return &watchRestartsStartupPolicy{} + return watchRestartStartup } -func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, +// startup func +type startupFunc func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) + +// implement watchRestartsStartupPolicy for startupFunc +func (f startupFunc) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { + return f(cluster, delta, buffer) +} + +var watchRestartStartup startupFunc = func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { ret := make([]*datapb.DataNodeInfo, 0) for _, addr := range delta.restarts { @@ -90,54 +103,64 @@ func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeIn return ret, buffer } +// dataNodeRegisterPolicy defines the behavior when a datanode is registered type dataNodeRegisterPolicy interface { // apply accept all online nodes and new created node, returns nodes needed to be changed apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) } -type emptyRegisterPolicy struct { +// data node register func, simple func wrapping policy +type dataNodeRegisterFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) + +// implement dataNodeRegisterPolicy for dataNodeRegisterFunc +func (f dataNodeRegisterFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, + buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { + return f(cluster, session, buffer) } -func newEmptyRegisterPolicy() dataNodeRegisterPolicy { - return &emptyRegisterPolicy{} -} - -func (p *emptyRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, +// test logic, register and do nothing +var emptyRegister dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { return []*datapb.DataNodeInfo{session}, buffer } -type assignBufferRegisterPolicy struct{} - -func newAssiggBufferRegisterPolicy() dataNodeRegisterPolicy { - return &assignBufferRegisterPolicy{} -} - -func (p *assignBufferRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, +// assign existing buffered channels into newly registered data node session +var registerAssignWithBuffer dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) { session.Channels = append(session.Channels, buffer...) return []*datapb.DataNodeInfo{session}, []*datapb.ChannelStatus{} } +func newEmptyRegisterPolicy() dataNodeRegisterPolicy { + return emptyRegister +} + +func newAssiggBufferRegisterPolicy() dataNodeRegisterPolicy { + return registerAssignWithBuffer +} + +// dataNodeUnregisterPolicy defines the behavior when datanode unregisters type dataNodeUnregisterPolicy interface { // apply accept all online nodes and unregistered node, returns nodes needed to be changed apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo } -type emptyUnregisterPolicy struct { +// unregisterNodeFunc, short cut for functions implement policy +type unregisterNodeFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo + +// implement dataNodeUnregisterPolicy for unregisterNodeFunc +func (f unregisterNodeFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { + return f(cluster, session) } -func newEmptyUnregisterPolicy() dataNodeUnregisterPolicy { - return &emptyUnregisterPolicy{} -} - -func (p *emptyUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { +// test logic, do nothing when node unregister +var emptyUnregisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { return nil } -type randomAssignUnregisterPolicy struct{} - -func (p *randomAssignUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { +// randomly assign channels from unregistered node into existing nodes +// if there is no nodes online, this func will not be invoked, buffer will be filled outside this func +var randomAssignRegisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { if len(cluster) == 0 || // no available node session == nil || len(session.Channels) == 0 { // lost node not watching any channels @@ -178,19 +201,27 @@ func (p *randomAssignUnregisterPolicy) apply(cluster map[string]*datapb.DataNode return appliedNodes } +func newEmptyUnregisterPolicy() dataNodeUnregisterPolicy { + return emptyUnregisterFunc +} + +// channelAssignPolicy defines the behavior when a new channel needs to be assigned type channelAssignPolicy interface { // apply accept all online nodes and new created channel with collectionID, returns node needed to be changed apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo } -type assignAllPolicy struct { +// channelAssignFunc, function shortcut for policy +type channelAssignFunc func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo + +// implement channelAssignPolicy for channelAssign func +func (f channelAssignFunc) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo { + return f(cluster, channel, collectionID) } -func newAssignAllPolicy() channelAssignPolicy { - return &assignAllPolicy{} -} - -func (p *assignAllPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo { +// deprecated +// test logic, assign channel to all existing data node, works fine only when there is only one data node! +var assignAllFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo { ret := make([]*datapb.DataNodeInfo, 0) for _, node := range cluster { has := false @@ -214,13 +245,8 @@ func (p *assignAllPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel return ret } -type balancedAssignPolicy struct{} - -func newBalancedAssignPolicy() channelAssignPolicy { - return &balancedAssignPolicy{} -} - -func (p *balancedAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo { +// balanced assign channel, select the datanode with least amount of channels to assign +var balancedAssignFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo { if len(cluster) == 0 { return []*datapb.DataNodeInfo{} } @@ -249,3 +275,11 @@ func (p *balancedAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, ch ret = append(ret, cluster[target]) return ret } + +func newAssignAllPolicy() channelAssignPolicy { + return assignAllFunc +} + +func newBalancedAssignPolicy() channelAssignPolicy { + return balancedAssignFunc +} diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index fa0ad6c35a..546bccdf1d 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -57,7 +57,7 @@ func TestWatchRestartsPolicy(t *testing.T) { } func TestRandomReassign(t *testing.T) { - p := randomAssignUnregisterPolicy{} + p := randomAssignRegisterFunc clusters := make(map[string]*datapb.DataNodeInfo) clusters["addr1"] = &datapb.DataNodeInfo{