Add datacoord server unit tests (#7499)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-09-06 17:02:41 +08:00 committed by GitHub
parent 5e0d724738
commit fb4e23bc79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 377 additions and 207 deletions

View File

@ -19,7 +19,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -455,43 +454,3 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
}
return nil
}
// SealSegment switches the given segment to the Sealed state in meta.
// It opens a tracing span from ctx and holds the manager lock for the
// duration of the state change. Only for test.
func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) error {
	span, _ := trace.StartSpanFromContext(ctx)
	defer span.Finish()

	s.mu.Lock()
	defer s.mu.Unlock()

	// The meta update result (nil on success) is handed back unchanged.
	return s.meta.SetState(segmentID, commonpb.SegmentState_Sealed)
}
// createNewSegmentHelper builds an allocHelper whose afterCreateSegment hook
// wraps the newly created segment in a SegmentInfoMsg and produces it to the
// given message stream. Any error from Produce is returned by the hook.
func createNewSegmentHelper(stream msgstream.MsgStream) allocHelper {
	publish := func(segment *datapb.SegmentInfo) error {
		base := &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentInfo,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.NodeID,
		}
		msg := &msgstream.SegmentInfoMsg{
			BaseMsg:    msgstream.BaseMsg{HashValues: []uint32{0}},
			SegmentMsg: datapb.SegmentMsg{Base: base, Segment: segment},
		}
		pack := &msgstream.MsgPack{Msgs: []msgstream.TsMsg{msg}}
		return stream.Produce(pack)
	}
	return allocHelper{afterCreateSegment: publish}
}

View File

