Add unit tests in dataservice (#5478)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
sunby 2021-05-28 15:13:51 +08:00 committed by zhenshan.cao
parent 7a8b34ee9f
commit 43534ef63f
7 changed files with 187 additions and 47 deletions

View File

@ -194,11 +194,11 @@ func (s *Server) prepareSegmentPos(segInfo *datapb.SegmentInfo, dmlPos, ddlPos *
return nil, err
}
msPosPair := proto.MarshalTextString(ddlPos)
result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair //segment pos
result[path.Join(Params.SegmentDdlPosSubPath, key)] = msPosPair //segment pos
result[path.Join(Params.DdlChannelPosSubPath, segInfo.InsertChannel)] = msPosPair // DdlChannel pos(use dm channel as Key, since dd channel may share same channel name)
}
return map[string]string{}, nil
return result, nil
}
// GetVChanPositions gets the latest positions of the vchannels with the provided dml channel names
@ -245,7 +245,7 @@ func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelPair, e
pairs = append(pairs, &datapb.VchannelPair{
CollectionID: vchan.CollectionID,
DmlVchannelName: vchan.DmlChannel,
DdlVchannelName: vchan.DmlChannel,
DdlVchannelName: vchan.DdlChannel,
DdlPosition: ddlPos,
DmlPosition: dmlPos,
})

View File

@ -62,7 +62,7 @@ func withAssignPolicy(p channelAssignPolicy) clusterOption {
}
// defaultStartupPolicy returns the clusterStartupPolicy used as the default.
// NOTE(review): the two return lines below look like the old and new side of a
// rename hunk (newReWatchOnRestartsStartupPolicy -> newWatchRestartsStartupPolicy);
// confirm which one applies.
func defaultStartupPolicy() clusterStartupPolicy {
return newReWatchOnRestartsStartupPolicy()
return newWatchRestartsStartupPolicy()
}
func defaultRegisterPolicy() dataNodeRegisterPolicy {
@ -74,7 +74,7 @@ func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
}
// defaultAssignPolicy returns the channelAssignPolicy used as the default.
// NOTE(review): the two return lines below look like the old and new side of a
// rename hunk (newAllAssignPolicy -> newAssignAllPolicy); confirm which one applies.
func defaultAssignPolicy() channelAssignPolicy {
return newAllAssignPolicy()
return newAssignAllPolicy()
}
func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster {
@ -165,6 +165,7 @@ func (c *cluster) register(n *datapb.DataNodeInfo) {
func (c *cluster) unregister(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.sessionManager.releaseSession(n.Address)
c.dataManager.unregister(n)
cNodes := c.dataManager.getDataNodes(true)
rets := c.unregisterPolicy.apply(cNodes, n)

View File

@ -12,9 +12,7 @@
package dataservice
import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
type vchannel struct {
@ -57,30 +55,3 @@ func (dp dummyPosProvider) GetDdlChannel() string {
// GetDdlChannel returns the dd channel name stored on the server (s.ddChannelName).
func (s *Server) GetDdlChannel() string {
return s.ddChannelName
}
// getAllActiveVChannels returns one vchannel per insert channel that still has
// unflushed segments in the server meta.
//
// Each returned vchannel carries the collection ID recorded for that insert
// channel, the insert channel name as DmlChannel, and the server's
// ddChannelName as DdlChannel. When two unflushed segments on the same insert
// channel report different collection IDs, an error is logged and the ID of
// the segment seen last is kept.
func (s *Server) getAllActiveVChannels() []vchannel {
	channelToCollection := make(map[string]UniqueID)
	for _, seg := range s.meta.GetUnFlushedSegments() {
		// A channel is expected to belong to a single collection; report any
		// mismatch but keep going so the remaining segments are still processed.
		if prev, seen := channelToCollection[seg.InsertChannel]; seen && prev != seg.CollectionID {
			log.Error("col:vchan not 1:N",
				zap.Int64("colid 1", prev),
				zap.Int64("colid 2", seg.CollectionID),
				zap.String("channel", seg.InsertChannel))
		}
		channelToCollection[seg.InsertChannel] = seg.CollectionID
	}

	active := make([]vchannel, 0, len(channelToCollection))
	for channel, collectionID := range channelToCollection {
		vch := vchannel{
			CollectionID: collectionID,
			DmlChannel:   channel,
			DdlChannel:   s.ddChannelName,
		}
		active = append(active, vch)
	}
	return active
}

View File

@ -25,17 +25,18 @@ type clusterDeltaChange struct {
restarts []string
}
// clusterStartupPolicy computes, from the known nodes and a clusterDeltaChange,
// the datanodes whose status has to change.
type clusterStartupPolicy interface {
// apply accepts all nodes of the old cluster together with the delta of
// new/offline/restarted nodes, and returns the datanodes whose status needs to be changed.
apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo
}
type reWatchOnRestartsStartupPolicy struct {
type watchRestartsStartupPolicy struct {
}
func newReWatchOnRestartsStartupPolicy() clusterStartupPolicy {
return &reWatchOnRestartsStartupPolicy{}
func newWatchRestartsStartupPolicy() clusterStartupPolicy {
return &watchRestartsStartupPolicy{}
}
func (p *reWatchOnRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, addr := range delta.restarts {
node := cluster[addr]
@ -48,6 +49,7 @@ func (p *reWatchOnRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNo
}
// dataNodeRegisterPolicy computes which nodes have to change when a datanode registers.
type dataNodeRegisterPolicy interface {
// apply accepts all online nodes and the newly created node, and returns the nodes that need to be changed.
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
@ -63,6 +65,7 @@ func (p *doNothingRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo,
}
// dataNodeUnregisterPolicy computes which nodes have to change when a datanode unregisters.
type dataNodeUnregisterPolicy interface {
// apply accepts all online nodes and the unregistered node, and returns the nodes that need to be changed.
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
@ -77,9 +80,9 @@ func (p *doNothingUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInf
return nil
}
type reassignRandomUnregisterPolicy struct{}
type randomAssignUnregisterPolicy struct{}
func (p *reassignRandomUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
func (p *randomAssignUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
if len(cluster) == 0 || // no available node
len(session.Channels) == 0 { // lost node not watching any channels
return []*datapb.DataNodeInfo{}
@ -113,17 +116,18 @@ func (p *reassignRandomUnregisterPolicy) apply(cluster map[string]*datapb.DataNo
}
// channelAssignPolicy computes which nodes have to change when a new channel is assigned.
type channelAssignPolicy interface {
// apply accepts all online nodes and the newly created channel with its collectionID,
// and returns the nodes that need to be changed.
apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
}
type allAssignPolicy struct {
type assignAllPolicy struct {
}
func newAllAssignPolicy() channelAssignPolicy {
return &allAssignPolicy{}
func newAssignAllPolicy() channelAssignPolicy {
return &assignAllPolicy{}
}
func (p *allAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
func (p *assignAllPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, node := range cluster {
has := false

View File

@ -0,0 +1,46 @@
package dataservice
import (
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
// TestWatchRestartsPolicy checks the startup policy built by
// newWatchRestartsStartupPolicy: with two known datanodes, each watching one
// channel in the Complete state, and a delta that marks only "localhost:2222"
// as restarted, apply must return a single node whose first channel is in the
// Uncomplete state.
func TestWatchRestartsPolicy(t *testing.T) {
	p := newWatchRestartsStartupPolicy()

	c := make(map[string]*datapb.DataNodeInfo)
	c["localhost:1111"] = &datapb.DataNodeInfo{
		Address: "localhost:1111",
		Version: 0,
		Channels: []*datapb.ChannelStatus{
			{
				Name:         "vch1",
				State:        datapb.ChannelWatchState_Complete,
				CollectionID: 0,
			},
		},
	}
	c["localhost:2222"] = &datapb.DataNodeInfo{
		Address: "localhost:2222",
		Version: 0,
		Channels: []*datapb.ChannelStatus{
			{
				Name:         "vch2",
				State:        datapb.ChannelWatchState_Complete,
				CollectionID: 0,
			},
		},
	}

	dchange := &clusterDeltaChange{
		newNodes: []string{},
		offlines: []string{},
		restarts: []string{"localhost:2222"},
	}

	nodes := p.apply(c, dchange)

	// assert.* only records a failure and keeps running, so stop explicitly
	// before indexing; otherwise a wrong result panics instead of failing cleanly.
	if !assert.EqualValues(t, 1, len(nodes)) {
		return
	}
	if !assert.NotEmpty(t, nodes[0].Channels) {
		return
	}
	assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State)
}

View File

@ -560,7 +560,6 @@ func (s *Server) prepareBinlogAndPos(req *datapb.SaveBinlogPathsRequest) (map[st
log.Error("Failed to get segment info", zap.Int64("segmentID", req.GetSegmentID()), zap.Error(err))
return nil, err
}
log.Debug("segment", zap.Int64("segment", segInfo.CollectionID))
for _, fieldBlp := range req.Field2BinlogPaths {
fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)

View File

@ -494,8 +494,11 @@ func TestSaveBinlogPaths(t *testing.T) {
CollectionID: 0,
Field2BinlogPaths: []*datapb.ID2PathList{
{
ID: 1,
Paths: []string{"/by-dev/test/0/1/2/1/Allo1", "/by-dev/test/0/1/2/1/Allo2"},
ID: 1,
Paths: []string{
"/by-dev/test/0/1/2/1/Allo1",
"/by-dev/test/0/1/2/1/Allo2",
},
},
},
DdlBinlogPaths: []*datapb.DDLBinlogMeta{
@ -508,6 +511,34 @@ func TestSaveBinlogPaths(t *testing.T) {
TsBinlogPath: "/by-dev/test/0/ts/Allo8",
},
},
DmlPosition: &datapb.PositionPair{
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
EndPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{3, 4, 5},
MsgGroup: "",
Timestamp: 0,
},
},
DdlPosition: &datapb.PositionPair{
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch2",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
EndPosition: &internalpb.MsgPosition{
ChannelName: "ch2",
MsgID: []byte{3, 4, 5},
MsgGroup: "",
Timestamp: 0,
},
},
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
@ -767,6 +798,94 @@ func TestResumeChannel(t *testing.T) {
})
}
// TestGetVChannelPos saves dml/ddl position pairs for segment 1 (insert
// channel "ch1", ddl channel "ch2") through SaveBinlogPaths and then checks
// GetVChanPositions for an unknown channel pair (message IDs are nil) and for
// the stored pair (message IDs match what was saved).
func TestGetVChannelPos(t *testing.T) {
	svr := newTestServer(t, nil)
	defer closeTestServer(t, svr)

	schema := newTestSchema()
	err := svr.meta.AddCollection(&datapb.CollectionInfo{
		ID:     0,
		Schema: schema,
	})
	assert.Nil(t, err)
	err = svr.meta.AddSegment(&datapb.SegmentInfo{
		ID:            1,
		CollectionID:  0,
		PartitionID:   0,
		InsertChannel: "ch1",
	})
	assert.Nil(t, err)

	// newPositionPair builds the start/end position fixture for one channel;
	// the dml and ddl fixtures differ only in the channel name.
	newPositionPair := func(channel string) *datapb.PositionPair {
		return &datapb.PositionPair{
			StartPosition: &internalpb.MsgPosition{
				ChannelName: channel,
				MsgID:       []byte{1, 2, 3},
				MsgGroup:    "",
				Timestamp:   0,
			},
			EndPosition: &internalpb.MsgPosition{
				ChannelName: channel,
				MsgID:       []byte{3, 4, 5},
				MsgGroup:    "",
				Timestamp:   0,
			},
		}
	}

	req := &datapb.SaveBinlogPathsRequest{
		SegmentID:         1,
		CollectionID:      0,
		Field2BinlogPaths: []*datapb.ID2PathList{},
		DdlBinlogPaths:    []*datapb.DDLBinlogMeta{},
		DmlPosition:       newPositionPair("ch1"),
		DdlPosition:       newPositionPair("ch2"),
	}
	status, err := svr.SaveBinlogPaths(context.TODO(), req)
	// assert.* does not abort the test; stop here so status is not
	// dereferenced after a failed call.
	if !assert.Nil(t, err) {
		return
	}
	assert.EqualValues(t, commonpb.ErrorCode_Success, status.ErrorCode)

	t.Run("get unexisted channel", func(t *testing.T) {
		pair, err := svr.GetVChanPositions([]vchannel{
			{
				CollectionID: 0,
				DmlChannel:   "chx1",
				DdlChannel:   "chx2",
			},
		})
		assert.Nil(t, err)
		// Guard the pair[0] accesses below against an unexpected result length.
		if !assert.EqualValues(t, 1, len(pair)) {
			return
		}
		assert.Nil(t, pair[0].DmlPosition.StartPosition.MsgID)
		assert.Nil(t, pair[0].DmlPosition.EndPosition.MsgID)
		assert.Nil(t, pair[0].DdlPosition.StartPosition.MsgID)
		assert.Nil(t, pair[0].DdlPosition.EndPosition.MsgID)
	})

	t.Run("get existed channel", func(t *testing.T) {
		pair, err := svr.GetVChanPositions([]vchannel{
			{
				CollectionID: 0,
				DmlChannel:   "ch1",
				DdlChannel:   "ch2",
			},
		})
		assert.Nil(t, err)
		// Guard the pair[0] accesses below against an unexpected result length.
		if !assert.EqualValues(t, 1, len(pair)) {
			return
		}
		assert.EqualValues(t, 0, pair[0].CollectionID)
		assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DmlPosition.StartPosition.MsgID)
		assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DmlPosition.EndPosition.MsgID)
		assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DdlPosition.StartPosition.MsgID)
		assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DdlPosition.EndPosition.MsgID)
	})
}
func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
Params.Init()
var err error