enhance: returning collection metadata from cache (#42823)

See #43187

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
This commit is contained in:
Ted Xu 2025-07-14 10:54:50 +08:00 committed by GitHub
parent 21a96bc903
commit 07894b37b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 384 additions and 68 deletions

View File

@ -12,9 +12,7 @@ require (
github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.17.1
go.opentelemetry.io/otel v1.28.0
go.uber.org/atomic v1.11.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
)
@ -89,6 +87,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
@ -99,6 +98,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect

View File

@ -959,73 +959,13 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
interceptor, err := NewInterceptor[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse](node, "DescribeCollection")
if err != nil {
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeCollection")
defer sp.End()
method := "DescribeCollection"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
dct := &describeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
mixCoord: node.mixCoord,
}
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName))
log.Debug("DescribeCollection received")
if err := node.sched.ddQueue.Enqueue(dct); err != nil {
log.Warn("DescribeCollection failed to enqueue",
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
log.Debug("DescribeCollection enqueued",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
if err := dct.WaitToFinish(); err != nil {
log.Warn("DescribeCollection failed to WaitToFinish",
zap.Error(err),
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
log.Debug("DescribeCollection done",
zap.Uint64("BeginTS", dct.BeginTs()),
zap.Uint64("EndTS", dct.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return dct.result, nil
return interceptor.Call(ctx, request)
}
// AddCollectionField add a field to collection

View File

@ -1149,6 +1149,87 @@ func TestProxyDescribeDatabase(t *testing.T) {
})
}
// TestProxyDescribeCollection exercises the proxy's DescribeCollection entry
// point against a mocked mix-coordinator: unhealthy node, name/ID/db lookups
// that miss, and a successful describe resolved through the meta cache.
// NOTE: mock expectations are matched in registration order, so the
// MatchedBy expectation must stay registered before the catch-all one.
func TestProxyDescribeCollection(t *testing.T) {
	paramtable.Init()
	node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
	ctx := context.Background()
	mixCoord := mocks.NewMockMixCoordClient(t)
	// Specific expectation first: only db "test_1" + "test_collection" resolves.
	mixCoord.On("DescribeCollection", mock.Anything, mock.MatchedBy(func(req *milvuspb.DescribeCollectionRequest) bool {
		return req.DbName == "test_1" && req.CollectionName == "test_collection"
	})).Return(&milvuspb.DescribeCollectionResponse{
		Status:       merr.Success(),
		CollectionID: 1,
		Schema: &schemapb.CollectionSchema{
			Name: "test_collection",
			Fields: []*schemapb.FieldSchema{
				{Name: "pk", DataType: schemapb.DataType_Int64},
			},
		},
	}, nil).Maybe()
	// Partition/load lookups performed while the meta cache fills itself.
	mixCoord.On("ShowPartitions", mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
		Status:               merr.Success(),
		PartitionNames:       []string{"default"},
		CreatedTimestamps:    []uint64{1},
		CreatedUtcTimestamps: []uint64{1},
		PartitionIDs:         []int64{1},
	}, nil).Maybe()
	mixCoord.On("ShowLoadCollections", mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
		Status: merr.Success(),
	}, nil).Maybe()
	// Catch-all: every other collection is reported as missing.
	mixCoord.On("DescribeCollection", mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
	var err error
	globalMetaCache, err = NewMetaCache(mixCoord, nil)
	assert.NoError(t, err)
	// An abnormal node must answer with a non-success status, not an error.
	t.Run("not healthy", func(t *testing.T) {
		node.UpdateStateCode(commonpb.StateCode_Abnormal)
		defer node.UpdateStateCode(commonpb.StateCode_Healthy)
		resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
		assert.NoError(t, err)
		assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
	})
	// Lookup by a name the coordinator does not know.
	t.Run("collection not exists", func(t *testing.T) {
		resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
			DbName:         "test_1",
			CollectionName: "test_not_exists",
		})
		assert.NoError(t, err)
		assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=test_1][collection=test_not_exists]")
		assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
	})
	// Lookup by an unknown ID: the reason string carries an empty name.
	t.Run("collection id not exists", func(t *testing.T) {
		resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
			DbName:       "test_1",
			CollectionID: 1000,
		})
		assert.NoError(t, err)
		assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=test_1][collection=]")
		assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
	})
	// Unknown database also surfaces as CollectionNotExists.
	t.Run("db not exists", func(t *testing.T) {
		resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
			DbName:         "db_not_exists",
			CollectionName: "test_collection",
		})
		assert.NoError(t, err)
		assert.Contains(t, resp.GetStatus().GetReason(), "can't find collection[database=db_not_exists][collection=test_collection]")
		assert.Equal(t, commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode())
	})
	// Happy path: the known collection describes successfully.
	t.Run("describe collection ok", func(t *testing.T) {
		resp, err := node.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
			DbName:         "test_1",
			CollectionName: "test_collection",
		})
		assert.NoError(t, err)
		assert.Empty(t, resp.GetStatus().GetReason())
		assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
	})
}
func TestProxy_AllocTimestamp(t *testing.T) {
t.Run("proxy unhealthy", func(t *testing.T) {
node := &Proxy{}

View File

@ -106,6 +106,12 @@ type collectionInfo struct {
replicateID string
updateTimestamp uint64
collectionTTL uint64
numPartitions int64
vChannels []string
pChannels []string
shardsNum int32
aliases []string
properties []*commonpb.KeyValuePair
}
type databaseInfo struct {
@ -472,6 +478,12 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
vChannels: collection.VirtualChannelNames,
pChannels: collection.PhysicalChannelNames,
numPartitions: collection.NumPartitions,
shardsNum: collection.ShardsNum,
aliases: collection.Aliases,
properties: collection.Properties,
}, nil
}
_, dbOk := m.collInfo[database]
@ -491,12 +503,18 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
vChannels: collection.VirtualChannelNames,
pChannels: collection.PhysicalChannelNames,
numPartitions: collection.NumPartitions,
shardsNum: collection.ShardsNum,
aliases: collection.Aliases,
properties: collection.Properties,
}
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID),
zap.Strings("partition", partitions.PartitionNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", collection.GetRequestTime()),
zap.Uint64("version", collection.GetRequestTime()), zap.Any("aliases", collection.Aliases),
)
m.collectionCacheVersion[collection.GetCollectionID()] = collection.GetRequestTime()

