Fix rename collection data race (#21827)

Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2023-01-24 10:01:46 +08:00 committed by GitHub
parent f5bfd36be1
commit d7bb908148
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 9 deletions

View File

@ -4428,7 +4428,7 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol
return &commonpb.Status{ return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IllegalCollectionName, ErrorCode: commonpb.ErrorCode_IllegalCollectionName,
Reason: err.Error(), Reason: err.Error(),
}, err }, nil
} }
req.Base = commonpbutil.NewMsgBase( req.Base = commonpbutil.NewMsgBase(

View File

@ -160,7 +160,7 @@ func TestProxyRenameCollection(t *testing.T) {
node.stateCode.Store(commonpb.StateCode_Healthy) node.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background() ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"}) resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"})
assert.Error(t, err) assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetErrorCode()) assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetErrorCode())
}) })

View File

@ -330,9 +330,10 @@ func (mt *MetaTable) getCollectionByIDInternal(ctx context.Context, collectionID
func (mt *MetaTable) GetCollectionByName(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) { func (mt *MetaTable) GetCollectionByName(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
mt.ddLock.RLock() mt.ddLock.RLock()
defer mt.ddLock.RUnlock() defer mt.ddLock.RUnlock()
return mt.getCollectionByNameInternal(ctx, collectionName, ts)
}
var collectionID UniqueID func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
collectionID, ok := mt.collAlias2ID[collectionName] collectionID, ok := mt.collAlias2ID[collectionName]
if ok { if ok {
return mt.getCollectionByIDInternal(ctx, collectionID, ts, false) return mt.getCollectionByIDInternal(ctx, collectionID, ts, false)
@ -447,10 +448,10 @@ func (mt *MetaTable) AlterCollection(ctx context.Context, oldColl *model.Collect
} }
func (mt *MetaTable) RenameCollection(ctx context.Context, oldName string, newName string, ts Timestamp) error { func (mt *MetaTable) RenameCollection(ctx context.Context, oldName string, newName string, ts Timestamp) error {
mt.ddLock.RLock() mt.ddLock.Lock()
defer mt.ddLock.RUnlock() defer mt.ddLock.Unlock()
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
log := log.Ctx(ctx).With(zap.String("oldName", oldName), zap.String("newName", newName)) log := log.Ctx(ctx).With(zap.String("oldName", oldName), zap.String("newName", newName))
//old collection should not be an alias //old collection should not be an alias
@ -461,7 +462,7 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, oldName string, newNa
} }
// check new collection already exists // check new collection already exists
newColl, err := mt.GetCollectionByName(ctx, newName, ts) newColl, err := mt.getCollectionByNameInternal(ctx, newName, ts)
if newColl != nil { if newColl != nil {
return fmt.Errorf("duplicated new collection name :%s with other collection name or alias", newName) return fmt.Errorf("duplicated new collection name :%s with other collection name or alias", newName)
} }
@ -471,7 +472,7 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, oldName string, newNa
} }
// get old collection meta // get old collection meta
oldColl, err := mt.GetCollectionByName(ctx, oldName, ts) oldColl, err := mt.getCollectionByNameInternal(ctx, oldName, ts)
if err != nil { if err != nil {
return err return err
} }