Add impl and plan unittests for query node (#7712)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-09-11 17:22:01 +08:00 committed by GitHub
parent 047850c869
commit 3c01839d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 14 deletions

View File

@ -380,7 +380,6 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
return status, nil
}
// ReleaseSegments deprecated
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {

View File

@ -13,6 +13,7 @@ package querynode
import (
"context"
"encoding/json"
"math/rand"
"testing"
@ -20,6 +21,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
)
@ -244,20 +247,104 @@ func TestImpl_isHealthy(t *testing.T) {
func TestImpl_GetMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := &milvuspb.GetMetricsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
}
t.Run("test GetMetrics", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
_, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
node.UpdateStateCode(internalpb.StateCode_Abnormal)
_, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
metricReq := make(map[string]string)
metricReq[metricsinfo.MetricTypeKey] = "system_info"
mReq, err := json.Marshal(metricReq)
assert.NoError(t, err)
req := &milvuspb.GetMetricsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
Request: string(mReq),
}
_, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
})
t.Run("test ParseMetricType failed", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := &milvuspb.GetMetricsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
}
_, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
node.UpdateStateCode(internalpb.StateCode_Abnormal)
_, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
})
}
func TestImpl_ReleaseSegments(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test valid", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := &queryPb.ReleaseSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_ReleaseSegments),
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
SegmentIDs: []UniqueID{defaultSegmentID},
}
_, err = node.ReleaseSegments(ctx, req)
assert.NoError(t, err)
})
t.Run("test invalid query node", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := &queryPb.ReleaseSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_ReleaseSegments),
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
SegmentIDs: []UniqueID{defaultSegmentID},
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
_, err = node.ReleaseSegments(ctx, req)
assert.Error(t, err)
})
t.Run("test segment not exists", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := &queryPb.ReleaseSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_ReleaseSegments),
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
SegmentIDs: []UniqueID{defaultSegmentID},
}
err = node.historical.replica.removeSegment(defaultSegmentID)
assert.NoError(t, err)
err = node.streaming.replica.removeSegment(defaultSegmentID)
assert.NoError(t, err)
status, err := node.ReleaseSegments(ctx, req)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, status.ErrorCode)
})
}

View File

@ -12,6 +12,7 @@
package querynode
import (
"context"
"encoding/binary"
"math"
"testing"
@ -20,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/planpb"
)
func TestPlan_Plan(t *testing.T) {
@ -41,6 +43,26 @@ func TestPlan_Plan(t *testing.T) {
deleteCollection(collection)
}
func TestPlan_createSearchPlanByExpr(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
col, err := historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
planNode := &planpb.PlanNode{
OutputFieldIds: []FieldID{rowIDFieldID},
}
expr, err := proto.Marshal(planNode)
assert.NoError(t, err)
_, err = createSearchPlanByExpr(col, expr)
assert.Error(t, err)
}
func TestPlan_NilCollection(t *testing.T) {
collection := &Collection{
id: defaultCollectionID,