mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
fix: use the right resource key lock for ddl and use new ddl in transfer replica (#45506)
issue: #45452 - alias/rename related DDL should use database level exclusive lock - alias cannot use as the resource key of lock, use collection name instead - transfer replica should use WAL-based framework Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
cabc47ce01
commit
b7fb8ed38c
@ -0,0 +1,96 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package querycoordv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/querycoordv2/job"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
|
)
|
||||||
|
|
||||||
|
// broadcastAlterLoadConfigCollectionV2ForTransferReplica broadcasts the alter load config message for transfer replica.
|
||||||
|
func (s *Server) broadcastAlterLoadConfigCollectionV2ForTransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) error {
|
||||||
|
broadcaster, err := s.startBroadcastWithCollectionIDLock(ctx, req.GetCollectionID())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer broadcaster.Close()
|
||||||
|
|
||||||
|
if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetSourceResourceGroup()); !ok {
|
||||||
|
return merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup())
|
||||||
|
}
|
||||||
|
if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetTargetResourceGroup()); !ok {
|
||||||
|
return merr.WrapErrResourceGroupNotFound(req.GetTargetResourceGroup())
|
||||||
|
}
|
||||||
|
if req.GetSourceResourceGroup() == req.GetTargetResourceGroup() {
|
||||||
|
return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", req.GetSourceResourceGroup())
|
||||||
|
}
|
||||||
|
if req.GetNumReplica() <= 0 {
|
||||||
|
return merr.WrapErrParameterInvalid("NumReplica > 0", fmt.Sprintf("invalid NumReplica %d", req.GetNumReplica()))
|
||||||
|
}
|
||||||
|
|
||||||
|
coll, err := s.broker.DescribeCollection(ctx, req.GetCollectionID())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
currentLoadConfig := s.getCurrentLoadConfig(ctx, req.GetCollectionID())
|
||||||
|
if currentLoadConfig.Collection == nil {
|
||||||
|
return merr.WrapErrCollectionNotLoaded(coll.CollectionName)
|
||||||
|
}
|
||||||
|
if int(req.NumReplica) > len(currentLoadConfig.Replicas) {
|
||||||
|
return merr.WrapErrParameterInvalid(int(req.NumReplica), len(currentLoadConfig.Replicas), "the number of replicas to transfer is greater than the number of replicas in the collection")
|
||||||
|
}
|
||||||
|
|
||||||
|
replicaNumbers := currentLoadConfig.GetReplicaNumber()
|
||||||
|
replicaNumberInSourceRG := replicaNumbers[req.GetSourceResourceGroup()]
|
||||||
|
if replicaNumberInSourceRG < int(req.NumReplica) {
|
||||||
|
return merr.WrapErrParameterInvalid("NumReplica not greater than the number of replica in source resource group",
|
||||||
|
fmt.Sprintf("only found [%d] replicas of collection [%s] in source resource group [%s], but %d require",
|
||||||
|
replicaNumberInSourceRG,
|
||||||
|
coll.CollectionName,
|
||||||
|
req.GetSourceResourceGroup(),
|
||||||
|
req.GetNumReplica()))
|
||||||
|
}
|
||||||
|
// update the replica numbers in the source and target resource groups.
|
||||||
|
replicaNumbers[req.GetSourceResourceGroup()] -= int(req.NumReplica)
|
||||||
|
replicaNumbers[req.GetTargetResourceGroup()] += int(req.NumReplica)
|
||||||
|
|
||||||
|
alterLoadConfigReq := &job.AlterLoadConfigRequest{
|
||||||
|
Meta: s.meta,
|
||||||
|
CollectionInfo: coll,
|
||||||
|
Current: currentLoadConfig,
|
||||||
|
Expected: job.ExpectedLoadConfig{
|
||||||
|
ExpectedPartitionIDs: currentLoadConfig.GetPartitionIDs(),
|
||||||
|
ExpectedReplicaNumber: replicaNumbers,
|
||||||
|
ExpectedFieldIndexID: currentLoadConfig.GetFieldIndexID(),
|
||||||
|
ExpectedLoadFields: currentLoadConfig.GetLoadFields(),
|
||||||
|
ExpectedPriority: commonpb.LoadPriority_LOW,
|
||||||
|
ExpectedUserSpecifiedReplicaMode: currentLoadConfig.GetUserSpecifiedReplicaMode(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
msg, err := job.GenerateAlterLoadConfigMessage(ctx, alterLoadConfigReq)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = broadcaster.Broadcast(ctx, msg)
|
||||||
|
return err
|
||||||
|
}
|
||||||
@ -867,6 +867,7 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
|
|||||||
log.Warn("failed to create resource group", zap.Error(err))
|
log.Warn("failed to create resource group", zap.Error(err))
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
log.Info("create resource group done")
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -885,6 +886,7 @@ func (s *Server) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateRe
|
|||||||
log.Warn("failed to update resource group", zap.Error(err))
|
log.Warn("failed to update resource group", zap.Error(err))
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
log.Info("update resource group done")
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -907,6 +909,7 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
|
|||||||
log.Warn("failed to drop resource group", zap.Error(err))
|
log.Warn("failed to drop resource group", zap.Error(err))
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
log.Info("drop resource group done")
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -929,6 +932,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
|
|||||||
log.Warn("failed to transfer node", zap.Error(err))
|
log.Warn("failed to transfer node", zap.Error(err))
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
log.Info("transfer node done")
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -937,6 +941,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
|
|||||||
zap.String("source", req.GetSourceResourceGroup()),
|
zap.String("source", req.GetSourceResourceGroup()),
|
||||||
zap.String("target", req.GetTargetResourceGroup()),
|
zap.String("target", req.GetTargetResourceGroup()),
|
||||||
zap.Int64("collectionID", req.GetCollectionID()),
|
zap.Int64("collectionID", req.GetCollectionID()),
|
||||||
|
zap.Int64("numReplica", req.GetNumReplica()),
|
||||||
)
|
)
|
||||||
|
|
||||||
log.Info("transfer replica request received")
|
log.Info("transfer replica request received")
|
||||||
@ -945,22 +950,12 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
|
|||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: !!!WARNING, replica manager and resource manager doesn't protected with each other by lock.
|
if err := s.broadcastAlterLoadConfigCollectionV2ForTransferReplica(ctx, req); err != nil {
|
||||||
if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetSourceResourceGroup()); !ok {
|
log.Warn("failed to transfer replica between resource group", zap.Error(err))
|
||||||
err := merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup())
|
return merr.Status(err), nil
|
||||||
return merr.Status(errors.Wrap(err,
|
|
||||||
fmt.Sprintf("the source resource group[%s] doesn't exist", req.GetSourceResourceGroup()))), nil
|
|
||||||
}
|
}
|
||||||
|
log.Info("transfer replica done")
|
||||||
if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetTargetResourceGroup()); !ok {
|
return merr.Success(), nil
|
||||||
err := merr.WrapErrResourceGroupNotFound(req.GetTargetResourceGroup())
|
|
||||||
return merr.Status(errors.Wrap(err,
|
|
||||||
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()))), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply change into replica manager.
|
|
||||||
err := s.meta.TransferReplica(ctx, req.GetCollectionID(), req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumReplica()))
|
|
||||||
return merr.Status(err), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
|
func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
|
||||||
|
|||||||
@ -857,6 +857,7 @@ func (suite *ServiceSuite) TestTransferNode() {
|
|||||||
|
|
||||||
func (suite *ServiceSuite) TestTransferReplica() {
|
func (suite *ServiceSuite) TestTransferReplica() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
suite.loadAll()
|
||||||
server := suite.server
|
server := suite.server
|
||||||
|
|
||||||
err := server.meta.ResourceManager.AddResourceGroup(ctx, "rg1", &rgpb.ResourceGroupConfig{
|
err := server.meta.ResourceManager.AddResourceGroup(ctx, "rg1", &rgpb.ResourceGroupConfig{
|
||||||
@ -882,7 +883,7 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||||||
NumReplica: 2,
|
NumReplica: 2,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
|
suite.ErrorIs(merr.Error(resp), merr.ErrCollectionNotLoaded)
|
||||||
|
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: "rgg",
|
SourceResourceGroup: "rgg",
|
||||||
@ -911,22 +912,6 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
|
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
|
||||||
|
|
||||||
suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{
|
|
||||||
CollectionID: 1,
|
|
||||||
ID: 111,
|
|
||||||
ResourceGroup: meta.DefaultResourceGroupName,
|
|
||||||
}, typeutil.NewUniqueSet(1)))
|
|
||||||
suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{
|
|
||||||
CollectionID: 1,
|
|
||||||
ID: 222,
|
|
||||||
ResourceGroup: meta.DefaultResourceGroupName,
|
|
||||||
}, typeutil.NewUniqueSet(2)))
|
|
||||||
suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{
|
|
||||||
CollectionID: 1,
|
|
||||||
ID: 333,
|
|
||||||
ResourceGroup: meta.DefaultResourceGroupName,
|
|
||||||
}, typeutil.NewUniqueSet(3)))
|
|
||||||
|
|
||||||
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||||
NodeID: 1001,
|
NodeID: 1001,
|
||||||
Address: "localhost",
|
Address: "localhost",
|
||||||
@ -958,62 +943,45 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||||||
suite.server.meta.HandleNodeUp(ctx, 1004)
|
suite.server.meta.HandleNodeUp(ctx, 1004)
|
||||||
suite.server.meta.HandleNodeUp(ctx, 1005)
|
suite.server.meta.HandleNodeUp(ctx, 1005)
|
||||||
|
|
||||||
suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{
|
|
||||||
CollectionID: 2,
|
|
||||||
ID: 444,
|
|
||||||
ResourceGroup: meta.DefaultResourceGroupName,
|
|
||||||
}, typeutil.NewUniqueSet(3)))
|
|
||||||
suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{
|
|
||||||
CollectionID: 2,
|
|
||||||
ID: 555,
|
|
||||||
ResourceGroup: "rg2",
|
|
||||||
}, typeutil.NewUniqueSet(4)))
|
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||||
TargetResourceGroup: "rg2",
|
TargetResourceGroup: "rg2",
|
||||||
CollectionID: 2,
|
CollectionID: 1001,
|
||||||
NumReplica: 1,
|
NumReplica: 1,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(merr.CheckRPCCall(resp, err))
|
||||||
// we support dynamically increase replica num in resource group now.
|
|
||||||
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
|
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||||
TargetResourceGroup: "rg1",
|
TargetResourceGroup: "rg1",
|
||||||
CollectionID: 1,
|
CollectionID: 1001,
|
||||||
NumReplica: 1,
|
NumReplica: 1,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(merr.CheckRPCCall(resp, err))
|
||||||
// we support transfer replica to resource group load same collection.
|
|
||||||
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
|
|
||||||
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(ctx, 1))
|
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(ctx, 1001))
|
||||||
suite.Equal(3, replicaNum)
|
suite.Equal(3, replicaNum)
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||||
TargetResourceGroup: "rg3",
|
TargetResourceGroup: "rg3",
|
||||||
CollectionID: 1,
|
CollectionID: 1001,
|
||||||
NumReplica: 2,
|
NumReplica: 1,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(merr.CheckRPCCall(resp, err))
|
||||||
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: "rg1",
|
SourceResourceGroup: "rg1",
|
||||||
TargetResourceGroup: "rg3",
|
TargetResourceGroup: "rg3",
|
||||||
CollectionID: 1,
|
CollectionID: 1001,
|
||||||
NumReplica: 1,
|
NumReplica: 1,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(merr.CheckRPCCall(resp, err))
|
||||||
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
suite.Len(suite.server.meta.GetByResourceGroup(ctx, "rg3"), 3)
|
|
||||||
|
|
||||||
// server unhealthy
|
// server unhealthy
|
||||||
server.UpdateStateCode(commonpb.StateCode_Abnormal)
|
server.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||||
TargetResourceGroup: "rg3",
|
TargetResourceGroup: "rg3",
|
||||||
CollectionID: 1,
|
CollectionID: 1001,
|
||||||
NumReplica: 2,
|
NumReplica: 2,
|
||||||
})
|
})
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
|||||||
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RegisterDDLCallbacks registers the ddl callbacks.
|
// RegisterDDLCallbacks registers the ddl callbacks.
|
||||||
@ -142,21 +143,10 @@ func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadca
|
|||||||
return broadcaster, nil
|
return broadcaster, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startBroadcastWithAlterAliasLock starts a broadcast with alter alias lock.
|
|
||||||
func startBroadcastWithAlterAliasLock(ctx context.Context, dbName string, collectionName string, alias string) (broadcaster.BroadcastAPI, error) {
|
|
||||||
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
|
|
||||||
message.NewSharedDBNameResourceKey(dbName),
|
|
||||||
message.NewExclusiveCollectionNameResourceKey(dbName, collectionName),
|
|
||||||
message.NewExclusiveCollectionNameResourceKey(dbName, alias),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "failed to start broadcast with alter alias lock")
|
|
||||||
}
|
|
||||||
return broadcaster, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// startBroadcastWithCollectionLock starts a broadcast with collection lock.
|
// startBroadcastWithCollectionLock starts a broadcast with collection lock.
|
||||||
func startBroadcastWithCollectionLock(ctx context.Context, dbName string, collectionName string) (broadcaster.BroadcastAPI, error) {
|
// CreateCollection and DropCollection can only be called with collection name itself, not alias.
|
||||||
|
// So it's safe to use collection name directly for those API.
|
||||||
|
func (*Core) startBroadcastWithCollectionLock(ctx context.Context, dbName string, collectionName string) (broadcaster.BroadcastAPI, error) {
|
||||||
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
|
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
|
||||||
message.NewSharedDBNameResourceKey(dbName),
|
message.NewSharedDBNameResourceKey(dbName),
|
||||||
message.NewExclusiveCollectionNameResourceKey(dbName, collectionName),
|
message.NewExclusiveCollectionNameResourceKey(dbName, collectionName),
|
||||||
@ -166,3 +156,14 @@ func startBroadcastWithCollectionLock(ctx context.Context, dbName string, collec
|
|||||||
}
|
}
|
||||||
return broadcaster, nil
|
return broadcaster, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startBroadcastWithAliasOrCollectionLock starts a broadcast with alias or collection lock.
|
||||||
|
// Some API like AlterCollection can be called with alias or collection name,
|
||||||
|
// so we need to get the real collection name to add resource key lock.
|
||||||
|
func (c *Core) startBroadcastWithAliasOrCollectionLock(ctx context.Context, dbName string, collectionNameOrAlias string) (broadcaster.BroadcastAPI, error) {
|
||||||
|
coll, err := c.meta.GetCollectionByName(ctx, dbName, collectionNameOrAlias, typeutil.MaxTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to get collection by name")
|
||||||
|
}
|
||||||
|
return c.startBroadcastWithCollectionLock(ctx, dbName, coll.Name)
|
||||||
|
}
|
||||||
|
|||||||
@ -32,7 +32,7 @@ func (c *Core) broadcastCreateAlias(ctx context.Context, req *milvuspb.CreateAli
|
|||||||
req.DbName = strings.TrimSpace(req.DbName)
|
req.DbName = strings.TrimSpace(req.DbName)
|
||||||
req.Alias = strings.TrimSpace(req.Alias)
|
req.Alias = strings.TrimSpace(req.Alias)
|
||||||
req.CollectionName = strings.TrimSpace(req.CollectionName)
|
req.CollectionName = strings.TrimSpace(req.CollectionName)
|
||||||
broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias())
|
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -70,7 +70,7 @@ func (c *Core) broadcastAlterAlias(ctx context.Context, req *milvuspb.AlterAlias
|
|||||||
req.DbName = strings.TrimSpace(req.DbName)
|
req.DbName = strings.TrimSpace(req.DbName)
|
||||||
req.Alias = strings.TrimSpace(req.Alias)
|
req.Alias = strings.TrimSpace(req.Alias)
|
||||||
req.CollectionName = strings.TrimSpace(req.CollectionName)
|
req.CollectionName = strings.TrimSpace(req.CollectionName)
|
||||||
broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias())
|
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,7 @@ import (
|
|||||||
|
|
||||||
// broadcastAlterCollectionForAddField broadcasts the put collection message for add field.
|
// broadcastAlterCollectionForAddField broadcasts the put collection message for add field.
|
||||||
func (c *Core) broadcastAlterCollectionForAddField(ctx context.Context, req *milvuspb.AddCollectionFieldRequest) error {
|
func (c *Core) broadcastAlterCollectionForAddField(ctx context.Context, req *milvuspb.AddCollectionFieldRequest) error {
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *Core) broadcastAlterCollectionV2ForAlterCollectionField(ctx context.Context, req *milvuspb.AlterCollectionFieldRequest) error {
|
func (c *Core) broadcastAlterCollectionV2ForAlterCollectionField(ctx context.Context, req *milvuspb.AlterCollectionFieldRequest) error {
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -36,10 +36,8 @@ func (c *Core) broadcastAlterCollectionForRenameCollection(ctx context.Context,
|
|||||||
|
|
||||||
// StartBroadcastWithResourceKeys will deduplicate the resource keys itself, so it's safe to add all the resource keys here.
|
// StartBroadcastWithResourceKeys will deduplicate the resource keys itself, so it's safe to add all the resource keys here.
|
||||||
rks := []message.ResourceKey{
|
rks := []message.ResourceKey{
|
||||||
message.NewSharedDBNameResourceKey(req.GetNewDBName()),
|
message.NewExclusiveDBNameResourceKey(req.GetNewDBName()),
|
||||||
message.NewSharedDBNameResourceKey(req.GetDbName()),
|
message.NewExclusiveDBNameResourceKey(req.GetDbName()),
|
||||||
message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetOldName()),
|
|
||||||
message.NewExclusiveCollectionNameResourceKey(req.GetNewDBName(), req.GetNewName()),
|
|
||||||
}
|
}
|
||||||
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, rks...)
|
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, rks...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -62,7 +62,7 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r
|
|||||||
return c.broadcastAlterCollectionForAlterDynamicField(ctx, req, targetValue)
|
return c.broadcastAlterCollectionForAlterDynamicField(ctx, req, targetValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -145,7 +145,7 @@ func (c *Core) broadcastAlterCollectionForAlterDynamicField(ctx context.Context,
|
|||||||
if len(req.GetProperties()) != 1 {
|
if len(req.GetProperties()) != 1 {
|
||||||
return merr.WrapErrParameterInvalidMsg("cannot alter dynamic schema with other properties at the same time")
|
return merr.WrapErrParameterInvalidMsg("cannot alter dynamic schema with other properties at the same time")
|
||||||
}
|
}
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -56,7 +56,7 @@ func (c *Core) broadcastCreateCollectionV1(ctx context.Context, req *milvuspb.Cr
|
|||||||
req.NumPartitions = int64(1)
|
req.NumPartitions = int64(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,7 +34,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *Core) broadcastCreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) error {
|
func (c *Core) broadcastCreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) error {
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, in.GetDbName(), in.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, in.GetDbName(), in.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,7 +25,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
||||||
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast"
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
@ -34,9 +33,7 @@ import (
|
|||||||
func (c *Core) broadcastDropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) error {
|
func (c *Core) broadcastDropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) error {
|
||||||
req.DbName = strings.TrimSpace(req.DbName)
|
req.DbName = strings.TrimSpace(req.DbName)
|
||||||
req.Alias = strings.TrimSpace(req.Alias)
|
req.Alias = strings.TrimSpace(req.Alias)
|
||||||
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
|
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
|
||||||
message.NewSharedDBNameResourceKey(req.GetDbName()),
|
|
||||||
message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetAlias()))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *Core) broadcastDropCollectionV1(ctx context.Context, req *milvuspb.DropCollectionRequest) error {
|
func (c *Core) broadcastDropCollectionV1(ctx context.Context, req *milvuspb.DropCollectionRequest) error {
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ func (c *Core) broadcastDropPartition(ctx context.Context, in *milvuspb.DropPart
|
|||||||
return errors.New("default partition cannot be deleted")
|
return errors.New("default partition cannot be deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcaster, err := startBroadcastWithCollectionLock(ctx, in.GetDbName(), in.GetCollectionName())
|
broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, in.GetDbName(), in.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -40,6 +40,7 @@ type dropCollectionTask struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *dropCollectionTask) validate(ctx context.Context) error {
|
func (t *dropCollectionTask) validate(ctx context.Context) error {
|
||||||
|
// Critical promise here, also see comment of startBroadcastWithCollectionLock.
|
||||||
if t.meta.IsAlias(ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) {
|
if t.meta.IsAlias(ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) {
|
||||||
return fmt.Errorf("cannot drop the collection via alias = %s", t.Req.CollectionName)
|
return fmt.Errorf("cannot drop the collection via alias = %s", t.Req.CollectionName)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -66,6 +67,9 @@ func NewExclusiveClusterResourceKey() ResourceKey {
|
|||||||
|
|
||||||
// NewSharedCollectionNameResourceKey creates a shared collection name resource key.
|
// NewSharedCollectionNameResourceKey creates a shared collection name resource key.
|
||||||
func NewSharedCollectionNameResourceKey(dbName string, collectionName string) ResourceKey {
|
func NewSharedCollectionNameResourceKey(dbName string, collectionName string) ResourceKey {
|
||||||
|
if dbName == "" {
|
||||||
|
dbName = util.DefaultDBName
|
||||||
|
}
|
||||||
return ResourceKey{
|
return ResourceKey{
|
||||||
Domain: messagespb.ResourceDomain_ResourceDomainCollectionName,
|
Domain: messagespb.ResourceDomain_ResourceDomainCollectionName,
|
||||||
Key: fmt.Sprintf("%s:%s", dbName, collectionName),
|
Key: fmt.Sprintf("%s:%s", dbName, collectionName),
|
||||||
@ -75,6 +79,9 @@ func NewSharedCollectionNameResourceKey(dbName string, collectionName string) Re
|
|||||||
|
|
||||||
// NewExclusiveCollectionNameResourceKey creates an exclusive collection name resource key.
|
// NewExclusiveCollectionNameResourceKey creates an exclusive collection name resource key.
|
||||||
func NewExclusiveCollectionNameResourceKey(dbName string, collectionName string) ResourceKey {
|
func NewExclusiveCollectionNameResourceKey(dbName string, collectionName string) ResourceKey {
|
||||||
|
if dbName == "" {
|
||||||
|
dbName = util.DefaultDBName
|
||||||
|
}
|
||||||
return ResourceKey{
|
return ResourceKey{
|
||||||
Domain: messagespb.ResourceDomain_ResourceDomainCollectionName,
|
Domain: messagespb.ResourceDomain_ResourceDomainCollectionName,
|
||||||
Key: fmt.Sprintf("%s:%s", dbName, collectionName),
|
Key: fmt.Sprintf("%s:%s", dbName, collectionName),
|
||||||
@ -84,6 +91,9 @@ func NewExclusiveCollectionNameResourceKey(dbName string, collectionName string)
|
|||||||
|
|
||||||
// NewSharedDBNameResourceKey creates a shared db name resource key.
|
// NewSharedDBNameResourceKey creates a shared db name resource key.
|
||||||
func NewSharedDBNameResourceKey(dbName string) ResourceKey {
|
func NewSharedDBNameResourceKey(dbName string) ResourceKey {
|
||||||
|
if dbName == "" {
|
||||||
|
dbName = util.DefaultDBName
|
||||||
|
}
|
||||||
return ResourceKey{
|
return ResourceKey{
|
||||||
Domain: messagespb.ResourceDomain_ResourceDomainDBName,
|
Domain: messagespb.ResourceDomain_ResourceDomainDBName,
|
||||||
Key: dbName,
|
Key: dbName,
|
||||||
@ -93,6 +103,9 @@ func NewSharedDBNameResourceKey(dbName string) ResourceKey {
|
|||||||
|
|
||||||
// NewExclusiveDBNameResourceKey creates an exclusive db name resource key.
|
// NewExclusiveDBNameResourceKey creates an exclusive db name resource key.
|
||||||
func NewExclusiveDBNameResourceKey(dbName string) ResourceKey {
|
func NewExclusiveDBNameResourceKey(dbName string) ResourceKey {
|
||||||
|
if dbName == "" {
|
||||||
|
dbName = util.DefaultDBName
|
||||||
|
}
|
||||||
return ResourceKey{
|
return ResourceKey{
|
||||||
Domain: messagespb.ResourceDomain_ResourceDomainDBName,
|
Domain: messagespb.ResourceDomain_ResourceDomainDBName,
|
||||||
Key: dbName,
|
Key: dbName,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user