diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 7574fb4b5d..0518844345 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -624,20 +624,20 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI }, nil } -func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error { - if !s.meta.HasCollection(collID) { - return fmt.Errorf("can not find collection %d", collID) - } - if !s.meta.HasPartition(collID, partID) { - return fmt.Errorf("can not find partition %d", partID) - } - for _, name := range s.insertChannels { - if name == channelName { - return nil - } - } - return fmt.Errorf("can not find channel %s", channelName) -} +//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error { +// if !s.meta.HasCollection(collID) { +// return fmt.Errorf("can not find collection %d", collID) +// } +// if !s.meta.HasPartition(collID, partID) { +// return fmt.Errorf("can not find partition %d", partID) +// } +// for _, name := range s.insertChannels { +// if name == channelName { +// return nil +// } +// } +// return fmt.Errorf("can not find channel %s", channelName) +//} func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int64) error { resp, err := s.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index 3d6aecbeea..4f59fbc70e 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -347,6 +347,47 @@ func TestGetSegmentStates(t *testing.T) { } } +func TestGetInsertBinlogPaths(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + req := &datapb.GetInsertBinlogPathsRequest{ + SegmentID: 0, + } + resp, err := svr.GetInsertBinlogPaths(svr.ctx, req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) +} + +func TestGetCollectionStatistics(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + req := &datapb.GetCollectionStatisticsRequest{ + CollectionID: 0, + } + resp, err := svr.GetCollectionStatistics(svr.ctx, req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) +} + +func TestGetSegmentInfo(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + segInfo := &datapb.SegmentInfo{ + ID: 0, + } + svr.meta.AddSegment(segInfo) + + req := &datapb.GetSegmentInfoRequest{ + SegmentIDs: []int64{0}, + } + resp, err := svr.GetSegmentInfo(svr.ctx, req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) +} + func TestChannel(t *testing.T) { svr := newTestServer(t) defer closeTestServer(t, svr) @@ -358,8 +399,7 @@ func TestChannel(t *testing.T) { segInfo := &datapb.SegmentInfo{ ID: segID, } - err := svr.meta.AddSegment(segInfo) - assert.Nil(t, err) + svr.meta.AddSegment(segInfo) stats := &internalpb.SegmentStatisticsUpdates{ SegmentID: segID, @@ -384,12 +424,14 @@ func TestChannel(t *testing.T) { statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx) statsStream.AsProducer([]string{Params.StatisticsChannelName}) + statsStream.Start() + defer statsStream.Close() msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345)) - err = statsStream.Produce(&msgPack) + err := statsStream.Produce(&msgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -416,14 +458,16 @@ func TestChannel(t *testing.T) { } } - statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx) - statsStream.AsProducer([]string{Params.SegmentInfoChannelName}) + segInfoStream, _ := svr.msFactory.NewMsgStream(svr.ctx) + segInfoStream.AsProducer([]string{Params.SegmentInfoChannelName}) + segInfoStream.Start() + defer segInfoStream.Close() msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 123)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 345)) - err := statsStream.Produce(&msgPack) + err := segInfoStream.Produce(&msgPack) assert.Nil(t, err) time.Sleep(time.Second) }) @@ -447,6 +491,8 @@ func TestChannel(t *testing.T) { timeTickStream, _ := svr.msFactory.NewMsgStream(svr.ctx) timeTickStream.AsProducer([]string{Params.ProxyTimeTickChannelName}) + timeTickStream.Start() + defer timeTickStream.Close() msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 123))