improve data service code coverage (#5020)

* optimize TestAssignSegmentID

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update mockMasterService to improve code coverage

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename client to kvClient in Server

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update server.go

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add TestChannel

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update TestChannel

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update TestChannel

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix format

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update TestChannel

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-04-24 11:29:15 +08:00 committed by GitHub
parent 2b5bedf736
commit 5a75e23795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 161 additions and 33 deletions

View File

@ -166,11 +166,13 @@ func (m *mockMasterService) HasCollection(ctx context.Context, req *milvuspb.Has
func (m *mockMasterService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Schema: nil,
CollectionID: 0,
Schema: &schemapb.CollectionSchema{
Name: "test",
},
CollectionID: 1314,
}, nil
}
@ -180,7 +182,7 @@ func (m *mockMasterService) ShowCollections(ctx context.Context, req *milvuspb.S
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
CollectionNames: []string{},
CollectionNames: []string{"test"},
}, nil
}
@ -197,7 +199,14 @@ func (m *mockMasterService) HasPartition(ctx context.Context, req *milvuspb.HasP
}
func (m *mockMasterService) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
panic("not implemented") // TODO: Implement
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
PartitionNames: []string{"_default"},
PartitionIDs: []int64{0},
}, nil
}
//index builder service

View File

@ -54,7 +54,7 @@ type Server struct {
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
client *etcdkv.EtcdKV
kvClient *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocatorInterface
statsHandler *statsHandler
@ -153,9 +153,8 @@ func (s *Server) initMeta() error {
if err != nil {
return err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
s.client = etcdKV
s.meta, err = newMeta(etcdKV)
s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
s.meta, err = newMeta(s.kvClient)
if err != nil {
return err
}
@ -329,11 +328,12 @@ func (s *Server) startStatsChannel(ctx context.Context) {
continue
}
for _, msg := range msgPack.Msgs {
statistics, ok := msg.(*msgstream.SegmentStatisticsMsg)
if !ok {
log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type()))
if msg.Type() != commonpb.MsgType_SegmentStatistics {
log.Warn("receive unknown msg from segment statistics channel", zap.Stringer("msgType", msg.Type()))
continue
}
for _, stat := range statistics.SegStats {
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err))
continue
@ -364,13 +364,14 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_SegmentFlushDone {
log.Warn("receive unknown msg from segment flush channel", zap.Stringer("msgType", msg.Type()))
continue
}
realMsg := msg.(*msgstream.FlushCompletedMsg)
err := s.meta.FlushSegment(realMsg.SegmentID, realMsg.BeginTimestamp)
log.Debug("dataservice flushed segment", zap.Any("segmentID", realMsg.SegmentID), zap.Error(err))
fcMsg := msg.(*msgstream.FlushCompletedMsg)
err := s.meta.FlushSegment(fcMsg.SegmentID, fcMsg.BeginTimestamp)
log.Debug("dataservice flushed segment", zap.Any("segmentID", fcMsg.SegmentID), zap.Error(err))
if err != nil {
log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err))
log.Error("get segment from meta error", zap.Int64("segmentID", fcMsg.SegmentID), zap.Error(err))
continue
}
}
@ -380,10 +381,10 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
flushStream, _ := s.msFactory.NewMsgStream(ctx)
flushStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName)
flushStream.Start()
defer flushStream.Close()
timeTickStream, _ := s.msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName)
timeTickStream.Start()
defer timeTickStream.Close()
for {
select {
case <-ctx.Done():
@ -391,7 +392,7 @@ func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) {
return
default:
}
msgPack := flushStream.Consume()
msgPack := timeTickStream.Consume()
if msgPack == nil {
continue
}
@ -400,9 +401,9 @@ func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) {
log.Warn("receive unknown msg from proxy service timetick", zap.Stringer("msgType", msg.Type()))
continue
}
tMsg := msg.(*msgstream.TimeTickMsg)
ttMsg := msg.(*msgstream.TimeTickMsg)
traceCtx := context.TODO()
if err := s.segAllocator.ExpireAllocations(traceCtx, tMsg.Base.Timestamp); err != nil {
if err := s.segAllocator.ExpireAllocations(traceCtx, ttMsg.Base.Timestamp); err != nil {
log.Error("expire allocations error", zap.Error(err))
}
}
@ -422,7 +423,7 @@ func (s *Server) Stop() error {
// CleanMeta only for test
func (s *Server) CleanMeta() error {
return s.client.RemoveWithPrefix("")
return s.kvClient.RemoveWithPrefix("")
}
func (s *Server) stopServerLoop() {
@ -732,7 +733,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
},
}
p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10))
_, values, err := s.client.LoadWithPrefix(p)
_, values, err := s.kvClient.LoadWithPrefix(p)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil

