diff --git a/internal/proxy/accesslog/info/grpc_info.go b/internal/proxy/accesslog/info/grpc_info.go index 9c67b9af3f..c07d13fc1c 100644 --- a/internal/proxy/accesslog/info/grpc_info.go +++ b/internal/proxy/accesslog/info/grpc_info.go @@ -50,6 +50,9 @@ type GrpcAccessInfo struct { grpcInfo *grpc.UnaryServerInfo start time.Time end time.Time + + // runtime set info + actualConsistencyLevel *commonpb.ConsistencyLevel } func NewGrpcAccessInfo(ctx context.Context, grpcInfo *grpc.UnaryServerInfo, req interface{}) *GrpcAccessInfo { @@ -299,6 +302,10 @@ func (i *GrpcAccessInfo) OutputFields() string { } func (i *GrpcAccessInfo) ConsistencyLevel() string { + // return actual consistency level if set + if i.actualConsistencyLevel != nil { + return i.actualConsistencyLevel.String() + } level, ok := requestutil.GetConsistencyLevelFromRequst(i.req) if ok { return level.String() @@ -357,3 +364,7 @@ func (i *GrpcAccessInfo) ClientRequestTime() string { return time.UnixMilli(unixmsec).Format(timeFormat) } + +func (i *GrpcAccessInfo) SetActualConsistencyLevel(acl commonpb.ConsistencyLevel) { + i.actualConsistencyLevel = &acl +} diff --git a/internal/proxy/accesslog/info/info.go b/internal/proxy/accesslog/info/info.go index 3f01df258c..3bfc1b9d89 100644 --- a/internal/proxy/accesslog/info/info.go +++ b/internal/proxy/accesslog/info/info.go @@ -16,6 +16,8 @@ package info +import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + const ( Unknown = "Unknown" timeFormat = "2006/01/02 15:04:05.000 -07:00" @@ -80,6 +82,7 @@ type AccessInfo interface { SearchParams() string QueryParams() string ClientRequestTime() string + SetActualConsistencyLevel(commonpb.ConsistencyLevel) } func Get(i AccessInfo, keys ...string) []any { diff --git a/internal/proxy/accesslog/info/restful_info.go b/internal/proxy/accesslog/info/restful_info.go index e3afadcbf1..ed44b70187 100644 --- a/internal/proxy/accesslog/info/restful_info.go +++ b/internal/proxy/accesslog/info/restful_info.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/pkg/v2/util/requestutil" ) @@ -41,6 +42,9 @@ type RestfulInfo struct { params *gin.LogFormatterParams start time.Time req interface{} + + // runtime set info + actualConsistencyLevel *commonpb.ConsistencyLevel } func NewRestfulInfo() *RestfulInfo { @@ -209,6 +213,10 @@ func (i *RestfulInfo) OutputFields() string { } func (i *RestfulInfo) ConsistencyLevel() string { + // return actual consistency level if set + if i.actualConsistencyLevel != nil { + return i.actualConsistencyLevel.String() + } level, ok := requestutil.GetConsistencyLevelFromRequst(i.req) if ok { return level.String() @@ -259,3 +267,7 @@ func (i *RestfulInfo) QueryParams() string { func (i *RestfulInfo) ClientRequestTime() string { return Unknown } + +func (i *RestfulInfo) SetActualConsistencyLevel(acl commonpb.ConsistencyLevel) { + i.actualConsistencyLevel = &acl +} diff --git a/internal/proxy/accesslog/util.go b/internal/proxy/accesslog/util.go index 6e8f4a656b..aa3bd71c0f 100644 --- a/internal/proxy/accesslog/util.go +++ b/internal/proxy/accesslog/util.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "google.golang.org/grpc" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proxy/accesslog/info" ) @@ -82,3 +83,13 @@ func timeFromName(filename, prefix, ext string) (time.Time, error) { ts := filename[len(prefix) : len(filename)-len(ext)] return time.Parse(timeNameFormat, ts) } + +func SetActualConsistencyLevel(ctx context.Context, acl commonpb.ConsistencyLevel) { + if ctx != nil { + v := ctx.Value(AccessKey{}) + info, ok := v.(info.AccessInfo) + if ok && info != nil { + info.SetActualConsistencyLevel(acl) + } + } +} diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 17f6637bf5..7135cb4b2c 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/parser/planparserv2" + "github.com/milvus-io/milvus/internal/proxy/accesslog" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/exprutil" "github.com/milvus-io/milvus/internal/util/reduce" @@ -525,6 +526,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error { } } + // update actual consistency level + accesslog.SetActualConsistencyLevel(ctx, consistencyLevel) + // use collection schema updated timestamp if it's greater than calculate guarantee timestamp // this make query view updated happens before new read request happens // see also schema change design diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index e6361994c1..0150193c60 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/parser/planparserv2" + "github.com/milvus-io/milvus/internal/proxy/accesslog" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/exprutil" "github.com/milvus-io/milvus/internal/util/function/embedding" @@ -241,6 +242,9 @@ func (t *searchTask) PreExecute(ctx context.Context) error { } } + // update actual consistency level + accesslog.SetActualConsistencyLevel(ctx, consistencyLevel) + // use collection schema updated timestamp if it's greater than calculate guarantee timestamp // this make query view updated happens before new read request happens // see also schema change design