View File

@ -0,0 +1,232 @@
package proxy
import (
"context"
"fmt"
"strconv"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// Request is the subset of proxy RPC requests handled by the interceptor:
// anything that can report its database and collection name, which the
// interceptor uses as metric labels.
type Request interface {
	GetDbName() string
	GetCollectionName() string
}
// Response is any proxy RPC response that exposes a commonpb.Status.
type Response interface {
	GetStatus() *commonpb.Status
}
// ServiceInterceptor wraps a single proxy RPC method with the common
// request pipeline (health check, tracing, call metrics).
type ServiceInterceptor[Req Request, Resp Response] interface {
	Call(ctx context.Context, request Req) (Resp, error)
}
// NewInterceptor builds the interceptor for the given proxy method. Depending
// on the proxy.enableCachedServiceProvider config, the wrapped handler serves
// the request from the proxy-local meta cache (CachedProxyServiceProvider) or
// forwards it through the ddl task queue (RemoteProxyServiceProvider).
//
// The type parameters must match the request/response types of the requested
// method (e.g. *milvuspb.DescribeCollection{Request,Response} for
// "DescribeCollection"); a mismatch is reported as an error rather than a
// runtime panic.
func NewInterceptor[Req Request, Resp Response](proxy *Proxy, method string) (*InterceptorImpl[Req, Resp], error) {
	// Pick the backing service provider once, based on configuration.
	var provider milvuspb.MilvusServiceServer
	if paramtable.Get().ProxyCfg.EnableCachedServiceProvider.GetAsBool() {
		provider = &CachedProxyServiceProvider{Proxy: proxy}
	} else {
		provider = &RemoteProxyServiceProvider{Proxy: proxy}
	}
	switch method {
	case "DescribeCollection":
		interceptor := &InterceptorImpl[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse]{
			proxy:  proxy,
			method: method,
			onCall: provider.DescribeCollection,
			onError: func(err error) (*milvuspb.DescribeCollectionResponse, error) {
				return &milvuspb.DescribeCollectionResponse{
					Status: merr.Status(err),
				}, nil
			},
		}
		// Use the comma-ok assertion so instantiating NewInterceptor with type
		// parameters that do not match this method surfaces as an error
		// instead of panicking (the original bare assertion panicked here).
		typed, ok := any(interceptor).(*InterceptorImpl[Req, Resp])
		if !ok {
			return nil, fmt.Errorf("method %s: type parameters do not match *milvuspb.DescribeCollection{Request,Response}", method)
		}
		return typed, nil
	default:
		return nil, fmt.Errorf("method %s not supported", method)
	}
}
// InterceptorImpl is the concrete ServiceInterceptor. It runs the shared
// pre/post processing (health check, tracing span, call metrics) around the
// method-specific handler supplied by NewInterceptor.
type InterceptorImpl[Req Request, Resp Response] struct {
	proxy   *Proxy
	method  string                                               // RPC method name; used for the span name and metric labels
	onCall  func(ctx context.Context, request Req) (Resp, error) // the actual handler (cached or remote provider)
	onError func(err error) (Resp, error)                        // wraps an error into a method-specific response with a status
}
// Call executes the wrapped handler with the standard proxy pipeline:
// health check first, then a tracing span and the total/success/latency
// metrics around the handler invocation. Handler errors are converted to a
// response via onError and are not counted as success.
func (i *InterceptorImpl[Req, Resp]) Call(ctx context.Context, request Req) (Resp, error) {
	if err := merr.CheckHealthy(i.proxy.GetStateCode()); err != nil {
		return i.onError(err)
	}

	ctx, span := otel.Tracer(typeutil.ProxyRole).Start(ctx, fmt.Sprintf("Proxy-%s", i.method))
	defer span.End()

	// Hoist the node-ID label: it is used by every metric below.
	nodeID := strconv.FormatInt(paramtable.GetNodeID(), 10)
	recorder := timerecord.NewTimeRecorder(i.method)
	metrics.ProxyFunctionCall.WithLabelValues(nodeID, i.method,
		metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()

	resp, err := i.onCall(ctx, request)
	if err != nil {
		return i.onError(err)
	}

	metrics.ProxyFunctionCall.WithLabelValues(nodeID, i.method,
		metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
	metrics.ProxyReqLatency.WithLabelValues(nodeID, i.method).Observe(float64(recorder.ElapseSpan().Milliseconds()))
	return resp, nil
}
// CachedProxyServiceProvider serves supported proxy RPCs from the
// proxy-local meta cache (globalMetaCache) instead of scheduling a task
// against the coordinator.
type CachedProxyServiceProvider struct {
	*Proxy
}
// DescribeCollection answers a DescribeCollection request directly from
// globalMetaCache: it resolves the collection name (when only an ID is
// given), validates the name, fetches the cached collection info, and copies
// schema plus collection metadata (channels, partitions, shards, aliases,
// properties, timestamps) into the response. Errors are always returned as a
// status inside the response with a nil Go error.
// NOTE(review): it mutates request.CollectionName and request.CollectionID
// in place while resolving them — confirm callers do not reuse the request.
func (node *CachedProxyServiceProvider) DescribeCollection(ctx context.Context,
	request *milvuspb.DescribeCollectionRequest,
) (resp *milvuspb.DescribeCollectionResponse, err error) {
	resp = &milvuspb.DescribeCollectionResponse{
		Status:         merr.Success(),
		CollectionName: request.CollectionName,
		DbName:         request.DbName,
	}
	// Translate cache misses into the legacy CollectionNotExists status with
	// the exact reason string the task-based path produces; everything else
	// goes through merr.Status.
	wrapErrorStatus := func(err error) *commonpb.Status {
		status := &commonpb.Status{}
		if errors.Is(err, merr.ErrCollectionNotFound) {
			// nolint
			status.ErrorCode = commonpb.ErrorCode_CollectionNotExists
			// nolint
			status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", request.DbName, request.CollectionName)
			status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"}
		} else {
			status = merr.Status(err)
		}
		return status
	}
	// Lookup by ID: resolve the collection name first when only an ID is set.
	if request.CollectionName == "" && request.CollectionID > 0 {
		collName, err := globalMetaCache.GetCollectionName(ctx, request.DbName, request.CollectionID)
		if err != nil {
			resp.Status = wrapErrorStatus(err)
			return resp, nil
		}
		request.CollectionName = collName
	}
	// validate collection name, ref describeCollectionTask.PreExecute
	if err = validateCollectionName(request.CollectionName); err != nil {
		resp.Status = wrapErrorStatus(err)
		return resp, nil
	}
	request.CollectionID, err = globalMetaCache.GetCollectionID(ctx, request.DbName, request.CollectionName)
	if err != nil {
		resp.Status = wrapErrorStatus(err)
		return resp, nil
	}
	c, err := globalMetaCache.GetCollectionInfo(ctx, request.DbName, request.CollectionName, request.CollectionID)
	if err != nil {
		resp.Status = wrapErrorStatus(err)
		return resp, nil
	}
	// skip dynamic fields, see describeCollectionTask.Execute
	resp.Schema = &schemapb.CollectionSchema{
		Name:        c.schema.CollectionSchema.Name,
		Description: c.schema.CollectionSchema.Description,
		AutoID:      c.schema.CollectionSchema.AutoID,
		Fields: lo.Filter(c.schema.CollectionSchema.Fields, func(field *schemapb.FieldSchema, _ int) bool {
			return !field.IsDynamic
		}),
		EnableDynamicField: c.schema.CollectionSchema.EnableDynamicField,
		Properties:         c.schema.CollectionSchema.Properties,
		Functions:          c.schema.CollectionSchema.Functions,
		DbName:             c.schema.CollectionSchema.DbName,
	}
	// Copy the cached collection metadata into the response.
	resp.CollectionID = c.collID
	resp.UpdateTimestamp = c.updateTimestamp
	resp.UpdateTimestampStr = fmt.Sprintf("%d", c.updateTimestamp)
	resp.CreatedTimestamp = c.createdTimestamp
	resp.CreatedUtcTimestamp = c.createdUtcTimestamp
	resp.ConsistencyLevel = c.consistencyLevel
	resp.VirtualChannelNames = c.vChannels
	resp.PhysicalChannelNames = c.pChannels
	resp.NumPartitions = c.numPartitions
	resp.ShardsNum = c.shardsNum
	resp.Aliases = c.aliases
	resp.Properties = c.properties
	return resp, nil
}
// RemoteProxyServiceProvider serves proxy RPCs through the original
// task-queue path: the request is enqueued on the proxy's ddl queue and
// resolved by the mix-coordinator.
type RemoteProxyServiceProvider struct {
	*Proxy
}
// DescribeCollection resolves the request by scheduling a
// describeCollectionTask on the proxy's ddl queue and waiting for the
// coordinator's answer. Abandon/fail metrics are recorded here; total,
// success, and latency metrics are recorded by the interceptor around it.
func (node *RemoteProxyServiceProvider) DescribeCollection(ctx context.Context,
	request *milvuspb.DescribeCollectionRequest,
) (*milvuspb.DescribeCollectionResponse, error) {
	const method = "DescribeCollection"

	task := &describeCollectionTask{
		ctx:                       ctx,
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
		mixCoord:                  node.mixCoord,
	}

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
	logger.Debug("DescribeCollection received")

	// Enqueue failure means the task never ran: count it as abandoned.
	if err := node.sched.ddQueue.Enqueue(task); err != nil {
		logger.Warn("DescribeCollection failed to enqueue",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
		return nil, err
	}

	logger.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", task.BeginTs()),
		zap.Uint64("EndTS", task.EndTs()))

	// The task ran but did not complete successfully: count it as failed.
	if err := task.WaitToFinish(); err != nil {
		logger.Warn("DescribeCollection failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", task.BeginTs()),
			zap.Uint64("EndTS", task.EndTs()))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
		return nil, err
	}

	logger.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", task.BeginTs()),
		zap.Uint64("EndTS", task.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
	)
	return task.result, nil
}

View File

@ -0,0 +1,38 @@
package proxy
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
// TestNewInterceptor builds a DescribeCollection interceptor against a
// healthy proxy whose meta cache always misses, and checks that the miss is
// reported as a CollectionNotExists-style status rather than a Go error.
func TestNewInterceptor(t *testing.T) {
	coordStub := &mocks.MockMixCoordClient{}
	coordStub.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{IsHealthy: false}, nil)

	proxyNode := &Proxy{
		mixCoord: coordStub,
		session:  &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
	}
	proxyNode.UpdateStateCode(commonpb.StateCode_Healthy)

	// Every cache fill attempt misses, so lookups report "not found".
	cacheCoord := mocks.NewMockMixCoordClient(t)
	cacheCoord.On("DescribeCollection", mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()

	metaCache, err := NewMetaCache(cacheCoord, nil)
	assert.NoError(t, err)
	globalMetaCache = metaCache

	interceptor, err := NewInterceptor[*milvuspb.DescribeCollectionRequest, *milvuspb.DescribeCollectionResponse](proxyNode, "DescribeCollection")
	assert.NoError(t, err)

	resp, err := interceptor.Call(context.Background(), &milvuspb.DescribeCollectionRequest{
		DbName:         "test",
		CollectionName: "test",
	})
	assert.NoError(t, err)
	assert.Equal(t, "can't find collection[database=test][collection=test]", resp.Status.Reason)
}

View File

@ -1583,6 +1583,7 @@ type proxyConfig struct {
MaxVarCharLength ParamItem `refreshable:"false"`
MaxTextLength ParamItem `refreshable:"false"`
MaxResultEntries ParamItem `refreshable:"true"`
EnableCachedServiceProvider ParamItem `refreshable:"true"`
AccessLog AccessLogConfig
@ -2016,6 +2017,13 @@ Disabled if the value is less or equal to 0.`,
}
p.MaxResultEntries.Init(base.mgr)
p.EnableCachedServiceProvider = ParamItem{
Key: "proxy.enableCachedServiceProvider",
Version: "2.6.0",
DefaultValue: "true",
Doc: "enable cached service provider",
}
p.EnableCachedServiceProvider.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{
Key: "proxy.gracefulStopTimeout",
Version: "2.3.7",

View File

@ -1101,7 +1101,6 @@ class TestUtilityBase(TestcaseBase):
new_collection_name = cf.gen_unique_str(prefix + "new")
alias = cf.gen_unique_str(prefix + "alias")
self.utility_wrap.create_alias(old_collection_name, alias)
collection_alias = collection_w.aliases
self.utility_wrap.rename_collection(old_collection_name, new_collection_name)
collection_w = self.init_collection_wrap(name=new_collection_name,
check_task=CheckTasks.check_collection_property,
@ -1110,7 +1109,7 @@ class TestUtilityBase(TestcaseBase):
collections = self.utility_wrap.list_collections()[0]
assert new_collection_name in collections
assert old_collection_name not in collections
assert collection_alias == collection_w.aliases
assert [alias] == collection_w.aliases
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collections(self):