enhance: returning collection metadata from cache (#42823) (#43911)

See #43187
pr: #42823
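At a glance: the proxy can now answer DescribeCollection from its meta cache. The cached collectionInfo gains the virtual/physical channel names, partition count, shard count, aliases and properties; a generic per-method interceptor takes over the health check, tracing and metrics bookkeeping; and a new CachedProxyServiceProvider builds the response from globalMetaCache, while RemoteProxyServiceProvider keeps the original task-queue path for deployments that set proxy.enableCachedServiceProvider to false (default: true). New proxy.maxResultEntries and proxy.maxTextLength config items are added alongside.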

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2025-08-26 14:23:54 +08:00 committed by GitHub
parent 6c29689ca2
commit 8821743c17
GPG Key ID: B5690EEEBB952194
8 changed files with 412 additions and 66 deletions


@@ -289,6 +289,11 @@ proxy:
ddlConcurrency: 16 # The concurrent execution number of DDL at proxy.
dclConcurrency: 16 # The concurrent execution number of DCL at proxy.
mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection
# maximum number of result entries, typically Nq * TopK * GroupSize.
# It costs additional memory and time to process a large number of result entries.
# If the number of result entries exceeds this limit, the search will be rejected.
# Disabled if the value is less than or equal to 0.
maxResultEntries: -1
accessLog:
enable: false # Whether to enable the access log feature.
minioEnable: false # Whether to upload local access log files to MinIO. This parameter can be specified when proxy.accessLog.filename is not empty.
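As an illustrative calculation of the formula in the comment above (the numbers are arbitrary, not from this patch): a search with Nq = 8, TopK = 100 and GroupSize = 1 produces 8 × 100 × 1 = 800 result entries, so any positive maxResultEntries below 800 would reject that request, while the default of -1 keeps the check disabled.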


@@ -961,73 +961,13 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
// DescribeCollection gets the meta information of a specific collection, such as schema, created timestamp, etc.
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
interceptor, err := NewInterceptor[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse](node, "DescribeCollection")
if err != nil {
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeCollection")
defer sp.End()
method := "DescribeCollection"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
dct := &describeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
rootCoord: node.rootCoord,
}
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName))
log.Debug("DescribeCollection received")
if err := node.sched.ddQueue.Enqueue(dct); err != nil {
log.Warn("DescribeCollection failed to enqueue",
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
log.Debug("DescribeCollection enqueued",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
if err := dct.WaitToFinish(); err != nil {
log.Warn("DescribeCollection failed to WaitToFinish",
zap.Error(err),
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
log.Debug("DescribeCollection done",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return dct.result, nil
return interceptor.Call(ctx, request)
}
// GetStatistics gets the statistics, such as `num_rows`.
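(DescribeCollection is currently the only method wired through the new interceptor; NewInterceptor, in the file added later in this commit, rejects any other method name, so the remaining proxy RPCs are untouched.)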


@@ -1204,6 +1204,92 @@ func TestProxyDescribeDatabase(t *testing.T) {
})
}
func TestProxyDescribeCollection(t *testing.T) {
paramtable.Init()
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
ctx := context.Background()
rootCoord := mocks.NewMockRootCoordClient(t)
queryCoord := mocks.NewMockQueryCoordClient(t)
rootCoord.On("DescribeCollection", mock.Anything, mock.MatchedBy(func(req *milvuspb.DescribeCollectionRequest) bool {
return req.DbName == "test_1" && req.CollectionName == "test_collection"
})).Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
CollectionID: 1,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{Name: "pk", DataType: schemapb.DataType_Int64},
},
},
}, nil).Maybe()
rootCoord.On("ShowPartitions", mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Success(),
PartitionNames: []string{"default"},
CreatedTimestamps: []uint64{1},
CreatedUtcTimestamps: []uint64{1},
PartitionIDs: []int64{1},
}, nil).Maybe()
rootCoord.On("ShowLoadCollections", mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: merr.Success(),
}, nil).Maybe()
rootCoord.On("DescribeCollection", mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
queryCoord.On("ShowCollections", mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: merr.Success(),
}, nil).Maybe()
var err error
globalMetaCache, err = NewMetaCache(rootCoord, queryCoord, nil)
assert.NoError(t, err)
t.Run("not healthy", func(t *testing.T) {
node.UpdateStateCode(commonpb.StateCode_Abnormal)
defer node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("collection not exists", func(t *testing.T) {
resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
DbName: "test_1",
CollectionName: "test_not_exists",
})
assert.NoError(t, err)
assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=test_1][collection=test_not_exists]")
assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
})
t.Run("collection id not exists", func(t *testing.T) {
resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
DbName: "test_1",
CollectionID: 1000,
})
assert.NoError(t, err)
assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=test_1][collection=]")
assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
})
t.Run("db not exists", func(t *testing.T) {
resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
DbName: "db_not_exists",
CollectionName: "test_collection",
})
assert.NoError(t, err)
assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=db_not_exists][collection=test_collection]")
assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
})
t.Run("describe collection ok", func(t *testing.T) {
resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
DbName: "test_1",
CollectionName: "test_collection",
})
assert.NoError(t, err)
assert.Empty(t, resp.GetStatus().GetReason())
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestProxy_AllocTimestamp(t *testing.T) {
t.Run("proxy unhealthy", func(t *testing.T) {
node := &Proxy{}


@@ -105,6 +105,12 @@ type collectionInfo struct {
replicateID string
updateTimestamp uint64
collectionTTL uint64
numPartitions int64
vChannels []string
pChannels []string
shardsNum int32
aliases []string
properties []*commonpb.KeyValuePair
}
type databaseInfo struct {
@@ -477,6 +483,12 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
vChannels: collection.VirtualChannelNames,
pChannels: collection.PhysicalChannelNames,
numPartitions: collection.NumPartitions,
shardsNum: collection.ShardsNum,
aliases: collection.Aliases,
properties: collection.Properties,
}, nil
}
_, dbOk := m.collInfo[database]
@@ -496,12 +508,18 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
vChannels: collection.VirtualChannelNames,
pChannels: collection.PhysicalChannelNames,
numPartitions: collection.NumPartitions,
shardsNum: collection.ShardsNum,
aliases: collection.Aliases,
properties: collection.Properties,
}
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID),
zap.Strings("partition", partitions.PartitionNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", collection.GetRequestTime()),
zap.Uint64("version", collection.GetRequestTime()), zap.Any("aliases", collection.Aliases),
)
m.collectionCacheVersion[collection.GetCollectionID()] = collection.GetRequestTime()
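These extra cached fields mirror what DescribeCollectionResponse carries (channels, partition and shard counts, aliases, properties), which is what lets the cached service provider in the new interceptor file assemble a full DescribeCollection response from globalMetaCache instead of enqueueing a task for every call; the cache itself is still populated and refreshed from RootCoord as before.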


@@ -0,0 +1,232 @@
package proxy
import (
"context"
"fmt"
"strconv"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type Request interface {
GetDbName() string
GetCollectionName() string
}
type Response interface {
GetStatus() *commonpb.Status
}
type ServiceInterceptor[Req Request, Resp Response] interface {
Call(ctx context.Context, request Req) (Resp, error)
}
func NewInterceptor[Req Request, Resp Response](proxy *Proxy, method string) (*InterceptorImpl[Req, Resp], error) {
var provider milvuspb.MilvusServiceServer
cached := paramtable.Get().ProxyCfg.EnableCachedServiceProvider.GetAsBool()
if cached {
provider = &CachedProxyServiceProvider{Proxy: proxy}
} else {
provider = &RemoteProxyServiceProvider{Proxy: proxy}
}
switch method {
case "DescribeCollection":
interceptor := &InterceptorImpl[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse]{
proxy: proxy,
method: method,
onCall: provider.DescribeCollection,
onError: func(err error) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
},
}
return interface{}(interceptor).(*InterceptorImpl[Req, Resp]), nil
default:
return nil, fmt.Errorf("method %s not supported", method)
}
}
type InterceptorImpl[Req Request, Resp Response] struct {
proxy *Proxy
method string
onCall func(ctx context.Context, request Req) (Resp, error)
onError func(err error) (Resp, error)
}
func (i *InterceptorImpl[Req, Resp]) Call(ctx context.Context, request Req,
) (Resp, error) {
if err := merr.CheckHealthy(i.proxy.GetStateCode()); err != nil {
return i.onError(err)
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, fmt.Sprintf("Proxy-%s", i.method))
defer sp.End()
tr := timerecord.NewTimeRecorder(i.method)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), i.method,
metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
resp, err := i.onCall(ctx, request)
if err != nil {
return i.onError(err)
}
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), i.method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), i.method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return resp, err
}
type CachedProxyServiceProvider struct {
*Proxy
}
func (node *CachedProxyServiceProvider) DescribeCollection(ctx context.Context,
request *milvuspb.DescribeCollectionRequest,
) (resp *milvuspb.DescribeCollectionResponse, err error) {
resp = &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
CollectionName: request.CollectionName,
DbName: request.DbName,
}
wrapErrorStatus := func(err error) *commonpb.Status {
status := &commonpb.Status{}
if errors.Is(err, merr.ErrCollectionNotFound) {
// nolint
status.ErrorCode = commonpb.ErrorCode_CollectionNotExists
// nolint
status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", request.DbName, request.CollectionName)
status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"}
} else {
status = merr.Status(err)
}
return status
}
if request.CollectionName == "" && request.CollectionID > 0 {
collName, err := globalMetaCache.GetCollectionName(ctx, request.DbName, request.CollectionID)
if err != nil {
resp.Status = wrapErrorStatus(err)
return resp, nil
}
request.CollectionName = collName
}
// validate collection name, ref describeCollectionTask.PreExecute
if err = validateCollectionName(request.CollectionName); err != nil {
resp.Status = wrapErrorStatus(err)
return resp, nil
}
request.CollectionID, err = globalMetaCache.GetCollectionID(ctx, request.DbName, request.CollectionName)
if err != nil {
resp.Status = wrapErrorStatus(err)
return resp, nil
}
c, err := globalMetaCache.GetCollectionInfo(ctx, request.DbName, request.CollectionName, request.CollectionID)
if err != nil {
resp.Status = wrapErrorStatus(err)
return resp, nil
}
// skip dynamic fields, see describeCollectionTask.Execute
resp.Schema = &schemapb.CollectionSchema{
Name: c.schema.CollectionSchema.Name,
Description: c.schema.CollectionSchema.Description,
AutoID: c.schema.CollectionSchema.AutoID,
Fields: lo.Filter(c.schema.CollectionSchema.Fields, func(field *schemapb.FieldSchema, _ int) bool {
return !field.IsDynamic
}),
EnableDynamicField: c.schema.CollectionSchema.EnableDynamicField,
Properties: c.schema.CollectionSchema.Properties,
Functions: c.schema.CollectionSchema.Functions,
DbName: c.schema.CollectionSchema.DbName,
}
resp.CollectionID = c.collID
resp.UpdateTimestamp = c.updateTimestamp
resp.UpdateTimestampStr = fmt.Sprintf("%d", c.updateTimestamp)
resp.CreatedTimestamp = c.createdTimestamp
resp.CreatedUtcTimestamp = c.createdUtcTimestamp
resp.ConsistencyLevel = c.consistencyLevel
resp.VirtualChannelNames = c.vChannels
resp.PhysicalChannelNames = c.pChannels
resp.NumPartitions = c.numPartitions
resp.ShardsNum = c.shardsNum
resp.Aliases = c.aliases
resp.Properties = c.properties
return resp, nil
}
type RemoteProxyServiceProvider struct {
*Proxy
}
func (node *RemoteProxyServiceProvider) DescribeCollection(ctx context.Context,
request *milvuspb.DescribeCollectionRequest,
) (*milvuspb.DescribeCollectionResponse, error) {
dct := &describeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
rootCoord: node.rootCoord,
}
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName))
method := "DescribeCollection"
log.Debug("DescribeCollection received")
if err := node.sched.ddQueue.Enqueue(dct); err != nil {
log.Warn("DescribeCollection failed to enqueue",
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return nil, err
}
log.Debug("DescribeCollection enqueued",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
if err := dct.WaitToFinish(); err != nil {
log.Warn("DescribeCollection failed to WaitToFinish",
zap.Error(err),
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return nil, err
}
log.Debug("DescribeCollection done",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
)
return dct.result, nil
}
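A hypothetical sketch of how a second method could be registered in NewInterceptor (not part of this patch; ShowPartitions is chosen purely for illustration because its request and response types already satisfy the Request and Response interfaces, and both providers would fall back to the embedded *Proxy implementation until a cached override is written):

    case "ShowPartitions": // hypothetical: any RPC whose request exposes GetDbName/GetCollectionName fits
        interceptor := &InterceptorImpl[*milvuspb.ShowPartitionsRequest, *milvuspb.ShowPartitionsResponse]{
            proxy:  proxy,
            method: method,
            onCall: provider.ShowPartitions, // resolves to the embedded *Proxy until CachedProxyServiceProvider overrides it
            onError: func(err error) (*milvuspb.ShowPartitionsResponse, error) {
                return &milvuspb.ShowPartitionsResponse{Status: merr.Status(err)}, nil
            },
        }
        return interface{}(interceptor).(*InterceptorImpl[Req, Resp]), nil

Note that the interface{}(...) type assertion is unchecked, so calling NewInterceptor with type parameters that do not match the method name panics at runtime rather than failing at compile time; callers are expected to pair them as the DescribeCollection handler does.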


@@ -0,0 +1,36 @@
package proxy
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
func TestNewInterceptor(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
node := &Proxy{
rootCoord: rootCoord,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
node.UpdateStateCode(commonpb.StateCode_Healthy)
rootCoord.On("DescribeCollection", mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
var err error
globalMetaCache, err = NewMetaCache(rootCoord, nil, nil)
assert.NoError(t, err)
interceptor, err := NewInterceptor[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse](node, "DescribeCollection")
assert.NoError(t, err)
resp, err := interceptor.Call(context.Background(), &milvuspb.DescribeCollectionRequest{
DbName: "test",
CollectionName: "test",
})
assert.NoError(t, err)
assert.Equal(t, "can't find collection[database=test][collection=test]", resp.Status.Reason)
}


@@ -1501,6 +1501,9 @@ type proxyConfig struct {
SkipAutoIDCheck ParamItem `refreshable:"true"`
SkipPartitionKeyCheck ParamItem `refreshable:"true"`
MaxVarCharLength ParamItem `refreshable:"false"`
MaxTextLength ParamItem `refreshable:"false"`
MaxResultEntries ParamItem `refreshable:"true"`
EnableCachedServiceProvider ParamItem `refreshable:"true"`
AccessLog AccessLogConfig
@@ -1914,6 +1917,33 @@ please adjust in embedded Milvus: false`,
}
p.MaxVarCharLength.Init(base.mgr)
p.MaxTextLength = ParamItem{
Key: "proxy.maxTextLength",
Version: "2.6.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024), // 2M
Doc: "maximum number of characters for a row of the text field",
}
p.MaxTextLength.Init(base.mgr)
p.MaxResultEntries = ParamItem{
Key: "proxy.maxResultEntries",
Version: "2.6.0",
DefaultValue: "-1",
Doc: `maximum number of result entries, typically Nq * TopK * GroupSize.
It costs additional memory and time to process a large number of result entries.
If the number of result entries exceeds this limit, the search will be rejected.
Disabled if the value is less than or equal to 0.`,
Export: true,
}
p.MaxResultEntries.Init(base.mgr)
p.EnableCachedServiceProvider = ParamItem{
Key: "proxy.enableCachedServiceProvider",
Version: "2.6.0",
DefaultValue: "true",
Doc: "enable cached service provider",
}
p.EnableCachedServiceProvider.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{
Key: "proxy.gracefulStopTimeout",
Version: "2.3.7",


@@ -1092,7 +1092,6 @@ class TestUtilityBase(TestcaseBase):
new_collection_name = cf.gen_unique_str(prefix + "new")
alias = cf.gen_unique_str(prefix + "alias")
self.utility_wrap.create_alias(old_collection_name, alias)
collection_alias = collection_w.aliases
self.utility_wrap.rename_collection(old_collection_name, new_collection_name)
collection_w = self.init_collection_wrap(name=new_collection_name,
check_task=CheckTasks.check_collection_property,
@@ -1101,7 +1100,7 @@
collections = self.utility_wrap.list_collections()[0]
assert new_collection_name in collections
assert old_collection_name not in collections
assert collection_alias == collection_w.aliases
assert [alias] == collection_w.aliases
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collections(self):