From 5a75e23795810dc5159b1a288b122fa626e14047 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Sat, 24 Apr 2021 11:29:15 +0800 Subject: [PATCH] improve data service code coverage (#5020) * optimize TestAssignSegmentID Signed-off-by: yudong.cai * update mockMasterService to improve code coverage Signed-off-by: yudong.cai * rename client to kvClient in Server Signed-off-by: yudong.cai * update server.go Signed-off-by: yudong.cai * add TestChannel Signed-off-by: yudong.cai * update TestChannel Signed-off-by: yudong.cai * update TestChannel Signed-off-by: yudong.cai * fix format Signed-off-by: yudong.cai * update TestChannel Signed-off-by: yudong.cai --- internal/dataservice/mock_test.go | 19 ++-- internal/dataservice/server.go | 43 ++++----- internal/dataservice/server_test.go | 132 ++++++++++++++++++++++++++-- 3 files changed, 161 insertions(+), 33 deletions(-) diff --git a/internal/dataservice/mock_test.go b/internal/dataservice/mock_test.go index 26a4917cbb..c0f8efcccc 100644 --- a/internal/dataservice/mock_test.go +++ b/internal/dataservice/mock_test.go @@ -166,11 +166,13 @@ func (m *mockMasterService) HasCollection(ctx context.Context, req *milvuspb.Has func (m *mockMasterService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, + ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Schema: nil, - CollectionID: 0, + Schema: &schemapb.CollectionSchema{ + Name: "test", + }, + CollectionID: 1314, }, nil } @@ -180,7 +182,7 @@ func (m *mockMasterService) ShowCollections(ctx context.Context, req *milvuspb.S ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - CollectionNames: []string{}, + CollectionNames: []string{"test"}, }, nil } @@ -197,7 +199,14 @@ func (m *mockMasterService) HasPartition(ctx context.Context, req *milvuspb.HasP } func (m *mockMasterService) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { - panic("not implemented") // TODO: Implement + return &milvuspb.ShowPartitionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + PartitionNames: []string{"_default"}, + PartitionIDs: []int64{0}, + }, nil } //index builder service diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index d12cffc7c6..7574fb4b5d 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -54,7 +54,7 @@ type Server struct { serverLoopCancel context.CancelFunc serverLoopWg sync.WaitGroup state atomic.Value - client *etcdkv.EtcdKV + kvClient *etcdkv.EtcdKV meta *meta segAllocator segmentAllocatorInterface statsHandler *statsHandler @@ -153,9 +153,8 @@ func (s *Server) initMeta() error { if err != nil { return err } - etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) - s.client = etcdKV - s.meta, err = newMeta(etcdKV) + s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + s.meta, err = newMeta(s.kvClient) if err != nil { return err } @@ -329,11 +328,12 @@ func (s *Server) startStatsChannel(ctx context.Context) { continue } for _, msg := range msgPack.Msgs { - statistics, ok := msg.(*msgstream.SegmentStatisticsMsg) - if !ok { - log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type())) + if msg.Type() != commonpb.MsgType_SegmentStatistics { + log.Warn("receive unknown msg from segment statistics channel", zap.Stringer("msgType", msg.Type())) + continue } - for _, stat := range statistics.SegStats { + ssMsg := msg.(*msgstream.SegmentStatisticsMsg) + for _, stat := range ssMsg.SegStats { if err := s.statsHandler.HandleSegmentStat(stat); err != nil { log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err)) continue @@ -364,13 +364,14 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { } for _, msg := range msgPack.Msgs { if msg.Type() != commonpb.MsgType_SegmentFlushDone { + log.Warn("receive unknown msg from segment flush channel", zap.Stringer("msgType", msg.Type())) continue } - realMsg := msg.(*msgstream.FlushCompletedMsg) - err := s.meta.FlushSegment(realMsg.SegmentID, realMsg.BeginTimestamp) - log.Debug("dataservice flushed segment", zap.Any("segmentID", realMsg.SegmentID), zap.Error(err)) + fcMsg := msg.(*msgstream.FlushCompletedMsg) + err := s.meta.FlushSegment(fcMsg.SegmentID, fcMsg.BeginTimestamp) + log.Debug("dataservice flushed segment", zap.Any("segmentID", fcMsg.SegmentID), zap.Error(err)) if err != nil { - log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err)) + log.Error("get segment from meta error", zap.Int64("segmentID", fcMsg.SegmentID), zap.Error(err)) continue } } @@ -380,10 +381,10 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) { defer logutil.LogPanic() defer s.serverLoopWg.Done() - flushStream, _ := s.msFactory.NewMsgStream(ctx) - flushStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName) - flushStream.Start() - defer flushStream.Close() + timeTickStream, _ := s.msFactory.NewMsgStream(ctx) + timeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName) + timeTickStream.Start() + defer timeTickStream.Close() for { select { case <-ctx.Done(): @@ -391,7 +392,7 @@ func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) { return default: } - msgPack := flushStream.Consume() + msgPack := timeTickStream.Consume() if msgPack == nil { continue } @@ -400,9 +401,9 @@ func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) { log.Warn("receive unknown msg from proxy service timetick", zap.Stringer("msgType", msg.Type())) continue } - tMsg := msg.(*msgstream.TimeTickMsg) + ttMsg := msg.(*msgstream.TimeTickMsg) traceCtx := context.TODO() - if err := s.segAllocator.ExpireAllocations(traceCtx, tMsg.Base.Timestamp); err != nil { + if err := s.segAllocator.ExpireAllocations(traceCtx, ttMsg.Base.Timestamp); err != nil { log.Error("expire allocations error", zap.Error(err)) } } @@ -422,7 +423,7 @@ func (s *Server) Stop() error { // CleanMeta only for test func (s *Server) CleanMeta() error { - return s.client.RemoveWithPrefix("") + return s.kvClient.RemoveWithPrefix("") } func (s *Server) stopServerLoop() { @@ -732,7 +733,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert }, } p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10)) - _, values, err := s.client.LoadWithPrefix(p) + _, values, err := s.kvClient.LoadWithPrefix(p) if err != nil { resp.Status.Reason = err.Error() return resp, nil diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index 4d0e3a5323..3d6aecbeea 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -14,6 +14,7 @@ import ( "context" "math" "testing" + "time" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -80,11 +81,17 @@ func TestGetInsertChannels(t *testing.T) { } func TestAssignSegmentID(t *testing.T) { + const collID = 100 + const collIDInvalid = 101 + const partID = 0 + const channel0 = "channel0" + const channel1 = "channel1" + svr := newTestServer(t) defer closeTestServer(t, svr) schema := newTestSchema() svr.meta.AddCollection(&datapb.CollectionInfo{ - ID: 0, + ID: collID, Schema: schema, Partitions: []int64{}, }) @@ -98,12 +105,12 @@ func TestAssignSegmentID(t *testing.T) { PartitionID UniqueID ChannelName string Count uint32 - IsSuccess bool + Success bool }{ - {"assign segment normally", 0, 0, "channel0", 1000, true}, - {"assign segment with unexisted collection", 1, 0, "channel0", 1000, false}, - {"assign with max count", 0, 0, "channel0", uint32(maxCount), true}, - {"assign with max uint32 count", 0, 0, "channel1", math.MaxUint32, false}, + {"assign segment normally", collID, partID, channel0, 1000, true}, + {"assign segment with invalid collection", collIDInvalid, partID, channel0, 1000, false}, + {"assign with max count", collID, partID, channel0, uint32(maxCount), true}, + {"assign with max uint32 count", collID, partID, channel1, math.MaxUint32, false}, } for _, test := range cases { @@ -123,7 +130,7 @@ func TestAssignSegmentID(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, 1, len(resp.SegIDAssignments)) assign := resp.SegIDAssignments[0] - if test.IsSuccess { + if test.Success { assert.EqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode) assert.EqualValues(t, test.CollectionID, assign.CollectionID) assert.EqualValues(t, test.PartitionID, assign.PartitionID) @@ -340,6 +347,117 @@ func TestGetSegmentStates(t *testing.T) { } } +func TestChannel(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + t.Run("Test StatsChannel", func(t *testing.T) { + const segID = 0 + const rowNum = int64(100) + + segInfo := &datapb.SegmentInfo{ + ID: segID, + } + err := svr.meta.AddSegment(segInfo) + assert.Nil(t, err) + + stats := &internalpb.SegmentStatisticsUpdates{ + SegmentID: segID, + NumRows: rowNum, + } + genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.SegmentStatisticsMsg { + return &msgstream.SegmentStatisticsMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, + SegmentStatistics: internalpb.SegmentStatistics{ + Base: &commonpb.MsgBase{ + MsgType: msgType, + MsgID: 0, + Timestamp: t, + SourceID: 0, + }, + SegStats: []*internalpb.SegmentStatisticsUpdates{stats}, + }, + } + } + + statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx) + statsStream.AsProducer([]string{Params.StatisticsChannelName}) + + msgPack := msgstream.MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345)) + err = statsStream.Produce(&msgPack) + assert.Nil(t, err) + time.Sleep(time.Second) + + segInfo, err = svr.meta.GetSegment(segID) + assert.Nil(t, err) + assert.Equal(t, rowNum, segInfo.NumRows) + }) + + t.Run("Test SegmentFlushChannel", func(t *testing.T) { + genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.FlushCompletedMsg { + return &msgstream.FlushCompletedMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, + SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{ + Base: &commonpb.MsgBase{ + MsgType: msgType, + MsgID: 0, + Timestamp: t, + SourceID: 0, + }, + SegmentID: 0, + }, + } + } + + statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx) + statsStream.AsProducer([]string{Params.SegmentInfoChannelName}) + + msgPack := msgstream.MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 123)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 345)) + err := statsStream.Produce(&msgPack) + assert.Nil(t, err) + time.Sleep(time.Second) + }) + + t.Run("Test ProxyTimeTickChannel", func(t *testing.T) { + genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.TimeTickMsg { + return &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: msgType, + MsgID: 0, + Timestamp: t, + SourceID: 0, + }, + }, + } + } + + timeTickStream, _ := svr.msFactory.NewMsgStream(svr.ctx) + timeTickStream.AsProducer([]string{Params.ProxyTimeTickChannelName}) + + msgPack := msgstream.MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 123)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) + msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 345)) + err := timeTickStream.Produce(&msgPack) + assert.Nil(t, err) + time.Sleep(time.Second) + }) +} + func newTestServer(t *testing.T) *Server { Params.Init() var err error