From 64aad4995954db3c29858aec77ecb39abff6efcf Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 9 Sep 2021 15:36:01 +0800 Subject: [PATCH] Add ut in datasync service to 88% (#7615) See also: #6357 Signed-off-by: yangxuan --- internal/datanode/data_sync_service.go | 14 +- internal/datanode/data_sync_service_test.go | 120 ++++++++++++++++++ internal/datanode/flow_graph_delete_node.go | 8 +- .../datanode/flow_graph_delete_node_test.go | 19 +-- .../flow_graph_dmstream_input_node_test.go | 14 +- internal/datanode/mock_test.go | 10 ++ 6 files changed, 160 insertions(+), 25 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 7cea406cb2..c4b3b130b5 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -13,6 +13,7 @@ package datanode import ( "context" + "errors" "fmt" "github.com/milvus-io/milvus/internal/log" @@ -36,6 +37,8 @@ type dataSyncService struct { collectionID UniqueID dataCoord types.DataCoord clearSignal chan<- UniqueID + + saveBinlog func(fu *segmentFlushUnit) error } func newDataSyncService(ctx context.Context, @@ -49,6 +52,10 @@ func newDataSyncService(ctx context.Context, ) (*dataSyncService, error) { + if replica == nil { + return nil, errors.New("Nil input") + } + ctx1, cancel := context.WithCancel(ctx) service := &dataSyncService{ @@ -147,6 +154,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return nil } + dsService.saveBinlog = saveBinlog + var dmStreamNode Node = newDmInputNode( dsService.ctx, dsService.msFactory, @@ -169,10 +178,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - dn, err := newDeleteDNode(dsService.replica) - if err != nil { - return err - } + dn := newDeleteDNode(dsService.replica) var deleteNode Node = dn diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 89c68df0e2..7cc5c37a2c 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -25,6 +25,126 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" ) +func getVchanInfo(cp bool, collID, ufCollID, ufSegID UniqueID, chanName, ufchanName string, ufNor int64) *datapb.VchannelInfo { + var ufs []*datapb.SegmentInfo + if cp { + ufs = []*datapb.SegmentInfo{{ + CollectionID: ufCollID, + PartitionID: 1, + InsertChannel: ufchanName, + ID: ufSegID, + NumOfRows: ufNor, + DmlPosition: &internalpb.MsgPosition{}, + }} + } else { + ufs = []*datapb.SegmentInfo{} + } + + vi := &datapb.VchannelInfo{ + CollectionID: collID, + ChannelName: chanName, + SeekPosition: &internalpb.MsgPosition{}, + UnflushedSegments: ufs, + FlushedSegments: []int64{}, + } + return vi +} + +func TestDataSyncService_newDataSyncService(te *testing.T) { + + ctx := context.Background() + + tests := []struct { + isValidCase bool + replicaNil bool + inMsgFactory msgstream.Factory + + collID UniqueID + ufCollID UniqueID + ufSegID UniqueID + chanName string + ufchanName string + ufNor int64 + + description string + }{ + {false, false, &mockMsgStreamFactory{false, true}, + 0, 0, 0, "", "", 0, + "SetParamsReturnError"}, + {true, false, &mockMsgStreamFactory{true, true}, + 0, 1, 0, "", "", 0, + "CollID 0 mismach with seginfo collID 1"}, + {true, false, &mockMsgStreamFactory{true, true}, + 1, 1, 0, "c1", "c2", 0, + "chanName c1 mismach with seginfo chanName c2"}, + {true, false, &mockMsgStreamFactory{true, true}, + 1, 1, 0, "c1", "c1", 0, + "add normal segments"}, + {false, false, &mockMsgStreamFactory{true, false}, + 0, 0, 0, "", "", 0, + "error when newinsertbufernode"}, + {false, true, &mockMsgStreamFactory{true, false}, + 0, 0, 0, "", "", 0, + "replica nil"}, + } + + for _, test := range tests { + te.Run(test.description, func(t *testing.T) { + df := &DataCoordFactory{} + + replica := newReplica(&RootCoordFactory{}, test.collID) + if test.replicaNil { + replica = nil + } + + ds, err := newDataSyncService(ctx, + make(chan *flushMsg), + replica, + NewAllocatorFactory(), + test.inMsgFactory, + getVchanInfo(test.isValidCase, test.collID, test.ufCollID, test.ufSegID, test.chanName, test.ufchanName, test.ufNor), + make(chan UniqueID), + df, + ) + + if !test.isValidCase { + assert.Error(t, err) + assert.Nil(t, ds) + } else { + assert.NoError(t, err) + assert.NotNil(t, ds) + + // save binlog + fu := &segmentFlushUnit{ + collID: 1, + segID: 100, + field2Path: map[UniqueID]string{100: "path1"}, + checkPoint: map[UniqueID]segmentCheckPoint{100: {100, internalpb.MsgPosition{}}}, + } + + df.SaveBinlogPathError = true + err := ds.saveBinlog(fu) + assert.Error(t, err) + + df.SaveBinlogPathError = false + df.SaveBinlogPathNotSucess = true + err = ds.saveBinlog(fu) + assert.Error(t, err) + + df.SaveBinlogPathError = false + df.SaveBinlogPathNotSucess = false + err = ds.saveBinlog(fu) + assert.NoError(t, err) + + // start + ds.fg = nil + ds.start() + } + }) + } + +} + // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { t.Skip() diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 0e32f54c7e..837c86f936 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -72,16 +72,12 @@ func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, erro return results, nil } -func newDeleteDNode(replica Replica) (*deleteNode, error) { +func newDeleteDNode(replica Replica) *deleteNode { baseNode := BaseNode{} baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength) - if replica == nil { - return nil, errors.New("Nill input replica") - } - return &deleteNode{ BaseNode: baseNode, replica: replica, - }, nil + } } diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 3c8fa237a3..0781c815de 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -23,27 +23,18 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { tests := []struct { replica Replica - expectedErr bool description string }{ - {nil, true, "Nil input"}, - {&SegmentReplica{}, false, "pointer of SegmentReplica"}, + {&SegmentReplica{}, "pointer of SegmentReplica"}, } for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn, err := newDeleteDNode(test.replica) + dn := newDeleteDNode(test.replica) - if test.expectedErr { - assert.Error(t, err) - assert.Nil(t, dn) - } else { - assert.NoError(t, err) - assert.NotNil(t, dn) - - assert.Equal(t, "deleteNode", dn.Name()) - dn.Close() - } + assert.NotNil(t, dn) + assert.Equal(t, "deleteNode", dn.Name()) + dn.Close() }) } diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index a947e3521a..3935b15b30 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -13,6 +13,7 @@ package datanode import ( "context" + "errors" "testing" "github.com/milvus-io/milvus/internal/msgstream" @@ -20,14 +21,25 @@ import ( ) type mockMsgStreamFactory struct { + SetParamsReturnNil bool + NewMsgStreamNoError bool } +var _ msgstream.Factory = &mockMsgStreamFactory{} + func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error { + if !mm.SetParamsReturnNil { + return errors.New("Set Params Error") + } + return nil } func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return nil, nil + if !mm.NewMsgStreamNoError { + return nil, errors.New("New MsgStream error") + } + return &mockTtMsgStream{}, nil } func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 9552373425..fff45ba14a 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -161,9 +161,19 @@ type RootCoordFactory struct { type DataCoordFactory struct { types.DataCoord + + SaveBinlogPathError bool + SaveBinlogPathNotSucess bool } func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { + if ds.SaveBinlogPathError { + return nil, errors.New("Error") + } + if ds.SaveBinlogPathNotSucess { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil + } + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil }