From e6bfa120645d77645a4c7a892019708f02dd5926 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 14 May 2025 14:16:22 +0800 Subject: [PATCH] enhance: [2.5][Restful] Add consistency level for query/get API (#41825) (#41830) Cherry-pick from master pr: #41825 Related to #41805 Signed-off-by: Congqi Xia --- .../proxy/httpserver/handler_v2.go | 43 +++++++++++++------ .../proxy/httpserver/handler_v2_test.go | 18 ++++++-- .../proxy/httpserver/request_v2.go | 30 +++++++------ 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 05550ddd1d..d809bd78d1 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -782,13 +782,22 @@ func matchCountRule(outputs []string) bool { func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*QueryReqV2) req := &milvuspb.QueryRequest{ - DbName: dbName, - CollectionName: httpReq.CollectionName, - Expr: httpReq.Filter, - OutputFields: httpReq.OutputFields, - PartitionNames: httpReq.PartitionNames, - QueryParams: []*commonpb.KeyValuePair{}, - UseDefaultConsistency: true, + DbName: dbName, + CollectionName: httpReq.CollectionName, + Expr: httpReq.Filter, + OutputFields: httpReq.OutputFields, + PartitionNames: httpReq.PartitionNames, + QueryParams: []*commonpb.KeyValuePair{}, + } + var err error + req.ConsistencyLevel, req.UseDefaultConsistency, err = convertConsistencyLevel(httpReq.ConsistencyLevel) + if err != nil { + log.Ctx(ctx).Warn("high level restful api, query with consistency_level invalid", zap.Error(err)) + HTTPAbortReturn(c, http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(err), + HTTPReturnMessage: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded, err:" + err.Error(), + }) + return nil, err } req.ExprTemplateValues = generateExpressionTemplate(httpReq.ExprParams) c.Set(ContextRequest, req) @@ -838,12 +847,20 @@ func (h *HandlersV2) get(ctx context.Context, c *gin.Context, anyReq any, dbName return nil, err } req := &milvuspb.QueryRequest{ - DbName: dbName, - CollectionName: httpReq.CollectionName, - OutputFields: httpReq.OutputFields, - PartitionNames: httpReq.PartitionNames, - Expr: filter, - UseDefaultConsistency: true, + DbName: dbName, + CollectionName: httpReq.CollectionName, + OutputFields: httpReq.OutputFields, + PartitionNames: httpReq.PartitionNames, + Expr: filter, + } + req.ConsistencyLevel, req.UseDefaultConsistency, err = convertConsistencyLevel(httpReq.ConsistencyLevel) + if err != nil { + log.Ctx(ctx).Warn("high level restful api, query with consistency_level invalid", zap.Error(err)) + HTTPAbortReturn(c, http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(err), + HTTPReturnMessage: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded, err:" + err.Error(), + }) + return nil, err } c.Set(ContextRequest, req) resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) { diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index 462da7ebe4..aa592c7b25 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -2064,7 +2064,7 @@ func TestDML(t *testing.T) { Schema: generateCollectionSchema(schemapb.DataType_Int64, false, true), ShardsNum: ShardNumDefault, Status: &StatusSuccess, - }, nil).Times(6) + }, nil).Times(7) mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{Status: commonErrorStatus}, nil).Times(4) mp.EXPECT().Query(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) { if matchCountRule(req.OutputFields) { @@ -2108,6 +2108,18 @@ func TestDML(t *testing.T) { path: QueryAction, requestBody: []byte(`{"collectionName": "book", "filter": "", "outputFields": ["count(*)"], "limit": 10}`), }) + queryTestCases = append(queryTestCases, requestBodyTestCase{ + path: QueryAction, + requestBody: []byte(`{"collectionName": "book", "filter": "", "outputFields": ["book_id", "word_count", "book_intro"], "limit": 10, "consistencyLevel": "AAA"}`), + errMsg: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded", + errCode: 1100, // ErrParameterInvalid + }) + queryTestCases = append(queryTestCases, requestBodyTestCase{ + path: GetAction, + requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8], "outputFields": ["book_id", "word_count", "book_intro"], "consistencyLevel": "AAA"}`), + errMsg: "consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded", + errCode: 1100, // ErrParameterInvalid + }) queryTestCases = append(queryTestCases, requestBodyTestCase{ path: InsertAction, requestBody: []byte(`{"collectionName": "book", "data": [{"book_id": 0, "word_count": 0, "book_intro": [0.11825, 0.6]}]}`), @@ -2136,11 +2148,11 @@ func TestDML(t *testing.T) { }) queryTestCases = append(queryTestCases, requestBodyTestCase{ path: GetAction, - requestBody: []byte(`{"collectionName": "book", "id" : [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`), + requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`), }) queryTestCases = append(queryTestCases, requestBodyTestCase{ path: GetAction, - requestBody: []byte(`{"collectionName": "book", "id" : [2, 4, 6, 8, 0], "outputFields": ["book_id", "word_count", "book_intro"]}`), + requestBody: []byte(`{"collectionName": "book", "id": [2, 4, 6, 8, "0"], "outputFields": ["book_id", "word_count", "book_intro"]}`), errMsg: "", errCode: 65535, }) diff --git a/internal/distributed/proxy/httpserver/request_v2.go b/internal/distributed/proxy/httpserver/request_v2.go index e4765bf027..4876bc9e89 100644 --- a/internal/distributed/proxy/httpserver/request_v2.go +++ b/internal/distributed/proxy/httpserver/request_v2.go @@ -209,25 +209,27 @@ type JobIDReq struct { func (req *JobIDReq) GetJobID() string { return req.JobID } type QueryReqV2 struct { - DbName string `json:"dbName"` - CollectionName string `json:"collectionName" binding:"required"` - PartitionNames []string `json:"partitionNames"` - OutputFields []string `json:"outputFields"` - Filter string `json:"filter"` - Limit int32 `json:"limit"` - Offset int32 `json:"offset"` - ExprParams map[string]interface{} `json:"exprParams"` + DbName string `json:"dbName"` + CollectionName string `json:"collectionName" binding:"required"` + PartitionNames []string `json:"partitionNames"` + OutputFields []string `json:"outputFields"` + Filter string `json:"filter"` + Limit int32 `json:"limit"` + Offset int32 `json:"offset"` + ExprParams map[string]interface{} `json:"exprParams"` + ConsistencyLevel string `json:"consistencyLevel"` } func (req *QueryReqV2) GetDbName() string { return req.DbName } type CollectionIDReq struct { - DbName string `json:"dbName"` - CollectionName string `json:"collectionName" binding:"required"` - PartitionName string `json:"partitionName"` - PartitionNames []string `json:"partitionNames"` - OutputFields []string `json:"outputFields"` - ID interface{} `json:"id" binding:"required"` + DbName string `json:"dbName"` + CollectionName string `json:"collectionName" binding:"required"` + PartitionName string `json:"partitionName"` + PartitionNames []string `json:"partitionNames"` + OutputFields []string `json:"outputFields"` + ID interface{} `json:"id" binding:"required"` + ConsistencyLevel string `json:"consistencyLevel"` } func (req *CollectionIDReq) GetDbName() string { return req.DbName }