mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
add resource group name rule (#21992)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
fc1d2d18cf
commit
aced41d5d8
@ -4456,9 +4456,16 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
|
||||
method := "CreateResourceGroup"
|
||||
if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
|
||||
log.Warn("CreateResourceGroup failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateResourceGroup")
|
||||
defer sp.End()
|
||||
method := "CreateResourceGroup"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
@ -4478,13 +4485,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr
|
||||
if err := node.sched.ddQueue.Enqueue(t); err != nil {
|
||||
log.Warn("CreateResourceGroup failed to enqueue",
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("CreateResourceGroup enqueued",
|
||||
@ -4496,12 +4497,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr
|
||||
zap.Error(err),
|
||||
zap.Uint64("BeginTS", t.BeginTs()),
|
||||
zap.Uint64("EndTS", t.EndTs()))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("CreateResourceGroup done",
|
||||
@ -4514,14 +4510,30 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr
|
||||
return t.result, nil
|
||||
}
|
||||
|
||||
func getErrResponse(err error, method string) *commonpb.Status {
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
|
||||
if !node.checkHealthy() {
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
|
||||
method := "DropResourceGroup"
|
||||
if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
|
||||
log.Warn("DropResourceGroup failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropResourceGroup")
|
||||
defer sp.End()
|
||||
method := "DropResourceGroup"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
@ -4542,12 +4554,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop
|
||||
log.Warn("DropResourceGroup failed to enqueue",
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("DropResourceGroup enqueued",
|
||||
@ -4559,12 +4566,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop
|
||||
zap.Error(err),
|
||||
zap.Uint64("BeginTS", t.BeginTs()),
|
||||
zap.Uint64("EndTS", t.EndTs()))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("DropResourceGroup done",
|
||||
@ -4582,9 +4584,23 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
|
||||
method := "TransferNode"
|
||||
if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
|
||||
log.Warn("TransferNode failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
|
||||
log.Warn("TransferNode failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferNode")
|
||||
defer sp.End()
|
||||
method := "TransferNode"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
@ -4605,12 +4621,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN
|
||||
log.Warn("TransferNode failed to enqueue",
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("TransferNode enqueued",
|
||||
@ -4622,12 +4633,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN
|
||||
zap.Error(err),
|
||||
zap.Uint64("BeginTS", t.BeginTs()),
|
||||
zap.Uint64("EndTS", t.EndTs()))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("TransferNode done",
|
||||
@ -4645,9 +4651,23 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
|
||||
method := "TransferReplica"
|
||||
if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
|
||||
log.Warn("TransferReplica failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
|
||||
log.Warn("TransferReplica failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferReplica")
|
||||
defer sp.End()
|
||||
method := "TransferReplica"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
@ -4668,12 +4688,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf
|
||||
log.Warn("TransferReplica failed to enqueue",
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("TransferReplica enqueued",
|
||||
@ -4685,12 +4700,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf
|
||||
zap.Error(err),
|
||||
zap.Uint64("BeginTS", t.BeginTs()),
|
||||
zap.Uint64("EndTS", t.EndTs()))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return getErrResponse(err, method), nil
|
||||
}
|
||||
|
||||
log.Debug("TransferReplica done",
|
||||
@ -4779,9 +4789,20 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.
|
||||
}, nil
|
||||
}
|
||||
|
||||
method := "DescribeResourceGroup"
|
||||
GetErrResponse := func(err error) *milvuspb.DescribeResourceGroupResponse {
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
|
||||
|
||||
return &milvuspb.DescribeResourceGroupResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeResourceGroup")
|
||||
defer sp.End()
|
||||
method := "DescribeResourceGroup"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
@ -4802,14 +4823,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.
|
||||
log.Warn("DescribeResourceGroup failed to enqueue",
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return &milvuspb.DescribeResourceGroupResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
return GetErrResponse(err), nil
|
||||
}
|
||||
|
||||
log.Debug("DescribeResourceGroup enqueued",
|
||||
@ -4821,14 +4835,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.
|
||||
zap.Error(err),
|
||||
zap.Uint64("BeginTS", t.BeginTs()),
|
||||
zap.Uint64("EndTS", t.EndTs()))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return &milvuspb.DescribeResourceGroupResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
return GetErrResponse(err), nil
|
||||
}
|
||||
|
||||
log.Debug("DescribeResourceGroup done",
|
||||
|
||||
@ -266,8 +266,68 @@ func TestProxy_ResourceGroup(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("describe resource group", func(t *testing.T) {
|
||||
resp, err := node.DescribeResourceGroup(ctx, &milvuspb.DescribeResourceGroupRequest{})
|
||||
resp, err := node.DescribeResourceGroup(ctx, &milvuspb.DescribeResourceGroupRequest{ResourceGroup: "rg"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_Success)
|
||||
})
|
||||
}
|
||||
|
||||
func TestProxy_InvalidResourceGroupName(t *testing.T) {
|
||||
factory := dependency.NewDefaultFactory(true)
|
||||
ctx := context.Background()
|
||||
|
||||
node, err := NewProxy(ctx, factory)
|
||||
assert.NoError(t, err)
|
||||
node.multiRateLimiter = NewMultiRateLimiter()
|
||||
node.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
|
||||
qc := NewQueryCoordMock()
|
||||
node.SetQueryCoordClient(qc)
|
||||
|
||||
tsoAllocatorIns := newMockTsoAllocator()
|
||||
node.sched, err = newTaskScheduler(node.ctx, tsoAllocatorIns, node.factory)
|
||||
assert.NoError(t, err)
|
||||
node.sched.Start()
|
||||
defer node.sched.Close()
|
||||
|
||||
rc := &MockRootCoordClientInterface{}
|
||||
mgr := newShardClientMgr()
|
||||
InitMetaCache(ctx, rc, qc, mgr)
|
||||
|
||||
t.Run("create resource group", func(t *testing.T) {
|
||||
resp, err := node.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
|
||||
ResourceGroup: "...",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
|
||||
})
|
||||
|
||||
t.Run("drop resource group", func(t *testing.T) {
|
||||
resp, err := node.DropResourceGroup(ctx, &milvuspb.DropResourceGroupRequest{
|
||||
ResourceGroup: "...",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
|
||||
})
|
||||
|
||||
t.Run("transfer node", func(t *testing.T) {
|
||||
resp, err := node.TransferNode(ctx, &milvuspb.TransferNodeRequest{
|
||||
SourceResourceGroup: "...",
|
||||
TargetResourceGroup: "!!!",
|
||||
NumNode: 1,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
|
||||
})
|
||||
|
||||
t.Run("transfer replica", func(t *testing.T) {
|
||||
resp, err := node.TransferReplica(ctx, &milvuspb.TransferReplicaRequest{
|
||||
SourceResourceGroup: "...",
|
||||
TargetResourceGroup: "!!!",
|
||||
NumReplica: 1,
|
||||
CollectionName: "collection1",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
|
||||
})
|
||||
}
|
||||
|
||||
@ -97,21 +97,44 @@ func validateCollectionNameOrAlias(entity, entityType string) error {
|
||||
|
||||
invalidMsg := fmt.Sprintf("Invalid collection %s: %s. ", entityType, entity)
|
||||
if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
|
||||
msg := invalidMsg + fmt.Sprintf("The length of a collection %s must be less than ", entityType) + Params.ProxyCfg.MaxNameLength.GetValue() + " characters."
|
||||
return errors.New(msg)
|
||||
return fmt.Errorf("%s the length of a collection %s must be less than %s characters", invalidMsg, entityType,
|
||||
Params.ProxyCfg.MaxNameLength.GetValue())
|
||||
}
|
||||
|
||||
firstChar := entity[0]
|
||||
if firstChar != '_' && !isAlpha(firstChar) {
|
||||
msg := invalidMsg + fmt.Sprintf("The first character of a collection %s must be an underscore or letter.", entityType)
|
||||
return errors.New(msg)
|
||||
return fmt.Errorf("%s the first character of a collection %s must be an underscore or letter", invalidMsg, entityType)
|
||||
}
|
||||
|
||||
for i := 1; i < len(entity); i++ {
|
||||
c := entity[i]
|
||||
if c != '_' && !isAlpha(c) && !isNumber(c) {
|
||||
msg := invalidMsg + fmt.Sprintf("Collection %s can only contain numbers, letters and underscores.", entityType)
|
||||
return errors.New(msg)
|
||||
return fmt.Errorf("%s collection %s can only contain numbers, letters and underscores", invalidMsg, entityType)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ValidateResourceGroupName(entity string) error {
|
||||
if entity == "" {
|
||||
return fmt.Errorf("resource group name %s should not be empty", entity)
|
||||
}
|
||||
|
||||
invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity)
|
||||
if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
|
||||
return fmt.Errorf("%s the length of a resource group name must be less than %s characters",
|
||||
invalidMsg, Params.ProxyCfg.MaxNameLength.GetValue())
|
||||
}
|
||||
|
||||
firstChar := entity[0]
|
||||
if firstChar != '_' && !isAlpha(firstChar) {
|
||||
return fmt.Errorf("%s the first character of a resource group name must be an underscore or letter", invalidMsg)
|
||||
}
|
||||
|
||||
for i := 1; i < len(entity); i++ {
|
||||
c := entity[i]
|
||||
if c != '_' && !isAlpha(c) && !isNumber(c) {
|
||||
return fmt.Errorf("%s resource group name can only contain numbers, letters and underscores", invalidMsg)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -68,6 +68,31 @@ func TestValidateCollectionName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateResourceGroupName(t *testing.T) {
|
||||
assert.Nil(t, ValidateResourceGroupName("abc"))
|
||||
assert.Nil(t, ValidateResourceGroupName("_123abc"))
|
||||
assert.Nil(t, ValidateResourceGroupName("abc123_"))
|
||||
|
||||
longName := make([]byte, 256)
|
||||
for i := 0; i < len(longName); i++ {
|
||||
longName[i] = 'a'
|
||||
}
|
||||
invalidNames := []string{
|
||||
"123abc",
|
||||
"$abc",
|
||||
"abc$",
|
||||
"_12 ac",
|
||||
" ",
|
||||
"",
|
||||
string(longName),
|
||||
"中文",
|
||||
}
|
||||
|
||||
for _, name := range invalidNames {
|
||||
assert.NotNil(t, ValidateResourceGroupName(name))
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidatePartitionTag(t *testing.T) {
|
||||
assert.Nil(t, validatePartitionTag("abc", true))
|
||||
assert.Nil(t, validatePartitionTag("123abc", true))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user