View File

@ -14,6 +14,7 @@ import (
"context"
"math"
"testing"
"time"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -80,11 +81,17 @@ func TestGetInsertChannels(t *testing.T) {
}
func TestAssignSegmentID(t *testing.T) {
const collID = 100
const collIDInvalid = 101
const partID = 0
const channel0 = "channel0"
const channel1 = "channel1"
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: 0,
ID: collID,
Schema: schema,
Partitions: []int64{},
})
@ -98,12 +105,12 @@ func TestAssignSegmentID(t *testing.T) {
PartitionID UniqueID
ChannelName string
Count uint32
IsSuccess bool
Success bool
}{
{"assign segment normally", 0, 0, "channel0", 1000, true},
{"assign segment with unexisted collection", 1, 0, "channel0", 1000, false},
{"assign with max count", 0, 0, "channel0", uint32(maxCount), true},
{"assign with max uint32 count", 0, 0, "channel1", math.MaxUint32, false},
{"assign segment normally", collID, partID, channel0, 1000, true},
{"assign segment with invalid collection", collIDInvalid, partID, channel0, 1000, false},
{"assign with max count", collID, partID, channel0, uint32(maxCount), true},
{"assign with max uint32 count", collID, partID, channel1, math.MaxUint32, false},
}
for _, test := range cases {
@ -123,7 +130,7 @@ func TestAssignSegmentID(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
assign := resp.SegIDAssignments[0]
if test.IsSuccess {
if test.Success {
assert.EqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode)
assert.EqualValues(t, test.CollectionID, assign.CollectionID)
assert.EqualValues(t, test.PartitionID, assign.PartitionID)
@ -340,6 +347,117 @@ func TestGetSegmentStates(t *testing.T) {
}
}
func TestChannel(t *testing.T) {
svr := newTestServer(t)
defer closeTestServer(t, svr)
t.Run("Test StatsChannel", func(t *testing.T) {
const segID = 0
const rowNum = int64(100)
segInfo := &datapb.SegmentInfo{
ID: segID,
}
err := svr.meta.AddSegment(segInfo)
assert.Nil(t, err)
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
NumRows: rowNum,
}
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.SegmentStatisticsMsg {
return &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentStatistics: internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegStats: []*internalpb.SegmentStatisticsUpdates{stats},
},
}
}
statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
statsStream.AsProducer([]string{Params.StatisticsChannelName})
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345))
err = statsStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
segInfo, err = svr.meta.GetSegment(segID)
assert.Nil(t, err)
assert.Equal(t, rowNum, segInfo.NumRows)
})
t.Run("Test SegmentFlushChannel", func(t *testing.T) {
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.FlushCompletedMsg {
return &msgstream.FlushCompletedMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegmentID: 0,
},
}
}
statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
statsStream.AsProducer([]string{Params.SegmentInfoChannelName})
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 345))
err := statsStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
})
t.Run("Test ProxyTimeTickChannel", func(t *testing.T) {
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.TimeTickMsg {
return &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
},
}
}
timeTickStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
timeTickStream.AsProducer([]string{Params.ProxyTimeTickChannelName})
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 345))
err := timeTickStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
})
}
func newTestServer(t *testing.T) *Server {
Params.Init()
var err error