diff --git a/internal/querynode/index_loader_test.go b/internal/querynode/index_loader_test.go index 89654eb591..c2e47c96ab 100644 --- a/internal/querynode/index_loader_test.go +++ b/internal/querynode/index_loader_test.go @@ -46,7 +46,7 @@ func TestIndexLoader_setIndexInfo(t *testing.T) { assert.NoError(t, err) err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID) - assert.Error(t, err) + assert.NoError(t, err) }) } diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 74dbc24bb2..6dbe1ef3d6 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -795,7 +795,7 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) { if err != nil { return nil, err } - h := newHistorical(ctx, nil, nil, fac, kv) + h := newHistorical(ctx, newMockRootCoord(), newMockIndexCoord(), fac, kv) r, err := genSimpleReplica() if err != nil { return nil, err @@ -809,6 +809,7 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) { return nil, err } h.replica = r + h.loader.historicalReplica = r col, err := h.replica.getCollectionByID(defaultCollectionID) if err != nil { return nil, err diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 03b36afc0b..67a063752f 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -27,7 +27,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { defer cancel() genWatchDMChannelsRequest := func() *querypb.WatchDmChannelsRequest { - schema, _ := genSimpleSchema() + _, schema := genSimpleSchema() req := &querypb.WatchDmChannelsRequest{ Base: genCommonMsgBase(commonpb.MsgType_WatchDmChannels), CollectionID: defaultCollectionID, @@ -98,6 +98,26 @@ func TestTask_watchDmChannelsTask(t *testing.T) { assert.NoError(t, err) }) + t.Run("test execute loadPartition without init collection and partition", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := watchDmChannelsTask{ + req: genWatchDMChannelsRequest(), + node: node, + } + task.req.Infos = []*datapb.VchannelInfo{ + { + CollectionID: defaultCollectionID, + ChannelName: defaultVChannel, + }, + } + task.req.CollectionID++ + task.req.PartitionID++ + err = task.Execute(ctx) + assert.NoError(t, err) + }) + //t.Run("test execute seek error", func(t *testing.T) { // // node, err := genSimpleQueryNode(ctx) @@ -128,7 +148,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - genLoadSegmentsRequest := func() *querypb.LoadSegmentsRequest { + genLoadEmptySegmentsRequest := func() *querypb.LoadSegmentsRequest { _, schema := genSimpleSchema() req := &querypb.LoadSegmentsRequest{ Base: genCommonMsgBase(commonpb.MsgType_LoadSegments), @@ -140,7 +160,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { t.Run("test timestamp", func(t *testing.T) { task := loadSegmentsTask{ - req: genLoadSegmentsRequest(), + req: genLoadEmptySegmentsRequest(), } timestamp := Timestamp(1000) task.req.Base.Timestamp = timestamp @@ -153,7 +173,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { t.Run("test OnEnqueue", func(t *testing.T) { task := loadSegmentsTask{ - req: genLoadSegmentsRequest(), + req: genLoadEmptySegmentsRequest(), } err := task.OnEnqueue() assert.NoError(t, err) @@ -166,8 +186,39 @@ func TestTask_loadSegmentsTask(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + schema, _ := genSimpleSchema() + + fieldBinlog, err := saveSimpleBinLog(ctx) + assert.NoError(t, err) + + req := &querypb.LoadSegmentsRequest{ + Base: genCommonMsgBase(commonpb.MsgType_LoadSegments), + Schema: schema, + LoadCondition: querypb.TriggerCondition_grpcRequest, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + PartitionID: defaultPartitionID, + CollectionID: defaultCollectionID, + BinlogPaths: fieldBinlog, + }, + }, + } + task := loadSegmentsTask{ - req: genLoadSegmentsRequest(), + req: req, + node: node, + } + err = task.Execute(ctx) + assert.NoError(t, err) + }) + + t.Run("test execute grpc error", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := loadSegmentsTask{ + req: genLoadEmptySegmentsRequest(), node: node, } task.req.Infos = []*querypb.SegmentLoadInfo{ @@ -186,7 +237,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { assert.NoError(t, err) task := loadSegmentsTask{ - req: genLoadSegmentsRequest(), + req: genLoadEmptySegmentsRequest(), node: node, } task.req.Infos = []*querypb.SegmentLoadInfo{ @@ -206,7 +257,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { assert.NoError(t, err) task := loadSegmentsTask{ - req: genLoadSegmentsRequest(), + req: genLoadEmptySegmentsRequest(), node: node, } task.req.Infos = []*querypb.SegmentLoadInfo{ @@ -220,6 +271,19 @@ func TestTask_loadSegmentsTask(t *testing.T) { err = task.Execute(ctx) assert.Error(t, err) }) + + t.Run("test execute load hand-off", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := loadSegmentsTask{ + req: genLoadEmptySegmentsRequest(), + node: node, + } + task.req.LoadCondition = querypb.TriggerCondition_handoff + err = task.Execute(ctx) + assert.Error(t, err) + }) } func TestTask_releaseCollectionTask(t *testing.T) {