yihao.dai b3fc53015d
fix: Avoid memory copy in RESTful search and query (#37674)
Custom `jsonRender` that encodes JSON data directly into the response
stream, it uses less memory since it does not buffer the entire JSON
structure before sending it, unlike `c.JSON` in `HTTPReturn`, which
serializes the JSON fully in memory before writing it to the response.
Benchmark testing shows that, using the custom render incurs no
performance loss and reduces memory consumption by nearly 50%.

issue: https://github.com/milvus-io/milvus/issues/37671

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2024-11-19 17:24:31 +08:00

980 lines
38 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpserver
import (
"context"
"net/http"
"strconv"
"github.com/cockroachdb/errors"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/tidwall/gjson"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/requestutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var RestRequestInterceptorErr = errors.New("interceptor error placeholder")
func checkAuthorization(ctx context.Context, c *gin.Context, req interface{}) error {
username, ok := c.Get(ContextUsername)
if !ok || username.(string) == "" {
HTTPReturn(c, http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
return RestRequestInterceptorErr
}
_, authErr := proxy.PrivilegeInterceptor(ctx, req)
if authErr != nil {
HTTPReturn(c, http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()})
return RestRequestInterceptorErr
}
return nil
}
type RestRequestInterceptor func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error)
// HandlersV1 handles http requests
type HandlersV1 struct {
proxy types.ProxyComponent
interceptors []RestRequestInterceptor
}
// NewHandlers creates a new HandlersV1
func NewHandlersV1(proxyComponent types.ProxyComponent) *HandlersV1 {
h := &HandlersV1{
proxy: proxyComponent,
interceptors: []RestRequestInterceptor{},
}
if proxy.Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
h.interceptors = append(h.interceptors,
// authorization
func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
err := checkAuthorization(ctx, ginCtx, req)
if err != nil {
return nil, err
}
return handler(ctx, req)
})
}
h.interceptors = append(h.interceptors,
// check database
func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
value, ok := requestutil.GetDbNameFromRequest(req)
if !ok {
return handler(ctx, req)
}
err := h.checkDatabase(ctx, ginCtx, value.(string))
if err != nil {
return nil, err
}
return handler(ctx, req)
})
h.interceptors = append(h.interceptors,
// trace request
func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
return proxy.TraceLogInterceptor(ctx, req, &grpc.UnaryServerInfo{
FullMethod: ginCtx.Request.URL.Path,
}, handler)
})
return h
}
func (h *HandlersV1) checkDatabase(ctx context.Context, c *gin.Context, dbName string) error {
if dbName == DefaultDbName {
return nil
}
if proxy.CheckDatabase(ctx, dbName) {
return nil
}
response, err := h.proxy.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})
if err == nil {
err = merr.Error(response.GetStatus())
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return RestRequestInterceptorErr
}
for _, db := range response.DbNames {
if db == dbName {
return nil
}
}
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrDatabaseNotFound),
HTTPReturnMessage: merr.ErrDatabaseNotFound.Error() + ", database: " + dbName,
})
return RestRequestInterceptorErr
}
func (h *HandlersV1) describeCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
if err == nil {
return collSchema.CollectionSchema, nil
}
req := milvuspb.DescribeCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
response, err := h.proxy.DescribeCollection(ctx, &req)
if err == nil {
err = merr.Error(response.GetStatus())
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, err
}
primaryField, ok := getPrimaryField(response.Schema)
if ok && primaryField.AutoID && !primaryField.AutoID {
log.Warn("primary filed autoID VS schema autoID", zap.String("collectionName", collectionName), zap.Bool("primary Field", primaryField.AutoID), zap.Bool("schema", primaryField.AutoID))
response.Schema.AutoID = EnableAutoID
}
return response.Schema, nil
}
func (h *HandlersV1) hasCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (bool, error) {
req := milvuspb.HasCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
response, err := h.proxy.HasCollection(ctx, &req)
if err == nil {
err = merr.Error(response.GetStatus())
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return false, err
}
return response.Value, nil
}
func (h *HandlersV1) RegisterRoutesToV1(router gin.IRouter) {
router.GET(VectorCollectionsPath, h.listCollections)
router.POST(VectorCollectionsCreatePath, h.createCollection)
router.GET(VectorCollectionsDescribePath, h.getCollectionDetails)
router.POST(VectorCollectionsDropPath, h.dropCollection)
router.POST(VectorQueryPath, h.query)
router.POST(VectorGetPath, h.get)
router.POST(VectorDeletePath, h.delete)
router.POST(VectorInsertPath, h.insert)
router.POST(VectorUpsertPath, h.upsert)
router.POST(VectorSearchPath, h.search)
}
func (h *HandlersV1) executeRestRequestInterceptor(ctx context.Context,
ginCtx *gin.Context,
req any, handler func(reqCtx context.Context, req any) (any, error),
) (any, error) {
f := handler
for i := len(h.interceptors) - 1; i >= 0; i-- {
f = func(j int, handlerFunc func(reqCtx context.Context, req any) (any, error)) func(reqCtx context.Context, req any) (any, error) {
return func(reqCtx context.Context, req any) (any, error) {
return h.interceptors[j](reqCtx, ginCtx, req, handlerFunc)
}
}(i, f)
}
return f(ctx, req)
}
func (h *HandlersV1) listCollections(c *gin.Context) {
dbName := c.DefaultQuery(HTTPDbName, DefaultDbName)
req := &milvuspb.ShowCollectionsRequest{
DbName: dbName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
resp, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.ShowCollections(reqCtx, req.(*milvuspb.ShowCollectionsRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(resp.(*milvuspb.ShowCollectionsResponse).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
response := resp.(*milvuspb.ShowCollectionsResponse)
var collections []string
if response.CollectionNames != nil {
collections = response.CollectionNames
} else {
collections = []string{}
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections})
}
func (h *HandlersV1) createCollection(c *gin.Context) {
httpReq := CreateCollectionReq{
DbName: DefaultDbName,
MetricType: metric.L2,
PrimaryField: DefaultPrimaryFieldName,
VectorField: DefaultVectorFieldName,
EnableDynamicField: EnableDynamic,
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of create collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" || httpReq.Dimension == 0 {
log.Warn("high level restful api, create collection require parameters: [collectionName, dimension], but miss", zap.Any("request", httpReq))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, dimension]",
})
return
}
req := &milvuspb.CreateCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
ShardsNum: ShardNumDefault,
ConsistencyLevel: commonpb.ConsistencyLevel_Bounded,
}
c.Set(ContextRequest, req)
schema, err := proto.Marshal(&schemapb.CollectionSchema{
Name: httpReq.CollectionName,
Description: httpReq.Description,
AutoID: EnableAutoID,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: httpReq.PrimaryField,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: EnableAutoID,
}, {
FieldID: common.StartOfUserFieldID + 1,
Name: httpReq.VectorField,
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: Dim,
Value: strconv.FormatInt(int64(httpReq.Dimension), 10),
},
},
AutoID: DisableAutoID,
},
},
EnableDynamicField: httpReq.EnableDynamicField,
})
if err != nil {
log.Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMarshalCollectionSchema),
HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error() + ", error: " + err.Error(),
})
return
}
req.Schema = schema
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.CreateCollection(reqCtx, req.(*milvuspb.CreateCollectionRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*commonpb.Status))
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
statusResponse, err := h.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
FieldName: httpReq.VectorField,
IndexName: DefaultIndexName,
ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: httpReq.MetricType}},
})
if err == nil {
err = merr.Error(statusResponse)
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
statusResponse, err = h.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
})
if err == nil {
err = merr.Error(statusResponse)
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
collectionName := c.Query(HTTPCollectionName)
if collectionName == "" {
log.Warn("high level restful api, desc collection require parameter: [collectionName], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
})
return
}
dbName := c.DefaultQuery(HTTPDbName, DefaultDbName)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), dbName)
req := &milvuspb.DescribeCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
c.Set(ContextRequest, req)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest))
})
if err == nil {
err = merr.Error(response.(*milvuspb.DescribeCollectionResponse).GetStatus())
}
if err != nil {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
coll := response.(*milvuspb.DescribeCollectionResponse)
primaryField, ok := getPrimaryField(coll.Schema)
if ok && primaryField.AutoID && !primaryField.AutoID {
log.Warn("primary filed autoID VS schema autoID", zap.String("collectionName", collectionName), zap.Bool("primary Field", primaryField.AutoID), zap.Bool("schema", primaryField.AutoID))
coll.Schema.AutoID = EnableAutoID
}
stateResp, err := h.proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{
DbName: dbName,
CollectionName: collectionName,
})
collLoadState := ""
if err == nil {
err = merr.Error(stateResp.GetStatus())
}
if err != nil {
log.Warn("get collection load state fail",
zap.String("collection", collectionName),
zap.Error(err),
)
} else {
collLoadState = stateResp.State.String()
}
vectorField := ""
for _, field := range coll.Schema.Fields {
if typeutil.IsVectorType(field.DataType) {
vectorField = field.Name
break
}
}
indexResp, err := h.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionName,
FieldName: vectorField,
})
if err == nil {
err = merr.Error(indexResp.GetStatus())
}
var indexDesc []gin.H
if err != nil {
indexDesc = []gin.H{}
log.Warn("get indexes description fail",
zap.String("collection", collectionName),
zap.String("vectorField", vectorField),
zap.Error(err),
)
} else {
indexDesc = printIndexes(indexResp.IndexDescriptions)
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{
HTTPCollectionName: coll.CollectionName,
HTTPReturnDescription: coll.Schema.Description,
"fields": printFields(coll.Schema.Fields),
"indexes": indexDesc,
"load": collLoadState,
"shardsNum": coll.ShardsNum,
"enableDynamicField": coll.Schema.EnableDynamicField,
}})
}
func (h *HandlersV1) dropCollection(c *gin.Context) {
httpReq := DropCollectionReq{
DbName: DefaultDbName,
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of drop collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" {
log.Warn("high level restful api, drop collection require parameter: [collectionName], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
})
return
}
req := &milvuspb.DropCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
has, err := h.hasCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil {
return nil, RestRequestInterceptorErr
}
if !has {
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCollectionNotFound),
HTTPReturnMessage: merr.ErrCollectionNotFound.Error() + ", database: " + httpReq.DbName + ", collection: " + httpReq.CollectionName,
})
return nil, RestRequestInterceptorErr
}
return h.proxy.DropCollection(reqCtx, req.(*milvuspb.DropCollectionRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*commonpb.Status))
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
}
func (h *HandlersV1) query(c *gin.Context) {
httpReq := QueryReq{
DbName: DefaultDbName,
Limit: 100,
OutputFields: []string{DefaultOutputFields},
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of query is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" || httpReq.Filter == "" {
log.Warn("high level restful api, query require parameter: [collectionName, filter], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, filter]",
})
return
}
req := &milvuspb.QueryRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
Expr: httpReq.Filter,
OutputFields: httpReq.OutputFields,
GuaranteeTimestamp: BoundedTimestamp,
QueryParams: []*commonpb.KeyValuePair{},
}
c.Set(ContextRequest, req)
if httpReq.Offset > 0 {
req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)})
}
if httpReq.Limit > 0 {
req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)})
}
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
queryResp := response.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with query result", zap.Any("response", response), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturnStream(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}
func (h *HandlersV1) get(c *gin.Context) {
httpReq := GetReq{
DbName: DefaultDbName,
OutputFields: []string{DefaultOutputFields},
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of get is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" || httpReq.ID == nil {
log.Warn("high level restful api, get require parameter: [collectionName, id], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id]",
})
return
}
req := &milvuspb.QueryRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
OutputFields: httpReq.OutputFields,
GuaranteeTimestamp: BoundedTimestamp,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
queryReq := req.(*milvuspb.QueryRequest)
queryReq.Expr = filter
return h.proxy.Query(reqCtx, queryReq)
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
queryResp := response.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with get result", zap.Any("response", response), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturnStream(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}
func (h *HandlersV1) delete(c *gin.Context) {
httpReq := DeleteReq{
DbName: DefaultDbName,
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of delete is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" || (httpReq.ID == nil && httpReq.Filter == "") {
log.Warn("high level restful api, delete require parameter: [collectionName, id/filter], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id/filter]",
})
return
}
req := &milvuspb.DeleteRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
deleteReq := req.(*milvuspb.DeleteRequest)
deleteReq.Expr = httpReq.Filter
if deleteReq.Expr == "" {
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
deleteReq.Expr = filter
}
return h.proxy.Delete(ctx, deleteReq)
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
}
func (h *HandlersV1) insert(c *gin.Context) {
httpReq := InsertReq{
DbName: DefaultDbName,
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
singleInsertReq := SingleInsertReq{
DbName: DefaultDbName,
}
if err = c.ShouldBindBodyWith(&singleInsertReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of insert is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
httpReq.DbName = singleInsertReq.DbName
httpReq.CollectionName = singleInsertReq.CollectionName
httpReq.Data = []map[string]interface{}{singleInsertReq.Data}
}
if httpReq.CollectionName == "" || httpReq.Data == nil {
log.Warn("high level restful api, insert require parameter: [collectionName, data], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
})
return
}
req := &milvuspb.InsertRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
NumRows: uint32(len(httpReq.Data)),
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data, _ = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
insertReq := req.(*milvuspb.InsertRequest)
insertReq.FieldsData, err = anyToColumns(httpReq.Data, nil, collSchema, true)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
return h.proxy.Insert(ctx, insertReq)
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
insertResp := response.(*milvuspb.MutationResult)
switch insertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": formatInt64(insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
}
case *schemapb.IDs_StrId:
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
default:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
}
}
}
func (h *HandlersV1) upsert(c *gin.Context) {
httpReq := UpsertReq{
DbName: DefaultDbName,
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
singleUpsertReq := SingleUpsertReq{
DbName: DefaultDbName,
}
if err = c.ShouldBindBodyWith(&singleUpsertReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of upsert is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
httpReq.DbName = singleUpsertReq.DbName
httpReq.CollectionName = singleUpsertReq.CollectionName
httpReq.Data = []map[string]interface{}{singleUpsertReq.Data}
}
if httpReq.CollectionName == "" || httpReq.Data == nil {
log.Warn("high level restful api, upsert require parameter: [collectionName, data], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
})
return
}
req := &milvuspb.UpsertRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
NumRows: uint32(len(httpReq.Data)),
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
for _, fieldSchema := range collSchema.Fields {
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID {
err := merr.WrapErrParameterInvalid("autoID: false", "autoID: true", "cannot upsert an autoID collection")
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, RestRequestInterceptorErr
}
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data, _ = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
upsertReq := req.(*milvuspb.UpsertRequest)
upsertReq.FieldsData, err = anyToColumns(httpReq.Data, nil, collSchema, false)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
return h.proxy.Upsert(ctx, upsertReq)
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
upsertResp := response.(*milvuspb.MutationResult)
switch upsertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": formatInt64(upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
}
case *schemapb.IDs_StrId:
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
default:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
}
}
}
func (h *HandlersV1) search(c *gin.Context) {
httpReq := SearchReq{
DbName: DefaultDbName,
Limit: 100,
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of search is incorrect", zap.Any("request", httpReq), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" || httpReq.Vector == nil {
log.Warn("high level restful api, search require parameter: [collectionName, vector], but miss")
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, vector]",
})
return
}
req := &milvuspb.SearchRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
Dsl: httpReq.Filter,
PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
DslType: commonpb.DslType_BoolExprV1,
OutputFields: httpReq.OutputFields,
GuaranteeTimestamp: BoundedTimestamp,
Nq: int64(1),
}
c.Set(ContextRequest, req)
params := map[string]interface{}{ // auto generated mapping
"level": int(commonpb.ConsistencyLevel_Bounded),
}
if httpReq.Params != nil {
radius, radiusOk := httpReq.Params[ParamRadius]
rangeFilter, rangeFilterOk := httpReq.Params[ParamRangeFilter]
if rangeFilterOk {
if !radiusOk {
log.Warn("high level restful api, search params invalid, because only " + ParamRangeFilter)
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: invalid search params",
})
return
}
params[ParamRangeFilter] = rangeFilter
}
if radiusOk {
params[ParamRadius] = radius
}
}
bs, _ := json.Marshal(params)
req.SearchParams = []*commonpb.KeyValuePair{
{Key: common.TopKKey, Value: strconv.FormatInt(int64(httpReq.Limit), 10)},
{Key: Params, Value: string(bs)},
{Key: ParamRoundDecimal, Value: "-1"},
{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)},
}
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.Search(ctx, req.(*milvuspb.SearchRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*milvuspb.SearchResults).GetStatus())
}
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
searchResp := response.(*milvuspb.SearchResults)
if searchResp.Results.TopK == int64(0) {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []interface{}{}})
} else {
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(searchResp.Results.TopK, searchResp.Results.OutputFields, searchResp.Results.FieldsData, searchResp.Results.Ids, searchResp.Results.Scores, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturnStream(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}
}