diff --git a/internal/querycoordv2/ddl_callbacks_alter_load_info_transfer_replica.go b/internal/querycoordv2/ddl_callbacks_alter_load_info_transfer_replica.go new file mode 100644 index 0000000000..057475efe5 --- /dev/null +++ b/internal/querycoordv2/ddl_callbacks_alter_load_info_transfer_replica.go @@ -0,0 +1,96 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoordv2 + +import ( + "context" + "fmt" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/querycoordv2/job" + "github.com/milvus-io/milvus/pkg/v2/proto/querypb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" +) + +// broadcastAlterLoadConfigCollectionV2ForTransferReplica broadcasts the alter load config message for transfer replica. +func (s *Server) broadcastAlterLoadConfigCollectionV2ForTransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) error { + broadcaster, err := s.startBroadcastWithCollectionIDLock(ctx, req.GetCollectionID()) + if err != nil { + return err + } + defer broadcaster.Close() + + if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetSourceResourceGroup()); !ok { + return merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup()) + } + if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetTargetResourceGroup()); !ok { + return merr.WrapErrResourceGroupNotFound(req.GetTargetResourceGroup()) + } + if req.GetSourceResourceGroup() == req.GetTargetResourceGroup() { + return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", req.GetSourceResourceGroup()) + } + if req.GetNumReplica() <= 0 { + return merr.WrapErrParameterInvalid("NumReplica > 0", fmt.Sprintf("invalid NumReplica %d", req.GetNumReplica())) + } + + coll, err := s.broker.DescribeCollection(ctx, req.GetCollectionID()) + if err != nil { + return err + } + + currentLoadConfig := s.getCurrentLoadConfig(ctx, req.GetCollectionID()) + if currentLoadConfig.Collection == nil { + return merr.WrapErrCollectionNotLoaded(coll.CollectionName) + } + if int(req.NumReplica) > len(currentLoadConfig.Replicas) { + return merr.WrapErrParameterInvalid(int(req.NumReplica), len(currentLoadConfig.Replicas), "the number of replicas to transfer is greater than the number of replicas in the collection") + } + + replicaNumbers := currentLoadConfig.GetReplicaNumber() + replicaNumberInSourceRG := replicaNumbers[req.GetSourceResourceGroup()] + if replicaNumberInSourceRG < int(req.NumReplica) { + return merr.WrapErrParameterInvalid("NumReplica not greater than the number of replica in source resource group", + fmt.Sprintf("only found [%d] replicas of collection [%s] in source resource group [%s], but %d require", + replicaNumberInSourceRG, + coll.CollectionName, + req.GetSourceResourceGroup(), + req.GetNumReplica())) + } + // update the replica numbers in the source and target resource groups. + replicaNumbers[req.GetSourceResourceGroup()] -= int(req.NumReplica) + replicaNumbers[req.GetTargetResourceGroup()] += int(req.NumReplica) + + alterLoadConfigReq := &job.AlterLoadConfigRequest{ + Meta: s.meta, + CollectionInfo: coll, + Current: currentLoadConfig, + Expected: job.ExpectedLoadConfig{ + ExpectedPartitionIDs: currentLoadConfig.GetPartitionIDs(), + ExpectedReplicaNumber: replicaNumbers, + ExpectedFieldIndexID: currentLoadConfig.GetFieldIndexID(), + ExpectedLoadFields: currentLoadConfig.GetLoadFields(), + ExpectedPriority: commonpb.LoadPriority_LOW, + ExpectedUserSpecifiedReplicaMode: currentLoadConfig.GetUserSpecifiedReplicaMode(), + }, + } + msg, err := job.GenerateAlterLoadConfigMessage(ctx, alterLoadConfigReq) + if err != nil { + return err + } + _, err = broadcaster.Broadcast(ctx, msg) + return err +} diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index f475211ce2..35fd9b890c 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -867,6 +867,7 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe log.Warn("failed to create resource group", zap.Error(err)) return merr.Status(err), nil } + log.Info("create resource group done") return merr.Success(), nil } @@ -885,6 +886,7 @@ func (s *Server) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateRe log.Warn("failed to update resource group", zap.Error(err)) return merr.Status(err), nil } + log.Info("update resource group done") return merr.Success(), nil } @@ -907,6 +909,7 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour log.Warn("failed to drop resource group", zap.Error(err)) return merr.Status(err), nil } + log.Info("drop resource group done") return merr.Success(), nil } @@ -929,6 +932,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq log.Warn("failed to transfer node", zap.Error(err)) return merr.Status(err), nil } + log.Info("transfer node done") return merr.Success(), nil } @@ -937,6 +941,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli zap.String("source", req.GetSourceResourceGroup()), zap.String("target", req.GetTargetResourceGroup()), zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64("numReplica", req.GetNumReplica()), ) log.Info("transfer replica request received") @@ -945,22 +950,12 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli return merr.Status(err), nil } - // TODO: !!!WARNING, replica manager and resource manager doesn't protected with each other by lock. - if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetSourceResourceGroup()); !ok { - err := merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup()) - return merr.Status(errors.Wrap(err, - fmt.Sprintf("the source resource group[%s] doesn't exist", req.GetSourceResourceGroup()))), nil + if err := s.broadcastAlterLoadConfigCollectionV2ForTransferReplica(ctx, req); err != nil { + log.Warn("failed to transfer replica between resource group", zap.Error(err)) + return merr.Status(err), nil } - - if ok := s.meta.ResourceManager.ContainResourceGroup(ctx, req.GetTargetResourceGroup()); !ok { - err := merr.WrapErrResourceGroupNotFound(req.GetTargetResourceGroup()) - return merr.Status(errors.Wrap(err, - fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()))), nil - } - - // Apply change into replica manager. - err := s.meta.TransferReplica(ctx, req.GetCollectionID(), req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumReplica())) - return merr.Status(err), nil + log.Info("transfer replica done") + return merr.Success(), nil } func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index dacdedb175..dcd329a3e1 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -857,6 +857,7 @@ func (suite *ServiceSuite) TestTransferNode() { func (suite *ServiceSuite) TestTransferReplica() { ctx := context.Background() + suite.loadAll() server := suite.server err := server.meta.ResourceManager.AddResourceGroup(ctx, "rg1", &rgpb.ResourceGroupConfig{ @@ -882,7 +883,7 @@ func (suite *ServiceSuite) TestTransferReplica() { NumReplica: 2, }) suite.NoError(err) - suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid) + suite.ErrorIs(merr.Error(resp), merr.ErrCollectionNotLoaded) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: "rgg", @@ -911,22 +912,6 @@ func (suite *ServiceSuite) TestTransferReplica() { suite.NoError(err) suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid) - suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{ - CollectionID: 1, - ID: 111, - ResourceGroup: meta.DefaultResourceGroupName, - }, typeutil.NewUniqueSet(1))) - suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{ - CollectionID: 1, - ID: 222, - ResourceGroup: meta.DefaultResourceGroupName, - }, typeutil.NewUniqueSet(2))) - suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{ - CollectionID: 1, - ID: 333, - ResourceGroup: meta.DefaultResourceGroupName, - }, typeutil.NewUniqueSet(3))) - suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ NodeID: 1001, Address: "localhost", @@ -958,62 +943,45 @@ func (suite *ServiceSuite) TestTransferReplica() { suite.server.meta.HandleNodeUp(ctx, 1004) suite.server.meta.HandleNodeUp(ctx, 1005) - suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{ - CollectionID: 2, - ID: 444, - ResourceGroup: meta.DefaultResourceGroupName, - }, typeutil.NewUniqueSet(3))) - suite.server.meta.Put(ctx, meta.NewReplica(&querypb.Replica{ - CollectionID: 2, - ID: 555, - ResourceGroup: "rg2", - }, typeutil.NewUniqueSet(4))) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg2", - CollectionID: 2, + CollectionID: 1001, NumReplica: 1, }) - suite.NoError(err) - // we support dynamically increase replica num in resource group now. - suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) + suite.NoError(merr.CheckRPCCall(resp, err)) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg1", - CollectionID: 1, + CollectionID: 1001, NumReplica: 1, }) - suite.NoError(err) - // we support transfer replica to resource group load same collection. - suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) + suite.NoError(merr.CheckRPCCall(resp, err)) - replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(ctx, 1)) + replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(ctx, 1001)) suite.Equal(3, replicaNum) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg3", - CollectionID: 1, - NumReplica: 2, + CollectionID: 1001, + NumReplica: 1, }) - suite.NoError(err) - suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) + suite.NoError(merr.CheckRPCCall(resp, err)) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: "rg1", TargetResourceGroup: "rg3", - CollectionID: 1, + CollectionID: 1001, NumReplica: 1, }) - suite.NoError(err) - suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) - suite.Len(suite.server.meta.GetByResourceGroup(ctx, "rg3"), 3) + suite.NoError(merr.CheckRPCCall(resp, err)) // server unhealthy server.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg3", - CollectionID: 1, + CollectionID: 1001, NumReplica: 2, }) suite.NoError(err) diff --git a/internal/rootcoord/ddl_callbacks.go b/internal/rootcoord/ddl_callbacks.go index 5d44f60bf4..9b74c20dd0 100644 --- a/internal/rootcoord/ddl_callbacks.go +++ b/internal/rootcoord/ddl_callbacks.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // RegisterDDLCallbacks registers the ddl callbacks. @@ -142,21 +143,10 @@ func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadca return broadcaster, nil } -// startBroadcastWithAlterAliasLock starts a broadcast with alter alias lock. -func startBroadcastWithAlterAliasLock(ctx context.Context, dbName string, collectionName string, alias string) (broadcaster.BroadcastAPI, error) { - broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, - message.NewSharedDBNameResourceKey(dbName), - message.NewExclusiveCollectionNameResourceKey(dbName, collectionName), - message.NewExclusiveCollectionNameResourceKey(dbName, alias), - ) - if err != nil { - return nil, errors.Wrap(err, "failed to start broadcast with alter alias lock") - } - return broadcaster, nil -} - // startBroadcastWithCollectionLock starts a broadcast with collection lock. -func startBroadcastWithCollectionLock(ctx context.Context, dbName string, collectionName string) (broadcaster.BroadcastAPI, error) { +// CreateCollection and DropCollection can only be called with collection name itself, not alias. +// So it's safe to use collection name directly for those API. +func (*Core) startBroadcastWithCollectionLock(ctx context.Context, dbName string, collectionName string) (broadcaster.BroadcastAPI, error) { broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, message.NewSharedDBNameResourceKey(dbName), message.NewExclusiveCollectionNameResourceKey(dbName, collectionName), @@ -166,3 +156,14 @@ func startBroadcastWithCollectionLock(ctx context.Context, dbName string, collec } return broadcaster, nil } + +// startBroadcastWithAliasOrCollectionLock starts a broadcast with alias or collection lock. +// Some API like AlterCollection can be called with alias or collection name, +// so we need to get the real collection name to add resource key lock. +func (c *Core) startBroadcastWithAliasOrCollectionLock(ctx context.Context, dbName string, collectionNameOrAlias string) (broadcaster.BroadcastAPI, error) { + coll, err := c.meta.GetCollectionByName(ctx, dbName, collectionNameOrAlias, typeutil.MaxTimestamp) + if err != nil { + return nil, errors.Wrap(err, "failed to get collection by name") + } + return c.startBroadcastWithCollectionLock(ctx, dbName, coll.Name) +} diff --git a/internal/rootcoord/ddl_callbacks_alter_alias.go b/internal/rootcoord/ddl_callbacks_alter_alias.go index 6e5f564050..8c15e8d5de 100644 --- a/internal/rootcoord/ddl_callbacks_alter_alias.go +++ b/internal/rootcoord/ddl_callbacks_alter_alias.go @@ -32,7 +32,7 @@ func (c *Core) broadcastCreateAlias(ctx context.Context, req *milvuspb.CreateAli req.DbName = strings.TrimSpace(req.DbName) req.Alias = strings.TrimSpace(req.Alias) req.CollectionName = strings.TrimSpace(req.CollectionName) - broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias()) + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName()) if err != nil { return err } @@ -70,7 +70,7 @@ func (c *Core) broadcastAlterAlias(ctx context.Context, req *milvuspb.AlterAlias req.DbName = strings.TrimSpace(req.DbName) req.Alias = strings.TrimSpace(req.Alias) req.CollectionName = strings.TrimSpace(req.CollectionName) - broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias()) + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_add_field.go b/internal/rootcoord/ddl_callbacks_alter_collection_add_field.go index 5c8f56b6a5..8193e9a524 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_add_field.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_add_field.go @@ -19,7 +19,7 @@ import ( // broadcastAlterCollectionForAddField broadcasts the put collection message for add field. func (c *Core) broadcastAlterCollectionForAddField(ctx context.Context, req *milvuspb.AddCollectionFieldRequest) error { - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_field.go b/internal/rootcoord/ddl_callbacks_alter_collection_field.go index b6437dd322..c4d8d162b0 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_field.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_field.go @@ -16,7 +16,7 @@ import ( ) func (c *Core) broadcastAlterCollectionV2ForAlterCollectionField(ctx context.Context, req *milvuspb.AlterCollectionFieldRequest) error { - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_name.go b/internal/rootcoord/ddl_callbacks_alter_collection_name.go index 3a025ca72f..3c23cf0475 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_name.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_name.go @@ -36,10 +36,8 @@ func (c *Core) broadcastAlterCollectionForRenameCollection(ctx context.Context, // StartBroadcastWithResourceKeys will deduplicate the resource keys itself, so it's safe to add all the resource keys here. rks := []message.ResourceKey{ - message.NewSharedDBNameResourceKey(req.GetNewDBName()), - message.NewSharedDBNameResourceKey(req.GetDbName()), - message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetOldName()), - message.NewExclusiveCollectionNameResourceKey(req.GetNewDBName(), req.GetNewName()), + message.NewExclusiveDBNameResourceKey(req.GetNewDBName()), + message.NewExclusiveDBNameResourceKey(req.GetDbName()), } broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, rks...) if err != nil { diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go index 5ff40caf69..f0619dd12f 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go @@ -62,7 +62,7 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r return c.broadcastAlterCollectionForAlterDynamicField(ctx, req, targetValue) } - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } @@ -145,7 +145,7 @@ func (c *Core) broadcastAlterCollectionForAlterDynamicField(ctx context.Context, if len(req.GetProperties()) != 1 { return merr.WrapErrParameterInvalidMsg("cannot alter dynamic schema with other properties at the same time") } - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_create_collection.go b/internal/rootcoord/ddl_callbacks_create_collection.go index 3aa8d91ede..ae29af35d8 100644 --- a/internal/rootcoord/ddl_callbacks_create_collection.go +++ b/internal/rootcoord/ddl_callbacks_create_collection.go @@ -56,7 +56,7 @@ func (c *Core) broadcastCreateCollectionV1(ctx context.Context, req *milvuspb.Cr req.NumPartitions = int64(1) } - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_create_partition.go b/internal/rootcoord/ddl_callbacks_create_partition.go index b508cd75b6..7e3f52eba0 100644 --- a/internal/rootcoord/ddl_callbacks_create_partition.go +++ b/internal/rootcoord/ddl_callbacks_create_partition.go @@ -34,7 +34,7 @@ import ( ) func (c *Core) broadcastCreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) error { - broadcaster, err := startBroadcastWithCollectionLock(ctx, in.GetDbName(), in.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, in.GetDbName(), in.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_drop_alias.go b/internal/rootcoord/ddl_callbacks_drop_alias.go index 18ba709968..c6beecee23 100644 --- a/internal/rootcoord/ddl_callbacks_drop_alias.go +++ b/internal/rootcoord/ddl_callbacks_drop_alias.go @@ -25,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/distributed/streaming" - "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -34,9 +33,7 @@ import ( func (c *Core) broadcastDropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) error { req.DbName = strings.TrimSpace(req.DbName) req.Alias = strings.TrimSpace(req.Alias) - broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, - message.NewSharedDBNameResourceKey(req.GetDbName()), - message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetAlias())) + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_drop_collection.go b/internal/rootcoord/ddl_callbacks_drop_collection.go index e5f88e8e3b..b3031e071b 100644 --- a/internal/rootcoord/ddl_callbacks_drop_collection.go +++ b/internal/rootcoord/ddl_callbacks_drop_collection.go @@ -37,7 +37,7 @@ import ( ) func (c *Core) broadcastDropCollectionV1(ctx context.Context, req *milvuspb.DropCollectionRequest) error { - broadcaster, err := startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) + broadcaster, err := c.startBroadcastWithCollectionLock(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/ddl_callbacks_drop_partition.go b/internal/rootcoord/ddl_callbacks_drop_partition.go index 8d7b3f74ea..d90a5446b1 100644 --- a/internal/rootcoord/ddl_callbacks_drop_partition.go +++ b/internal/rootcoord/ddl_callbacks_drop_partition.go @@ -37,7 +37,7 @@ func (c *Core) broadcastDropPartition(ctx context.Context, in *milvuspb.DropPart return errors.New("default partition cannot be deleted") } - broadcaster, err := startBroadcastWithCollectionLock(ctx, in.GetDbName(), in.GetCollectionName()) + broadcaster, err := c.startBroadcastWithAliasOrCollectionLock(ctx, in.GetDbName(), in.GetCollectionName()) if err != nil { return err } diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index fbbe605146..b617a5f95f 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -40,6 +40,7 @@ type dropCollectionTask struct { } func (t *dropCollectionTask) validate(ctx context.Context) error { + // Critical promise here, also see comment of startBroadcastWithCollectionLock. if t.meta.IsAlias(ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) { return fmt.Errorf("cannot drop the collection via alias = %s", t.Req.CollectionName) } diff --git a/pkg/streaming/util/message/resource_key.go b/pkg/streaming/util/message/resource_key.go index da3c3d98b4..df77d4e3e3 100644 --- a/pkg/streaming/util/message/resource_key.go +++ b/pkg/streaming/util/message/resource_key.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" + "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -66,6 +67,9 @@ func NewExclusiveClusterResourceKey() ResourceKey { // NewSharedCollectionNameResourceKey creates a shared collection name resource key. func NewSharedCollectionNameResourceKey(dbName string, collectionName string) ResourceKey { + if dbName == "" { + dbName = util.DefaultDBName + } return ResourceKey{ Domain: messagespb.ResourceDomain_ResourceDomainCollectionName, Key: fmt.Sprintf("%s:%s", dbName, collectionName), @@ -75,6 +79,9 @@ func NewSharedCollectionNameResourceKey(dbName string, collectionName string) Re // NewExclusiveCollectionNameResourceKey creates an exclusive collection name resource key. func NewExclusiveCollectionNameResourceKey(dbName string, collectionName string) ResourceKey { + if dbName == "" { + dbName = util.DefaultDBName + } return ResourceKey{ Domain: messagespb.ResourceDomain_ResourceDomainCollectionName, Key: fmt.Sprintf("%s:%s", dbName, collectionName), @@ -84,6 +91,9 @@ func NewExclusiveCollectionNameResourceKey(dbName string, collectionName string) // NewSharedDBNameResourceKey creates a shared db name resource key. func NewSharedDBNameResourceKey(dbName string) ResourceKey { + if dbName == "" { + dbName = util.DefaultDBName + } return ResourceKey{ Domain: messagespb.ResourceDomain_ResourceDomainDBName, Key: dbName, @@ -93,6 +103,9 @@ func NewSharedDBNameResourceKey(dbName string) ResourceKey { // NewExclusiveDBNameResourceKey creates an exclusive db name resource key. func NewExclusiveDBNameResourceKey(dbName string) ResourceKey { + if dbName == "" { + dbName = util.DefaultDBName + } return ResourceKey{ Domain: messagespb.ResourceDomain_ResourceDomainDBName, Key: dbName,