mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add task unittest for query node (#7820)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
bcf9b4e240
commit
27beb033de
@ -46,7 +46,7 @@ func TestIndexLoader_setIndexInfo(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -795,7 +795,7 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h := newHistorical(ctx, nil, nil, fac, kv)
|
||||
h := newHistorical(ctx, newMockRootCoord(), newMockIndexCoord(), fac, kv)
|
||||
r, err := genSimpleReplica()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -809,6 +809,7 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) {
|
||||
return nil, err
|
||||
}
|
||||
h.replica = r
|
||||
h.loader.historicalReplica = r
|
||||
col, err := h.replica.getCollectionByID(defaultCollectionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -27,7 +27,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
genWatchDMChannelsRequest := func() *querypb.WatchDmChannelsRequest {
|
||||
schema, _ := genSimpleSchema()
|
||||
_, schema := genSimpleSchema()
|
||||
req := &querypb.WatchDmChannelsRequest{
|
||||
Base: genCommonMsgBase(commonpb.MsgType_WatchDmChannels),
|
||||
CollectionID: defaultCollectionID,
|
||||
@ -98,6 +98,26 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute loadPartition without init collection and partition", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := watchDmChannelsTask{
|
||||
req: genWatchDMChannelsRequest(),
|
||||
node: node,
|
||||
}
|
||||
task.req.Infos = []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: defaultCollectionID,
|
||||
ChannelName: defaultVChannel,
|
||||
},
|
||||
}
|
||||
task.req.CollectionID++
|
||||
task.req.PartitionID++
|
||||
err = task.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
//t.Run("test execute seek error", func(t *testing.T) {
|
||||
//
|
||||
// node, err := genSimpleQueryNode(ctx)
|
||||
@ -128,7 +148,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
genLoadSegmentsRequest := func() *querypb.LoadSegmentsRequest {
|
||||
genLoadEmptySegmentsRequest := func() *querypb.LoadSegmentsRequest {
|
||||
_, schema := genSimpleSchema()
|
||||
req := &querypb.LoadSegmentsRequest{
|
||||
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
|
||||
@ -140,7 +160,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
|
||||
t.Run("test timestamp", func(t *testing.T) {
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadSegmentsRequest(),
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
}
|
||||
timestamp := Timestamp(1000)
|
||||
task.req.Base.Timestamp = timestamp
|
||||
@ -153,7 +173,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
|
||||
t.Run("test OnEnqueue", func(t *testing.T) {
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadSegmentsRequest(),
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
}
|
||||
err := task.OnEnqueue()
|
||||
assert.NoError(t, err)
|
||||
@ -166,8 +186,39 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
schema, _ := genSimpleSchema()
|
||||
|
||||
fieldBinlog, err := saveSimpleBinLog(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
req := &querypb.LoadSegmentsRequest{
|
||||
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
|
||||
Schema: schema,
|
||||
LoadCondition: querypb.TriggerCondition_grpcRequest,
|
||||
Infos: []*querypb.SegmentLoadInfo{
|
||||
{
|
||||
SegmentID: defaultSegmentID,
|
||||
PartitionID: defaultPartitionID,
|
||||
CollectionID: defaultCollectionID,
|
||||
BinlogPaths: fieldBinlog,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadSegmentsRequest(),
|
||||
req: req,
|
||||
node: node,
|
||||
}
|
||||
err = task.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute grpc error", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
node: node,
|
||||
}
|
||||
task.req.Infos = []*querypb.SegmentLoadInfo{
|
||||
@ -186,7 +237,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadSegmentsRequest(),
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
node: node,
|
||||
}
|
||||
task.req.Infos = []*querypb.SegmentLoadInfo{
|
||||
@ -206,7 +257,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadSegmentsRequest(),
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
node: node,
|
||||
}
|
||||
task.req.Infos = []*querypb.SegmentLoadInfo{
|
||||
@ -220,6 +271,19 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
||||
err = task.Execute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute load hand-off", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := loadSegmentsTask{
|
||||
req: genLoadEmptySegmentsRequest(),
|
||||
node: node,
|
||||
}
|
||||
task.req.LoadCondition = querypb.TriggerCondition_handoff
|
||||
err = task.Execute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestTask_releaseCollectionTask(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user