@ -108,6 +108,7 @@ type Server struct {
rootCoordClientCreator rootCoordCreatorFunc
}
// ServerHelper is the datacoord server injection helper: a bundle of hooks
// that can be installed on a Server through SetServerHelper.
type ServerHelper struct {
// eventAfterHandleDataNodeTt is a hook taking no arguments and returning nothing.
// NOTE(review): by its name it fires after a DataNode time tick is handled — confirm at the call site.
eventAfterHandleDataNodeTt func()
}
@ -118,20 +119,30 @@ func defaultServerHelper() ServerHelper {
}
}
// Option is the signature of a utility function that sets DataCoord server
// attributes. CreateServer accepts any number of Options.
type Option func(svr *Server)
// SetRootCoordCreator returns an `Option` that installs the provided
// RootCoord client creator on the server it is applied to.
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	apply := func(s *Server) {
		s.rootCoordClientCreator = creator
	}
	return apply
}
// SetServerHelper returns an `Option` that replaces the server's
// ServerHelper with the provided one.
func SetServerHelper(helper ServerHelper) Option {
	return Option(func(target *Server) {
		target.helper = helper
	})
}
// SetCluster returns an `Option` that assigns the provided Cluster to the
// server it is applied to.
func SetCluster(cluster *Cluster) Option {
	var opt Option = func(s *Server) {
		s.cluster = cluster
	}
	return opt
}
// CreateServer create `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
rand.Seed(time.Now().UnixNano())
@ -223,7 +234,11 @@ func (s *Server) Start() error {
// initCluster makes sure the server has a Cluster.
//
// A cluster may already have been injected through the SetCluster option; in
// that case it is kept untouched and the default NewCluster construction is
// by-passed. Otherwise a cluster is created from the server's context, kv
// client and a fresh NodesInfo, and any construction error is returned.
func (s *Server) initCluster() error {
	// cluster could be set by options
	// by-pass default NewCluster process if already set
	if s.cluster != nil {
		return nil
	}
	var err error
	s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
	return err
}
@ -252,27 +267,6 @@ func (s *Server) initServiceDiscovery() error {
return nil
}
// loadDataNodes lists the DataNode sessions known to the server's session
// and converts each one into a DataNodeInfo with an empty channel list.
// It returns an empty (non-nil) slice when the session is missing or the
// session lookup fails; both cases are logged as warnings.
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}

	active, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}

	nodes := make([]*datapb.DataNodeInfo, 0, len(active))
	for _, sess := range active {
		info := &datapb.DataNodeInfo{
			Address:  sess.Address,
			Version:  sess.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodes = append(nodes, info)
	}
	return nodes
}
// startSegmentManager creates the server's segment manager from its meta
// and allocator and stores it on the server.
func (s *Server) startSegmentManager() {
s.segmentManager = newSegmentManager(s.meta, s.allocator)
}
@ -425,33 +419,41 @@ func (s *Server) startWatchService(ctx context.Context) {
log.Debug("watch service shutdown")
return
case event := <-s.eventCh:
info := &datapb.DataNodeInfo{
Address: event.Session.Address,
Version: event.Session.ServerID,
Channels: []*datapb.ChannelStatus{},
}
node := NewNodeInfo(ctx, info)
switch event.EventType {
case sessionutil.SessionAddEvent:
log.Info("received datanode register",
zap.String("address", info.Address),
zap.Int64("serverID", info.Version))
s.cluster.Register(node)
s.metricsCacheManager.InvalidateSystemInfoMetrics()
case sessionutil.SessionDelEvent:
log.Info("received datanode unregister",
zap.String("address", info.Address),
zap.Int64("serverID", info.Version))
s.cluster.UnRegister(node)
s.metricsCacheManager.InvalidateSystemInfoMetrics()
default:
log.Warn("receive unknown service event type",
zap.Any("type", event.EventType))
}
s.handleSessionEvent(ctx, event)
}
}
}
// handleSessionEvent handles session events - DataNodes Add/Del.
//
// For an add event the node described by the event's session is registered
// with the cluster; for a delete event it is unregistered. In both cases the
// cached system info metrics are invalidated afterwards. Unknown event types
// are only logged.
//
// A nil event is ignored. An event that carries no session is logged and
// ignored, since the node address and server ID are read from the session.
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) {
	if event == nil {
		return
	}
	// Session is a pointer; dereferencing it without this guard would panic.
	if event.Session == nil {
		log.Warn("receive session event without session",
			zap.Any("type", event.EventType))
		return
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := NewNodeInfo(ctx, info)
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		s.cluster.Register(node)
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		s.cluster.UnRegister(node)
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
}
func (s *Server) startActiveCheck(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
@ -485,32 +487,44 @@ func (s *Server) startFlushLoop(ctx context.Context) {
log.Debug("flush loop shutdown")
return
case segmentID := <-s.flushCh:
segment := s.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
continue
}
req := &datapb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
},
Segment: segment.SegmentInfo,
}
resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
if err = VerifyResponse(resp, err); err != nil {
log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
continue
}
// set segment to SegmentState_Flushed
if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
continue
}
log.Debug("flush segment complete", zap.Int64("id", segmentID))
//Ignore return error
_ = s.postFlush(ctx, segmentID)
}
}
}
// postFlush runs the follow-up work once a segment flush is done:
//  1. look the segment up in meta; fail with "segment not found" if absent
//  2. notify RootCoord that the segment is flushed
//  3. change the segment state to `Flushed` in meta
//
// The first failing step is logged and its error returned; nil means all
// three steps succeeded.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	seg := s.meta.GetSegment(segmentID)
	if seg == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}

	// Notify RootCoord segment is flushed
	flushDone := &datapb.SegmentFlushCompletedMsg{
		Base:    &commonpb.MsgBase{MsgType: commonpb.MsgType_SegmentFlushDone},
		Segment: seg.SegmentInfo,
	}
	status, rpcErr := s.rootCoordClient.SegmentFlushCompleted(ctx, flushDone)
	if verifyErr := VerifyResponse(status, rpcErr); verifyErr != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(verifyErr))
		return verifyErr
	}

	// set segment to SegmentState_Flushed
	if stateErr := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); stateErr != nil {
		log.Error("flush segment complete failed", zap.Error(stateErr))
		return stateErr
	}

	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}
// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
segments := s.meta.GetFlushingSegments()
for _, segment := range segments {

View File

@ -11,6 +11,7 @@ package datacoord
import (
"context"
"errors"
"math/rand"
"os"
"path"
@ -19,6 +20,7 @@ import (
"testing"
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/log"
@ -61,16 +63,15 @@ func TestAssignSegmentID(t *testing.T) {
const channel0 = "channel0"
const channel1 = "channel1"
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
Partitions: []int64{},
})
t.Run("assign segment normally", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
Partitions: []int64{},
})
req := &datapb.SegmentIDRequest{
Count: 1000,
ChannelName: channel0,
@ -113,6 +114,14 @@ func TestAssignSegmentID(t *testing.T) {
})
t.Run("assign segment with invalid collection", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
Partitions: []int64{},
})
req := &datapb.SegmentIDRequest{
Count: 1000,
ChannelName: channel0,
@ -221,55 +230,57 @@ func TestGetStatisticsChannel(t *testing.T) {
}
func TestGetSegmentStates(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segment := &datapb.SegmentInfo{
ID: 1000,
CollectionID: 100,
PartitionID: 0,
InsertChannel: "c1",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 0,
},
}
err := svr.meta.AddSegment(NewSegmentInfo(segment))
assert.Nil(t, err)
t.Run("normal cases", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segment := &datapb.SegmentInfo{
ID: 1000,
CollectionID: 100,
PartitionID: 0,
InsertChannel: "c1",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 0,
},
}
err := svr.meta.AddSegment(NewSegmentInfo(segment))
assert.Nil(t, err)
cases := []struct {
description string
id UniqueID
expected bool
expectedState commonpb.SegmentState
}{
{"get existed segment", 1000, true, commonpb.SegmentState_Growing},
{"get non-existed segment", 10, false, commonpb.SegmentState_Growing},
}
cases := []struct {
description string
id UniqueID
expected bool
expectedState commonpb.SegmentState
}{
{"get existed segment", 1000, true, commonpb.SegmentState_Growing},
{"get non-existed segment", 10, false, commonpb.SegmentState_Growing},
}
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentIDs: []int64{test.id},
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentIDs: []int64{test.id},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.States))
if test.expected {
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.States[0].Status.ErrorCode)
assert.EqualValues(t, test.expectedState, resp.States[0].State)
}
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.States))
if test.expected {
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.States[0].Status.ErrorCode)
assert.EqualValues(t, test.expectedState, resp.States[0].State)
}
})
}
}
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
@ -704,45 +715,46 @@ func TestChannel(t *testing.T) {
}
func TestSaveBinlogPaths(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
collections := []struct {
ID UniqueID
Partitions []int64
}{
{0, []int64{0, 1}},
{1, []int64{0, 1}},
}
for _, collection := range collections {
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collection.ID,
Schema: nil,
Partitions: collection.Partitions,
})
}
segments := []struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
}{
{0, 0, 0},
{1, 0, 0},
{2, 0, 1},
{3, 1, 1},
}
for _, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
t.Run("Normal SaveRequest", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
collections := []struct {
ID UniqueID
Partitions []int64
}{
{0, []int64{0, 1}},
{1, []int64{0, 1}},
}
for _, collection := range collections {
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collection.ID,
Schema: nil,
Partitions: collection.Partitions,
})
}
segments := []struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
}{
{0, 0, 0},
{1, 0, 0},
{2, 0, 1},
{3, 1, 1},
}
for _, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
@ -1118,14 +1130,15 @@ func TestGetVChannelPos(t *testing.T) {
}
func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
t.Run("test get recovery info with no segments", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
req := &datapb.GetRecoveryInfoRequest{
CollectionID: 0,
PartitionID: 0,
@ -1162,6 +1175,13 @@ func TestGetRecoveryInfo(t *testing.T) {
}
t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed)
err := svr.meta.AddSegment(NewSegmentInfo(seg1))
@ -1183,6 +1203,13 @@ func TestGetRecoveryInfo(t *testing.T) {
})
t.Run("test get recovery of unflushed segments ", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing)
seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing)
err := svr.meta.AddSegment(NewSegmentInfo(seg1))
@ -1203,6 +1230,13 @@ func TestGetRecoveryInfo(t *testing.T) {
})
t.Run("test get binlogs", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
binlogReq := &datapb.SaveBinlogPathsRequest{
SegmentID: 0,
CollectionID: 0,
@ -1247,6 +1281,169 @@ func TestGetRecoveryInfo(t *testing.T) {
})
}
// TestOptions checks that the server `Option` constructors return non-nil
// options and that applying them sets the matching Server field.
func TestOptions(t *testing.T) {
	t.Run("SetRootCoordCreator", func(t *testing.T) {
		server := newTestServer(t, nil)
		defer closeTestServer(t, server)

		var creator rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, endpoints []string) (types.RootCoord, error) {
			return nil, errors.New("dummy")
		}
		option := SetRootCoordCreator(creator)
		assert.NotNil(t, option)

		// Clear the field first so a non-nil value afterwards can only
		// come from applying the option.
		server.rootCoordClientCreator = nil
		option(server)

		// testify cannot compare function directly
		// the behavior is actually undefined
		assert.NotNil(t, creator)
		assert.NotNil(t, server.rootCoordClientCreator)
	})

	t.Run("SetCluster", func(t *testing.T) {
		policy := newEmptyRegisterPolicy()
		notify := make(chan interface{})
		memKV := memkv.NewMemoryKV()
		store := &SpyClusterStore{
			NodesInfo: NewNodesInfo(),
			ch:        notify,
		}
		injected, err := NewCluster(context.TODO(), memKV, store, dummyPosProvider{}, withRegisterPolicy(policy))
		assert.Nil(t, err)

		option := SetCluster(injected)
		assert.NotNil(t, option)

		server := newTestServer(t, nil, option)
		defer closeTestServer(t, server)

		// The injected cluster must be the one the server ends up with.
		assert.Equal(t, injected, server.cluster)
	})
}
// TestHandleSessionEvent drives Server.handleSessionEvent directly with
// None/Add/Del session events against a server whose cluster was injected
// through SetCluster, and also checks that a nil event does not panic.
func TestHandleSessionEvent(t *testing.T) {
registerPolicy := newEmptyRegisterPolicy()
unregisterPolicy := newEmptyUnregisterPolicy()
// NOTE(review): ch is handed to SpyClusterStore; the test assumes the store
// sends on it after the cluster changes — confirm against SpyClusterStore.
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore,
dummyPosProvider{},
withRegisterPolicy(registerPolicy),
withUnregistorPolicy(unregisterPolicy))
assert.Nil(t, err)
defer cluster.Close()
cluster.Startup(nil)
// Build the server around the pre-built cluster.
svr := newTestServer(t, nil, SetCluster(cluster))
defer closeTestServer(t, svr)
t.Run("handle events", func(t *testing.T) {
// None event: no channel receive and no assertion follows, the call just has to return.
evt := &sessionutil.SessionEvent{
EventType: sessionutil.SessionNoneEvent,
Session: &sessionutil.Session{
ServerID: 0,
ServerName: "",
Address: "",
Exclusive: false,
},
}
svr.handleSessionEvent(context.Background(), evt)
// Add event: after it is handled the cluster should hold exactly this node.
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionAddEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
}
svr.handleSessionEvent(context.Background(), evt)
// Block until a value arrives on ch before inspecting the node list.
<-ch
dataNodes := svr.cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "DN127.0.0.101", dataNodes[0].Info.GetAddress())
// Del event for the same node: the cluster should be empty again.
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionDelEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
}
svr.handleSessionEvent(context.Background(), evt)
// Block until a value arrives on ch before inspecting the node list.
<-ch
dataNodes = svr.cluster.GetNodes()
assert.EqualValues(t, 0, len(dataNodes))
})
t.Run("nil evt", func(t *testing.T) {
// handleSessionEvent returns early on a nil event, so this must not panic.
assert.NotPanics(t, func() {
svr.handleSessionEvent(context.Background(), nil)
})
})
}
// rootCoordSegFlushComplete is a RootCoord test double: it embeds
// mockRootCoordService and overrides SegmentFlushCompleted.
type rootCoordSegFlushComplete struct {
mockRootCoordService
// flag selects the SegmentFlushCompleted reply: Success when true,
// UnexpectedError when false.
flag bool
}
// SegmentFlushCompleted overrides the default mock behavior: the returned
// status is Success when rc.flag is set and UnexpectedError otherwise.
// The error result is always nil.
func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
	code := commonpb.ErrorCode_UnexpectedError
	if rc.flag {
		code = commonpb.ErrorCode_Success
	}
	return &commonpb.Status{ErrorCode: code}, nil
}
// TestPostFlush covers the three outcomes of Server.postFlush: unknown
// segment, RootCoord rejecting the flush notification, and full success.
func TestPostFlush(t *testing.T) {
	t.Run("segment not found", func(t *testing.T) {
		svr := newTestServer(t, nil)
		defer closeTestServer(t, svr)

		err := svr.postFlush(context.Background(), 1)
		// Compare by message: this does not rely on deep equality of the
		// unexported error value created by errors.New, and it fails
		// cleanly when err is nil.
		assert.EqualError(t, err, "segment not found")
	})

	t.Run("failed to sync with Rootcoord", func(t *testing.T) {
		svr := newTestServer(t, nil)
		defer closeTestServer(t, svr)
		// flag=false makes SegmentFlushCompleted answer UnexpectedError.
		svr.rootCoordClient = &rootCoordSegFlushComplete{flag: false}

		err := svr.meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{
			ID:           1,
			CollectionID: 1,
			PartitionID:  1,
			State:        commonpb.SegmentState_Flushing,
		}))
		assert.Nil(t, err)

		err = svr.postFlush(context.Background(), 1)
		assert.NotNil(t, err)
	})

	t.Run("success post flush", func(t *testing.T) {
		svr := newTestServer(t, nil)
		defer closeTestServer(t, svr)
		// flag=true makes SegmentFlushCompleted answer Success.
		svr.rootCoordClient = &rootCoordSegFlushComplete{flag: true}

		err := svr.meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{
			ID:           1,
			CollectionID: 1,
			PartitionID:  1,
			State:        commonpb.SegmentState_Flushing,
		}))
		assert.Nil(t, err)

		err = svr.postFlush(context.Background(), 1)
		assert.Nil(t, err)
	})
}
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.TimeTickChannelName = strconv.Itoa(rand.Int())