mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
enhance: [Restful] Add consistency level for query/get API (#41825)
Related to #41805 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
21d6d1669e
commit
fb612c765c
@ -782,13 +782,22 @@ func matchCountRule(outputs []string) bool {
|
|||||||
func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
|
func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
|
||||||
httpReq := anyReq.(*QueryReqV2)
|
httpReq := anyReq.(*QueryReqV2)
|
||||||
req := &milvuspb.QueryRequest{
|
req := &milvuspb.QueryRequest{
|
||||||
DbName: dbName,
|
DbName: dbName,
|
||||||
CollectionName: httpReq.CollectionName,
|
CollectionName: httpReq.CollectionName,
|
||||||
Expr: httpReq.Filter,
|
Expr: httpReq.Filter,
|
||||||
OutputFields: httpReq.OutputFields,
|
OutputFields: httpReq.OutputFields,
|
||||||
PartitionNames: httpReq.PartitionNames,
|
PartitionNames: httpReq.PartitionNames,
|
||||||
QueryParams: []*commonpb.KeyValuePair{},
|
QueryParams: []*commonpb.KeyValuePair{},
|
||||||
UseDefaultConsistency: true,
|
}
|
||||||
|
var err error
|
||||||
|
req.ConsistencyLevel, req.UseDefaultConsistency, err = convertConsistencyLevel(httpReq.ConsistencyLevel)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Warn("high level restful api, query with consistency_level invalid", zap.Error(err))
|
||||||
|
HTTPAbortReturn(c, http.StatusOK, gin.H{
|
||||||
|
HTTPReturnCode: merr.Code(err),
|
||||||
|
HTTPReturnMessage: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded, err:" + err.Error(),
|
||||||
|
})
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
req.ExprTemplateValues = generateExpressionTemplate(httpReq.ExprParams)
|
req.ExprTemplateValues = generateExpressionTemplate(httpReq.ExprParams)
|
||||||
c.Set(ContextRequest, req)
|
c.Set(ContextRequest, req)
|
||||||
@ -838,12 +847,20 @@ func (h *HandlersV2) get(ctx context.Context, c *gin.Context, anyReq any, dbName
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req := &milvuspb.QueryRequest{
|
req := &milvuspb.QueryRequest{
|
||||||
DbName: dbName,
|
DbName: dbName,
|
||||||
CollectionName: httpReq.CollectionName,
|
CollectionName: httpReq.CollectionName,
|
||||||
OutputFields: httpReq.OutputFields,
|
OutputFields: httpReq.OutputFields,
|
||||||
PartitionNames: httpReq.PartitionNames,
|
PartitionNames: httpReq.PartitionNames,
|
||||||
Expr: filter,
|
Expr: filter,
|
||||||
UseDefaultConsistency: true,
|
}
|
||||||
|
req.ConsistencyLevel, req.UseDefaultConsistency, err = convertConsistencyLevel(httpReq.ConsistencyLevel)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Warn("high level restful api, query with consistency_level invalid", zap.Error(err))
|
||||||
|
HTTPAbortReturn(c, http.StatusOK, gin.H{
|
||||||
|
HTTPReturnCode: merr.Code(err),
|
||||||
|
HTTPReturnMessage: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded, err:" + err.Error(),
|
||||||
|
})
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
c.Set(ContextRequest, req)
|
c.Set(ContextRequest, req)
|
||||||
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
|
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
|
||||||
|
|||||||
@ -2135,7 +2135,7 @@ func TestDML(t *testing.T) {
|
|||||||
Schema: generateCollectionSchema(schemapb.DataType_Int64, false, true),
|
Schema: generateCollectionSchema(schemapb.DataType_Int64, false, true),
|
||||||
ShardsNum: ShardNumDefault,
|
ShardsNum: ShardNumDefault,
|
||||||
Status: &StatusSuccess,
|
Status: &StatusSuccess,
|
||||||
}, nil).Times(6)
|
}, nil).Times(7)
|
||||||
mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{Status: commonErrorStatus}, nil).Times(4)
|
mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{Status: commonErrorStatus}, nil).Times(4)
|
||||||
mp.EXPECT().Query(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
|
mp.EXPECT().Query(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
|
||||||
if matchCountRule(req.OutputFields) {
|
if matchCountRule(req.OutputFields) {
|
||||||
@ -2179,6 +2179,18 @@ func TestDML(t *testing.T) {
|
|||||||
path: QueryAction,
|
path: QueryAction,
|
||||||
requestBody: []byte(`{"collectionName": "book", "filter": "", "outputFields": ["count(*)"], "limit": 10}`),
|
requestBody: []byte(`{"collectionName": "book", "filter": "", "outputFields": ["count(*)"], "limit": 10}`),
|
||||||
})
|
})
|
||||||
|
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
||||||
|
path: QueryAction,
|
||||||
|
requestBody: []byte(`{"collectionName": "book", "filter": "", "outputFields": ["book_id", "word_count", "book_intro"], "limit": 10, "consistencyLevel": "AAA"}`),
|
||||||
|
errMsg: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded",
|
||||||
|
errCode: 1100, // ErrParameterInvalid
|
||||||
|
})
|
||||||
|
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
||||||
|
path: GetAction,
|
||||||
|
requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8], "outputFields": ["book_id", "word_count", "book_intro"], "consistencyLevel": "AAA"}`),
|
||||||
|
errMsg: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded",
|
||||||
|
errCode: 1100, // ErrParameterInvalid
|
||||||
|
})
|
||||||
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
||||||
path: InsertAction,
|
path: InsertAction,
|
||||||
requestBody: []byte(`{"collectionName": "book", "data": [{"book_id": 0, "word_count": 0, "book_intro": [0.11825, 0.6]}]}`),
|
requestBody: []byte(`{"collectionName": "book", "data": [{"book_id": 0, "word_count": 0, "book_intro": [0.11825, 0.6]}]}`),
|
||||||
@ -2207,11 +2219,11 @@ func TestDML(t *testing.T) {
|
|||||||
})
|
})
|
||||||
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
||||||
path: GetAction,
|
path: GetAction,
|
||||||
requestBody: []byte(`{"collectionName": "book", "id" : [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`),
|
requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`),
|
||||||
})
|
})
|
||||||
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
queryTestCases = append(queryTestCases, requestBodyTestCase{
|
||||||
path: GetAction,
|
path: GetAction,
|
||||||
requestBody: []byte(`{"collectionName": "book", "id" : [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`),
|
requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8, "0"], "outputFields": ["book_id", "word_count", "book_intro"]}`),
|
||||||
errMsg: "",
|
errMsg: "",
|
||||||
errCode: 65535,
|
errCode: 65535,
|
||||||
})
|
})
|
||||||
|
|||||||
@ -209,25 +209,27 @@ type JobIDReq struct {
|
|||||||
func (req *JobIDReq) GetJobID() string { return req.JobID }
|
func (req *JobIDReq) GetJobID() string { return req.JobID }
|
||||||
|
|
||||||
type QueryReqV2 struct {
|
type QueryReqV2 struct {
|
||||||
DbName string `json:"dbName"`
|
DbName string `json:"dbName"`
|
||||||
CollectionName string `json:"collectionName" binding:"required"`
|
CollectionName string `json:"collectionName" binding:"required"`
|
||||||
PartitionNames []string `json:"partitionNames"`
|
PartitionNames []string `json:"partitionNames"`
|
||||||
OutputFields []string `json:"outputFields"`
|
OutputFields []string `json:"outputFields"`
|
||||||
Filter string `json:"filter"`
|
Filter string `json:"filter"`
|
||||||
Limit int32 `json:"limit"`
|
Limit int32 `json:"limit"`
|
||||||
Offset int32 `json:"offset"`
|
Offset int32 `json:"offset"`
|
||||||
ExprParams map[string]interface{} `json:"exprParams"`
|
ExprParams map[string]interface{} `json:"exprParams"`
|
||||||
|
ConsistencyLevel string `json:"consistencyLevel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *QueryReqV2) GetDbName() string { return req.DbName }
|
func (req *QueryReqV2) GetDbName() string { return req.DbName }
|
||||||
|
|
||||||
type CollectionIDReq struct {
|
type CollectionIDReq struct {
|
||||||
DbName string `json:"dbName"`
|
DbName string `json:"dbName"`
|
||||||
CollectionName string `json:"collectionName" binding:"required"`
|
CollectionName string `json:"collectionName" binding:"required"`
|
||||||
PartitionName string `json:"partitionName"`
|
PartitionName string `json:"partitionName"`
|
||||||
PartitionNames []string `json:"partitionNames"`
|
PartitionNames []string `json:"partitionNames"`
|
||||||
OutputFields []string `json:"outputFields"`
|
OutputFields []string `json:"outputFields"`
|
||||||
ID interface{} `json:"id" binding:"required"`
|
ID interface{} `json:"id" binding:"required"`
|
||||||
|
ConsistencyLevel string `json:"consistencyLevel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *CollectionIDReq) GetDbName() string { return req.DbName }
|
func (req *CollectionIDReq) GetDbName() string { return req.DbName }
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user