From fb4e23bc79a2f1d087ab194b230840caedcd5186 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 6 Sep 2021 17:02:41 +0800 Subject: [PATCH] Add datacoord server unit tests (#7499) Signed-off-by: Congqi Xia --- internal/datacoord/segment_manager.go | 41 --- internal/datacoord/server.go | 148 +++++----- internal/datacoord/server_test.go | 395 +++++++++++++++++++------- 3 files changed, 377 insertions(+), 207 deletions(-) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 25415a7576..b68e441d2d 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -19,7 +19,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -455,43 +454,3 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { } return nil } - -// only for test -func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) error { - sp, _ := trace.StartSpanFromContext(ctx) - defer sp.Finish() - s.mu.Lock() - defer s.mu.Unlock() - if err := s.meta.SetState(segmentID, commonpb.SegmentState_Sealed); err != nil { - return err - } - return nil -} - -func createNewSegmentHelper(stream msgstream.MsgStream) allocHelper { - h := allocHelper{} - h.afterCreateSegment = func(segment *datapb.SegmentInfo) error { - infoMsg := &msgstream.SegmentInfoMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - SegmentMsg: datapb.SegmentMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: Params.NodeID, - }, - Segment: segment, - }, - } - msgPack := &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{infoMsg}, - } - if err := stream.Produce(msgPack); err != nil { - return err - } - return nil - } - return h -} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 608e2c215b..72b67427cf 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -108,6 +108,7 @@ type Server struct { rootCoordClientCreator rootCoordCreatorFunc } +// ServerHelper datacoord server injection helper type ServerHelper struct { eventAfterHandleDataNodeTt func() } @@ -118,20 +119,30 @@ func defaultServerHelper() ServerHelper { } } +// Option utility function signature to set DataCoord server attributes type Option func(svr *Server) +// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter func SetRootCoordCreator(creator rootCoordCreatorFunc) Option { return func(svr *Server) { svr.rootCoordClientCreator = creator } } +// SetServerHelper returns an `Option` setting ServerHelp with provided parameter func SetServerHelper(helper ServerHelper) Option { return func(svr *Server) { svr.helper = helper } } +// SetCluster returns an `Option` setting Cluster with provided parameter +func SetCluster(cluster *Cluster) Option { + return func(svr *Server) { + svr.cluster = cluster + } +} + // CreateServer create `Server` instance func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) { rand.Seed(time.Now().UnixNano()) @@ -223,7 +234,11 @@ func (s *Server) Start() error { func (s *Server) initCluster() error { var err error - s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s) + // cluster could be set by options + // by-pass default NewCluster process if already set + if s.cluster == nil { + s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s) + } return err } @@ -252,27 +267,6 @@ func (s *Server) initServiceDiscovery() error { return nil } -func (s *Server) loadDataNodes() []*datapb.DataNodeInfo { - if s.session == nil { - log.Warn("load data nodes but session is nil") - return []*datapb.DataNodeInfo{} - } - sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole) - if err != nil { - log.Warn("load data nodes faild", zap.Error(err)) - return []*datapb.DataNodeInfo{} - } - datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions)) - for _, session := range sessions { - datanodes = append(datanodes, &datapb.DataNodeInfo{ - Address: session.Address, - Version: session.ServerID, - Channels: []*datapb.ChannelStatus{}, - }) - } - return datanodes -} - func (s *Server) startSegmentManager() { s.segmentManager = newSegmentManager(s.meta, s.allocator) } @@ -425,33 +419,41 @@ func (s *Server) startWatchService(ctx context.Context) { log.Debug("watch service shutdown") return case event := <-s.eventCh: - info := &datapb.DataNodeInfo{ - Address: event.Session.Address, - Version: event.Session.ServerID, - Channels: []*datapb.ChannelStatus{}, - } - node := NewNodeInfo(ctx, info) - switch event.EventType { - case sessionutil.SessionAddEvent: - log.Info("received datanode register", - zap.String("address", info.Address), - zap.Int64("serverID", info.Version)) - s.cluster.Register(node) - s.metricsCacheManager.InvalidateSystemInfoMetrics() - case sessionutil.SessionDelEvent: - log.Info("received datanode unregister", - zap.String("address", info.Address), - zap.Int64("serverID", info.Version)) - s.cluster.UnRegister(node) - s.metricsCacheManager.InvalidateSystemInfoMetrics() - default: - log.Warn("receive unknown service event type", - zap.Any("type", event.EventType)) - } + s.handleSessionEvent(ctx, event) } } } +// handles session events - DataNodes Add/Del +func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) { + if event == nil { + return + } + info := &datapb.DataNodeInfo{ + Address: event.Session.Address, + Version: event.Session.ServerID, + Channels: []*datapb.ChannelStatus{}, + } + node := NewNodeInfo(ctx, info) + switch event.EventType { + case sessionutil.SessionAddEvent: + log.Info("received datanode register", + zap.String("address", info.Address), + zap.Int64("serverID", info.Version)) + s.cluster.Register(node) + s.metricsCacheManager.InvalidateSystemInfoMetrics() + case sessionutil.SessionDelEvent: + log.Info("received datanode unregister", + zap.String("address", info.Address), + zap.Int64("serverID", info.Version)) + s.cluster.UnRegister(node) + s.metricsCacheManager.InvalidateSystemInfoMetrics() + default: + log.Warn("receive unknown service event type", + zap.Any("type", event.EventType)) + } +} + func (s *Server) startActiveCheck(ctx context.Context) { defer logutil.LogPanic() defer s.serverLoopWg.Done() @@ -485,32 +487,44 @@ func (s *Server) startFlushLoop(ctx context.Context) { log.Debug("flush loop shutdown") return case segmentID := <-s.flushCh: - segment := s.meta.GetSegment(segmentID) - if segment == nil { - log.Warn("failed to get flused segment", zap.Int64("id", segmentID)) - continue - } - req := &datapb.SegmentFlushCompletedMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentFlushDone, - }, - Segment: segment.SegmentInfo, - } - resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req) - if err = VerifyResponse(resp, err); err != nil { - log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err)) - continue - } - // set segment to SegmentState_Flushed - if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { - log.Error("flush segment complete failed", zap.Error(err)) - continue - } - log.Debug("flush segment complete", zap.Int64("id", segmentID)) + //Ignore return error + _ = s.postFlush(ctx, segmentID) } } } +// post function after flush is done +// 1. check segment id is valid +// 2. notify RootCoord segment is flushed +// 3. change segment state to `Flushed` in meta +func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { + segment := s.meta.GetSegment(segmentID) + if segment == nil { + log.Warn("failed to get flused segment", zap.Int64("id", segmentID)) + return errors.New("segment not found") + } + // Notify RootCoord segment is flushed + req := &datapb.SegmentFlushCompletedMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentFlushDone, + }, + Segment: segment.SegmentInfo, + } + resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req) + if err = VerifyResponse(resp, err); err != nil { + log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err)) + return err + } + // set segment to SegmentState_Flushed + if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { + log.Error("flush segment complete failed", zap.Error(err)) + return err + } + log.Debug("flush segment complete", zap.Int64("id", segmentID)) + return nil +} + +// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic func (s *Server) handleFlushingSegments(ctx context.Context) { segments := s.meta.GetFlushingSegments() for _, segment := range segments { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index c67f4744f9..fa5b4bba98 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -11,6 +11,7 @@ package datacoord import ( "context" + "errors" "math/rand" "os" "path" @@ -19,6 +20,7 @@ import ( "testing" "time" + memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/log" @@ -61,16 +63,15 @@ func TestAssignSegmentID(t *testing.T) { const channel0 = "channel0" const channel1 = "channel1" - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&datapb.CollectionInfo{ - ID: collID, - Schema: schema, - Partitions: []int64{}, - }) - t.Run("assign segment normally", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: collID, + Schema: schema, + Partitions: []int64{}, + }) req := &datapb.SegmentIDRequest{ Count: 1000, ChannelName: channel0, @@ -113,6 +114,14 @@ func TestAssignSegmentID(t *testing.T) { }) t.Run("assign segment with invalid collection", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: collID, + Schema: schema, + Partitions: []int64{}, + }) req := &datapb.SegmentIDRequest{ Count: 1000, ChannelName: channel0, @@ -221,55 +230,57 @@ func TestGetStatisticsChannel(t *testing.T) { } func TestGetSegmentStates(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - segment := &datapb.SegmentInfo{ - ID: 1000, - CollectionID: 100, - PartitionID: 0, - InsertChannel: "c1", - NumOfRows: 0, - State: commonpb.SegmentState_Growing, - StartPosition: &internalpb.MsgPosition{ - ChannelName: "c1", - MsgID: []byte{}, - MsgGroup: "", - Timestamp: 0, - }, - } - err := svr.meta.AddSegment(NewSegmentInfo(segment)) - assert.Nil(t, err) + t.Run("normal cases", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + segment := &datapb.SegmentInfo{ + ID: 1000, + CollectionID: 100, + PartitionID: 0, + InsertChannel: "c1", + NumOfRows: 0, + State: commonpb.SegmentState_Growing, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "c1", + MsgID: []byte{}, + MsgGroup: "", + Timestamp: 0, + }, + } + err := svr.meta.AddSegment(NewSegmentInfo(segment)) + assert.Nil(t, err) - cases := []struct { - description string - id UniqueID - expected bool - expectedState commonpb.SegmentState - }{ - {"get existed segment", 1000, true, commonpb.SegmentState_Growing}, - {"get non-existed segment", 10, false, commonpb.SegmentState_Growing}, - } + cases := []struct { + description string + id UniqueID + expected bool + expectedState commonpb.SegmentState + }{ + {"get existed segment", 1000, true, commonpb.SegmentState_Growing}, + {"get non-existed segment", 10, false, commonpb.SegmentState_Growing}, + } - for _, test := range cases { - t.Run(test.description, func(t *testing.T) { - resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - SegmentIDs: []int64{test.id}, + for _, test := range cases { + t.Run(test.description, func(t *testing.T) { + resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + SegmentIDs: []int64{test.id}, + }) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 1, len(resp.States)) + if test.expected { + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.States[0].Status.ErrorCode) + assert.EqualValues(t, test.expectedState, resp.States[0].State) + } }) - assert.Nil(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, 1, len(resp.States)) - if test.expected { - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.States[0].Status.ErrorCode) - assert.EqualValues(t, test.expectedState, resp.States[0].State) - } - }) - } + } + }) t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) @@ -704,45 +715,46 @@ func TestChannel(t *testing.T) { } func TestSaveBinlogPaths(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - collections := []struct { - ID UniqueID - Partitions []int64 - }{ - {0, []int64{0, 1}}, - {1, []int64{0, 1}}, - } - - for _, collection := range collections { - svr.meta.AddCollection(&datapb.CollectionInfo{ - ID: collection.ID, - Schema: nil, - Partitions: collection.Partitions, - }) - } - - segments := []struct { - id UniqueID - collectionID UniqueID - partitionID UniqueID - }{ - {0, 0, 0}, - {1, 0, 0}, - {2, 0, 1}, - {3, 1, 1}, - } - for _, segment := range segments { - s := &datapb.SegmentInfo{ - ID: segment.id, - CollectionID: segment.collectionID, - PartitionID: segment.partitionID, - } - err := svr.meta.AddSegment(NewSegmentInfo(s)) - assert.Nil(t, err) - } t.Run("Normal SaveRequest", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + collections := []struct { + ID UniqueID + Partitions []int64 + }{ + {0, []int64{0, 1}}, + {1, []int64{0, 1}}, + } + + for _, collection := range collections { + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: collection.ID, + Schema: nil, + Partitions: collection.Partitions, + }) + } + + segments := []struct { + id UniqueID + collectionID UniqueID + partitionID UniqueID + }{ + {0, 0, 0}, + {1, 0, 0}, + {2, 0, 1}, + {3, 1, 1}, + } + for _, segment := range segments { + s := &datapb.SegmentInfo{ + ID: segment.id, + CollectionID: segment.collectionID, + PartitionID: segment.partitionID, + } + err := svr.meta.AddSegment(NewSegmentInfo(s)) + assert.Nil(t, err) + } + ctx := context.Background() resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ Base: &commonpb.MsgBase{ @@ -1118,14 +1130,15 @@ func TestGetVChannelPos(t *testing.T) { } func TestGetRecoveryInfo(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { - return newMockRootCoordService(), nil - } t.Run("test get recovery info with no segments", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, PartitionID: 0, @@ -1162,6 +1175,13 @@ func TestGetRecoveryInfo(t *testing.T) { } t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed) err := svr.meta.AddSegment(NewSegmentInfo(seg1)) @@ -1183,6 +1203,13 @@ func TestGetRecoveryInfo(t *testing.T) { }) t.Run("test get recovery of unflushed segments ", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing) err := svr.meta.AddSegment(NewSegmentInfo(seg1)) @@ -1203,6 +1230,13 @@ func TestGetRecoveryInfo(t *testing.T) { }) t.Run("test get binlogs", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + binlogReq := &datapb.SaveBinlogPathsRequest{ SegmentID: 0, CollectionID: 0, @@ -1247,6 +1281,169 @@ func TestGetRecoveryInfo(t *testing.T) { }) } +func TestOptions(t *testing.T) { + t.Run("SetRootCoordCreator", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, endpoints []string) (types.RootCoord, error) { + return nil, errors.New("dummy") + } + opt := SetRootCoordCreator(crt) + assert.NotNil(t, opt) + svr.rootCoordClientCreator = nil + opt(svr) + // testify cannot compare function directly + // the behavior is actually undefined + assert.NotNil(t, crt) + assert.NotNil(t, svr.rootCoordClientCreator) + }) + t.Run("SetCluster", func(t *testing.T) { + + registerPolicy := newEmptyRegisterPolicy() + ch := make(chan interface{}) + kv := memkv.NewMemoryKV() + spyClusterStore := &SpyClusterStore{ + NodesInfo: NewNodesInfo(), + ch: ch, + } + cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withRegisterPolicy(registerPolicy)) + assert.Nil(t, err) + opt := SetCluster(cluster) + assert.NotNil(t, opt) + svr := newTestServer(t, nil, opt) + defer closeTestServer(t, svr) + + assert.Equal(t, cluster, svr.cluster) + }) +} + +func TestHandleSessionEvent(t *testing.T) { + registerPolicy := newEmptyRegisterPolicy() + unregisterPolicy := newEmptyUnregisterPolicy() + ch := make(chan interface{}) + kv := memkv.NewMemoryKV() + spyClusterStore := &SpyClusterStore{ + NodesInfo: NewNodesInfo(), + ch: ch, + } + cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, + dummyPosProvider{}, + withRegisterPolicy(registerPolicy), + withUnregistorPolicy(unregisterPolicy)) + assert.Nil(t, err) + defer cluster.Close() + + cluster.Startup(nil) + + svr := newTestServer(t, nil, SetCluster(cluster)) + defer closeTestServer(t, svr) + t.Run("handle events", func(t *testing.T) { + // None event + evt := &sessionutil.SessionEvent{ + EventType: sessionutil.SessionNoneEvent, + Session: &sessionutil.Session{ + ServerID: 0, + ServerName: "", + Address: "", + Exclusive: false, + }, + } + svr.handleSessionEvent(context.Background(), evt) + + evt = &sessionutil.SessionEvent{ + EventType: sessionutil.SessionAddEvent, + Session: &sessionutil.Session{ + ServerID: 101, + ServerName: "DN101", + Address: "DN127.0.0.101", + Exclusive: false, + }, + } + svr.handleSessionEvent(context.Background(), evt) + <-ch + dataNodes := svr.cluster.GetNodes() + assert.EqualValues(t, 1, len(dataNodes)) + assert.EqualValues(t, "DN127.0.0.101", dataNodes[0].Info.GetAddress()) + + evt = &sessionutil.SessionEvent{ + EventType: sessionutil.SessionDelEvent, + Session: &sessionutil.Session{ + ServerID: 101, + ServerName: "DN101", + Address: "DN127.0.0.101", + Exclusive: false, + }, + } + svr.handleSessionEvent(context.Background(), evt) + <-ch + dataNodes = svr.cluster.GetNodes() + assert.EqualValues(t, 0, len(dataNodes)) + }) + + t.Run("nil evt", func(t *testing.T) { + assert.NotPanics(t, func() { + svr.handleSessionEvent(context.Background(), nil) + }) + }) +} + +type rootCoordSegFlushComplete struct { + mockRootCoordService + flag bool +} + +//SegmentFlushCompleted, override default behavior +func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { + if rc.flag { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil + } + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil +} + +func TestPostFlush(t *testing.T) { + t.Run("segment not found", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + err := svr.postFlush(context.Background(), 1) + assert.EqualValues(t, errors.New("segment not found"), err) + }) + t.Run("failed to sync with Rootcoord", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + svr.rootCoordClient = &rootCoordSegFlushComplete{flag: false} + + err := svr.meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 1, + State: commonpb.SegmentState_Flushing, + })) + + assert.Nil(t, err) + + err = svr.postFlush(context.Background(), 1) + assert.NotNil(t, err) + }) + t.Run("success post flush", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + svr.rootCoordClient = &rootCoordSegFlushComplete{flag: true} + + err := svr.meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 1, + State: commonpb.SegmentState_Flushing, + })) + + assert.Nil(t, err) + + err = svr.postFlush(context.Background(), 1) + assert.Nil(t, err) + }) +} + func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { Params.Init() Params.TimeTickChannelName = strconv.Itoa(rand.Int())