mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
issue: #43897, #44123 pr: #44898 related pr: #44607 #44642 #44792 #44809 #44564 #44560 #44735 #44822 #44865 #44850 #44942 #44874 #44963 #44886 #44898 enhance: remove redundant channel manager from datacoord (#44532) issue: #41611 - After enabling streaming arch, channel manager of data coord is a redundant component. fix: Fix CDC OOM due to high buffer size (#44607) Fix CDC OOM by: 1. free msg buffer manually. 2. limit max msg buffer size. 3. reduce scanner msg hander buffer size. issue: https://github.com/milvus-io/milvus/issues/44123 fix: remove wrong start timetick to avoid filtering DML whose timetick is less than it. (#44691) issue: #41611 - introduced by #44532 enhance: support remove cluster from replicate topology (#44642) issue: #44558, #44123 - Update config(A->C) to A and C, config(B) to B on replicate topology (A->B,A->C) can remove the B from replicate topology - Fix some metric error of CDC fix: check if qn is sqn with label and streamingnode list (#44792) issue: #44014 - On standalone, the query node inside need to load segment and watch channel, so the querynode is not a embeded querynode in streamingnode without `LabelStreamingNodeEmbeddedQueryNode`. The channel dist manager can not confirm a standalone node is a embededStreamingNode. Bug is introduced by #44099 enhance: Make GetReplicateInfo API work at the pchannel level (#44809) issue: https://github.com/milvus-io/milvus/issues/44123 enhance: Speed up CDC scheduling (#44564) Make CDC watch etcd replicate pchannel meta instead of listing them periodically. issue: https://github.com/milvus-io/milvus/issues/44123 enhance: refactor update replicate config operation using wal-broadcast-based DDL/DCL framework (#44560) issue: #43897 - UpdateReplicateConfig operation will broadcast AlterReplicateConfig message into all pchannels with cluster-exclusive-lock. - Begin txn message will use commit message timetick now (to avoid timetick rollback when CDC with txn message). 
- If current cluster is secondary, the UpdateReplicateConfig will wait until the replicate configuration is consistent with the config replicated from primary. enhance: support rbac with WAL-based DDL framework (#44735) issue: #43897 - RBAC(Roles/Users/Privileges/Privilege Groups) is implemented by WAL-based DDL framework now. - Support following message type in wal `AlterUser`, `DropUser`, `AlterRole`, `DropRole`, `AlterUserRole`, `DropUserRole`, `AlterPrivilege`, `DropPrivilege`, `AlterPrivilegeGroup`, `DropPrivilegeGroup`, `RestoreRBAC`. - RBAC can be synced by new CDC now. - Refactor some UT for RBAC. enhance: support database with WAL-based DDL framework (#44822) issue: #43897 - Database related DDL is implemented by WAL-based DDL framework now. - Support following message type in wal CreateDatabase, AlterDatabase, DropDatabase. - Database DDL can be synced by new CDC now. - Refactor some UT for Database DDL. enhance: support alias with WAL-based DDL framework (#44865) issue: #43897 - Alias related DDL is implemented by WAL-based DDL framework now. - Support following message type in wal AlterAlias, DropAlias. - Alias DDL can be synced by new CDC now. - Refactor some UT for Alias DDL. enhance: Disable import for replicating cluster (#44850) 1. Import in replicating cluster is not supported yet, so disable it for now. 2. Remove GetReplicateConfiguration wal API issue: https://github.com/milvus-io/milvus/issues/44123 fix: use short debug string to avoid newline in debug logs (#44925) issue: #44924 fix: rerank before requery if reranker didn't use field data (#44942) issue: #44918 enhance: support resource group with WAL-based DDL framework (#44874) issue: #43897 - Resource group related DDL is implemented by WAL-based DDL framework now. - Support following message type in wal AlterResourceGroup, DropResourceGroup. - Resource group DDL can be synced by new CDC now. - Refactor some UT for resource group DDL. 
fix: Fix replication txn data loss during chaos (#44963) Only confirm CommitMsg for txn messages to prevent data loss. issue: https://github.com/milvus-io/milvus/issues/44962, https://github.com/milvus-io/milvus/issues/44123 fix: wrong execution order of DDL/DCL on secondary (#44886) issue: #44697, #44696 - The DDL execution order on the secondary is now kept the same as the order of the control channel timetick. - filtering the control channel operation on shard manager of streamingnode to avoid wrong vchannel of create segment. - fix that the immutable txn message lost replicate header. fix: Fix primary-secondary replication switch blocking (#44898) 1. Fix primary-secondary replication switchover blocking by deleting replicate pchannel meta using modRevision. 2. Stop channel replicator(scanner) when cluster role changes to prevent continued message consumption and replication. 3. Close Milvus client to prevent goroutine leak. 4. Create Milvus client once for a channel replicator. 5. Simplify CDC controller and resources. issue: https://github.com/milvus-io/milvus/issues/44123 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: chyezh <chyezh@outlook.com> Co-authored-by: yihao.dai <yihao.dai@zilliz.com>
647 lines
18 KiB
Go
647 lines
18 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rootcoord
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/samber/lo"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
|
pb "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
|
|
)
|
|
|
|
// stepPriority ranks a step relative to other steps; every nestedStep reports
// one through its Weight method.
// NOTE(review): how the ranking is consumed (e.g. whether larger values are
// scheduled first) lives outside this file — confirm in the step scheduler.
type stepPriority int

// Predefined priority levels. They are deliberately untyped integer constants,
// so they convert implicitly to stepPriority (or any integer type) at use sites.
const (
	stepPriorityLow       = 0
	stepPriorityNormal    = 1
	stepPriorityImportant = 10
	stepPriorityUrgent    = 1_000
)
|
|
|
|
type nestedStep interface {
|
|
Execute(ctx context.Context) ([]nestedStep, error)
|
|
Desc() string
|
|
Weight() stepPriority
|
|
}
|
|
|
|
type baseStep struct {
|
|
core *Core
|
|
}
|
|
|
|
func (s baseStep) Desc() string {
|
|
return ""
|
|
}
|
|
|
|
func (s baseStep) Weight() stepPriority {
|
|
return stepPriorityLow
|
|
}
|
|
|
|
type addCollectionMetaStep struct {
|
|
baseStep
|
|
coll *model.Collection
|
|
}
|
|
|
|
func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.AddCollection(ctx, s.coll)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *addCollectionMetaStep) Desc() string {
|
|
return fmt.Sprintf("add collection to meta table, name: %s, id: %d, ts: %d", s.coll.Name, s.coll.CollectionID, s.coll.CreateTime)
|
|
}
|
|
|
|
type deleteCollectionMetaStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *deleteCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *deleteCollectionMetaStep) Desc() string {
|
|
return fmt.Sprintf("delete collection from meta table, id: %d, ts: %d", s.collectionID, s.ts)
|
|
}
|
|
|
|
func (s *deleteCollectionMetaStep) Weight() stepPriority {
|
|
return stepPriorityNormal
|
|
}
|
|
|
|
type deleteDatabaseMetaStep struct {
|
|
baseStep
|
|
databaseName string
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *deleteDatabaseMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.DropDatabase(ctx, s.databaseName, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *deleteDatabaseMetaStep) Desc() string {
|
|
return fmt.Sprintf("delete database from meta table, name: %s, ts: %d", s.databaseName, s.ts)
|
|
}
|
|
|
|
type removeDmlChannelsStep struct {
|
|
baseStep
|
|
pChannels []string
|
|
}
|
|
|
|
func (s *removeDmlChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
s.core.chanTimeTick.removeDmlChannels(s.pChannels...)
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *removeDmlChannelsStep) Desc() string {
|
|
// this shouldn't be called.
|
|
return fmt.Sprintf("remove dml channels: %v", s.pChannels)
|
|
}
|
|
|
|
func (s *removeDmlChannelsStep) Weight() stepPriority {
|
|
// avoid too frequent tt.
|
|
return stepPriorityUrgent
|
|
}
|
|
|
|
type watchChannelsStep struct {
|
|
baseStep
|
|
info *watchInfo
|
|
}
|
|
|
|
func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.broker.WatchChannels(ctx, s.info)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *watchChannelsStep) Desc() string {
|
|
return fmt.Sprintf("watch channels, ts: %d, collection: %d, partition: %d, vChannels: %v",
|
|
s.info.ts, s.info.collectionID, s.info.partitionID, s.info.vChannels)
|
|
}
|
|
|
|
type unwatchChannelsStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
channels collectionChannels
|
|
|
|
isSkip bool
|
|
}
|
|
|
|
func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
unwatchByDropMsg := &deleteCollectionDataStep{
|
|
baseStep: baseStep{core: s.core},
|
|
coll: &model.Collection{CollectionID: s.collectionID, PhysicalChannelNames: s.channels.physicalChannels},
|
|
isSkip: s.isSkip,
|
|
}
|
|
return unwatchByDropMsg.Execute(ctx)
|
|
}
|
|
|
|
func (s *unwatchChannelsStep) Desc() string {
|
|
return fmt.Sprintf("unwatch channels, collection: %d, pChannels: %v, vChannels: %v",
|
|
s.collectionID, s.channels.physicalChannels, s.channels.virtualChannels)
|
|
}
|
|
|
|
func (s *unwatchChannelsStep) Weight() stepPriority {
|
|
return stepPriorityNormal
|
|
}
|
|
|
|
type changeCollectionStateStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
state pb.CollectionState
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *changeCollectionStateStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *changeCollectionStateStep) Desc() string {
|
|
return fmt.Sprintf("change collection state, collection: %d, ts: %d, state: %s",
|
|
s.collectionID, s.ts, s.state.String())
|
|
}
|
|
|
|
type cleanupMetricsStep struct {
|
|
baseStep
|
|
dbName string
|
|
collectionName string
|
|
}
|
|
|
|
func (s *cleanupMetricsStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
metrics.CleanupRootCoordCollectionMetrics(s.dbName, s.collectionName)
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *cleanupMetricsStep) Desc() string {
|
|
return fmt.Sprintf("change collection state, db: %s, collectionstate: %s",
|
|
s.dbName, s.collectionName)
|
|
}
|
|
|
|
type expireCacheStep struct {
|
|
baseStep
|
|
dbName string
|
|
collectionNames []string
|
|
collectionID UniqueID
|
|
partitionName string
|
|
ts Timestamp
|
|
opts []proxyutil.ExpireCacheOpt
|
|
}
|
|
|
|
func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.partitionName, s.ts, s.opts...)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *expireCacheStep) Desc() string {
|
|
return fmt.Sprintf("expire cache, collection id: %d, collection names: %s, ts: %d",
|
|
s.collectionID, s.collectionNames, s.ts)
|
|
}
|
|
|
|
type deleteCollectionDataStep struct {
|
|
baseStep
|
|
coll *model.Collection
|
|
|
|
isSkip bool
|
|
}
|
|
|
|
func (s *deleteCollectionDataStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
if s.isSkip {
|
|
return nil, nil
|
|
}
|
|
if _, err := s.core.garbageCollector.GcCollectionData(ctx, s.coll); err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *deleteCollectionDataStep) Desc() string {
|
|
return fmt.Sprintf("delete collection data, collection: %d", s.coll.CollectionID)
|
|
}
|
|
|
|
func (s *deleteCollectionDataStep) Weight() stepPriority {
|
|
return stepPriorityImportant
|
|
}
|
|
|
|
type deletePartitionDataStep struct {
|
|
baseStep
|
|
pchans []string
|
|
vchans []string
|
|
partition *model.Partition
|
|
|
|
isSkip bool
|
|
}
|
|
|
|
func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
if s.isSkip {
|
|
return nil, nil
|
|
}
|
|
_, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.vchans, s.partition)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *deletePartitionDataStep) Desc() string {
|
|
return fmt.Sprintf("delete partition data, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
|
|
}
|
|
|
|
func (s *deletePartitionDataStep) Weight() stepPriority {
|
|
return stepPriorityImportant
|
|
}
|
|
|
|
type releaseCollectionStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
}
|
|
|
|
func (s *releaseCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.broker.ReleaseCollection(ctx, s.collectionID)
|
|
log.Ctx(ctx).Info("release collection done", zap.Int64("collectionID", s.collectionID))
|
|
return nil, err
|
|
}
|
|
|
|
func (s *releaseCollectionStep) Desc() string {
|
|
return fmt.Sprintf("release collection: %d", s.collectionID)
|
|
}
|
|
|
|
func (s *releaseCollectionStep) Weight() stepPriority {
|
|
return stepPriorityUrgent
|
|
}
|
|
|
|
type releasePartitionsStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
partitionIDs []UniqueID
|
|
}
|
|
|
|
func (s *releasePartitionsStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.broker.ReleasePartitions(ctx, s.collectionID, s.partitionIDs...)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *releasePartitionsStep) Desc() string {
|
|
return fmt.Sprintf("release partitions, collectionID=%d, partitionIDs=%v", s.collectionID, s.partitionIDs)
|
|
}
|
|
|
|
func (s *releasePartitionsStep) Weight() stepPriority {
|
|
return stepPriorityUrgent
|
|
}
|
|
|
|
type dropIndexStep struct {
|
|
baseStep
|
|
collID UniqueID
|
|
partIDs []UniqueID
|
|
}
|
|
|
|
func (s *dropIndexStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.broker.DropCollectionIndex(ctx, s.collID, s.partIDs)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *dropIndexStep) Desc() string {
|
|
return fmt.Sprintf("drop collection index: %d", s.collID)
|
|
}
|
|
|
|
func (s *dropIndexStep) Weight() stepPriority {
|
|
return stepPriorityNormal
|
|
}
|
|
|
|
type addPartitionMetaStep struct {
|
|
baseStep
|
|
partition *model.Partition
|
|
}
|
|
|
|
func (s *addPartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.AddPartition(ctx, s.partition)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *addPartitionMetaStep) Desc() string {
|
|
return fmt.Sprintf("add partition to meta table, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
|
|
}
|
|
|
|
type broadcastCreatePartitionMsgStep struct {
|
|
baseStep
|
|
vchannels []string
|
|
partition *model.Partition
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *broadcastCreatePartitionMsgStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
req := &msgpb.CreatePartitionRequest{
|
|
Base: commonpbutil.NewMsgBase(
|
|
commonpbutil.WithMsgType(commonpb.MsgType_CreatePartition),
|
|
commonpbutil.WithTimeStamp(0), // ts is given by streamingnode.
|
|
),
|
|
PartitionName: s.partition.PartitionName,
|
|
CollectionID: s.partition.CollectionID,
|
|
PartitionID: s.partition.PartitionID,
|
|
}
|
|
|
|
msgs := make([]message.MutableMessage, 0, len(s.vchannels))
|
|
for _, vchannel := range s.vchannels {
|
|
msg, err := message.NewCreatePartitionMessageBuilderV1().
|
|
WithVChannel(vchannel).
|
|
WithHeader(&message.CreatePartitionMessageHeader{
|
|
CollectionId: s.partition.CollectionID,
|
|
PartitionId: s.partition.PartitionID,
|
|
}).
|
|
WithBody(req).
|
|
BuildMutable()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
msgs = append(msgs, msg)
|
|
}
|
|
if err := streaming.WAL().AppendMessagesWithOption(ctx, streaming.AppendOption{
|
|
BarrierTimeTick: s.ts,
|
|
}, msgs...).UnwrapFirstError(); err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *broadcastCreatePartitionMsgStep) Desc() string {
|
|
return fmt.Sprintf("broadcast create partition message to mq, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
|
|
}
|
|
|
|
type changePartitionStateStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
partitionID UniqueID
|
|
state pb.PartitionState
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *changePartitionStateStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *changePartitionStateStep) Desc() string {
|
|
return fmt.Sprintf("change partition step, collection: %d, partition: %d, state: %s, ts: %d",
|
|
s.collectionID, s.partitionID, s.state.String(), s.ts)
|
|
}
|
|
|
|
type removePartitionMetaStep struct {
|
|
baseStep
|
|
dbID UniqueID
|
|
collectionID UniqueID
|
|
partitionID UniqueID
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *removePartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.RemovePartition(ctx, s.dbID, s.collectionID, s.partitionID, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *removePartitionMetaStep) Desc() string {
|
|
return fmt.Sprintf("remove partition meta, collection: %d, partition: %d, ts: %d", s.collectionID, s.partitionID, s.ts)
|
|
}
|
|
|
|
func (s *removePartitionMetaStep) Weight() stepPriority {
|
|
return stepPriorityNormal
|
|
}
|
|
|
|
type nullStep struct{}
|
|
|
|
func (s *nullStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *nullStep) Desc() string {
|
|
return ""
|
|
}
|
|
|
|
func (s *nullStep) Weight() stepPriority {
|
|
return stepPriorityLow
|
|
}
|
|
|
|
type AlterCollectionStep struct {
|
|
baseStep
|
|
oldColl *model.Collection
|
|
newColl *model.Collection
|
|
ts Timestamp
|
|
fieldModify bool
|
|
}
|
|
|
|
func (a *AlterCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := a.core.meta.AlterCollection(ctx, a.oldColl, a.newColl, a.ts, a.fieldModify)
|
|
return nil, err
|
|
}
|
|
|
|
func (a *AlterCollectionStep) Desc() string {
|
|
return fmt.Sprintf("alter collection, collectionID: %d, ts: %d", a.oldColl.CollectionID, a.ts)
|
|
}
|
|
|
|
type BroadcastAlteredCollectionStep struct {
|
|
baseStep
|
|
req *milvuspb.AlterCollectionRequest
|
|
core *Core
|
|
}
|
|
|
|
func (b *BroadcastAlteredCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
// TODO: support online schema change mechanism
|
|
// It only broadcast collection properties to DataCoord service
|
|
err := b.core.broker.BroadcastAlteredCollection(ctx, b.req)
|
|
return nil, err
|
|
}
|
|
|
|
func (b *BroadcastAlteredCollectionStep) Desc() string {
|
|
return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID)
|
|
}
|
|
|
|
type AddCollectionFieldStep struct {
|
|
baseStep
|
|
oldColl *model.Collection
|
|
updatedCollection *model.Collection
|
|
newField *model.Field
|
|
ts Timestamp
|
|
}
|
|
|
|
func (a *AddCollectionFieldStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
// newColl := a.oldColl.Clone()
|
|
// newColl.Fields = append(newColl.Fields, a.newField)
|
|
err := a.core.meta.AlterCollection(ctx, a.oldColl, a.updatedCollection, a.updatedCollection.UpdateTimestamp, true)
|
|
log.Ctx(ctx).Info("add field done", zap.Int64("collectionID", a.oldColl.CollectionID), zap.Any("new field", a.newField))
|
|
return nil, err
|
|
}
|
|
|
|
func (a *AddCollectionFieldStep) Desc() string {
|
|
return fmt.Sprintf("add field, collectionID: %d, fieldID: %d, ts: %d", a.oldColl.CollectionID, a.newField.FieldID, a.ts)
|
|
}
|
|
|
|
type WriteSchemaChangeWALStep struct {
|
|
baseStep
|
|
collection *model.Collection
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *WriteSchemaChangeWALStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
vchannels := s.collection.VirtualChannelNames
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: s.collection.Name,
|
|
Description: s.collection.Description,
|
|
AutoID: s.collection.AutoID,
|
|
Fields: model.MarshalFieldModels(s.collection.Fields),
|
|
StructArrayFields: model.MarshalStructArrayFieldModels(s.collection.StructArrayFields),
|
|
Functions: model.MarshalFunctionModels(s.collection.Functions),
|
|
EnableDynamicField: s.collection.EnableDynamicField,
|
|
Properties: s.collection.Properties,
|
|
}
|
|
|
|
schemaMsg, err := message.NewSchemaChangeMessageBuilderV2().
|
|
WithBroadcast(vchannels).
|
|
WithHeader(&message.SchemaChangeMessageHeader{
|
|
CollectionId: s.collection.CollectionID,
|
|
}).
|
|
WithBody(&message.SchemaChangeMessageBody{
|
|
Schema: schema,
|
|
}).BuildBroadcast()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := streaming.WAL().Broadcast().Append(ctx, schemaMsg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// use broadcast max msg timestamp as update timestamp here
|
|
s.collection.UpdateTimestamp = lo.Max(lo.Map(vchannels, func(channelName string, _ int) uint64 {
|
|
return resp.GetAppendResult(channelName).TimeTick
|
|
}))
|
|
log.Ctx(ctx).Info(
|
|
"broadcast schema change success",
|
|
zap.Uint64("broadcastID", resp.BroadcastID),
|
|
zap.Uint64("WALUpdateTimestamp", s.collection.UpdateTimestamp),
|
|
zap.Any("appendResults", resp.AppendResults),
|
|
)
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *WriteSchemaChangeWALStep) Desc() string {
|
|
return fmt.Sprintf("write schema change WALcollectionID: %d, ts: %d", s.collection.CollectionID, s.ts)
|
|
}
|
|
|
|
type renameCollectionStep struct {
|
|
baseStep
|
|
dbName string
|
|
oldName string
|
|
newDBName string
|
|
newName string
|
|
ts Timestamp
|
|
}
|
|
|
|
func (s *renameCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
err := s.core.meta.RenameCollection(ctx, s.dbName, s.oldName, s.newDBName, s.newName, s.ts)
|
|
return nil, err
|
|
}
|
|
|
|
func (s *renameCollectionStep) Desc() string {
|
|
return fmt.Sprintf("rename collection from %s.%s to %s.%s, ts: %d",
|
|
s.dbName, s.oldName, s.newDBName, s.newName, s.ts)
|
|
}
|
|
|
|
var (
|
|
confirmGCInterval = time.Minute * 20
|
|
allPartition UniqueID = -1
|
|
)
|
|
|
|
type confirmGCStep struct {
|
|
baseStep
|
|
collectionID UniqueID
|
|
partitionID UniqueID
|
|
lastScheduledTime time.Time
|
|
}
|
|
|
|
func newConfirmGCStep(core *Core, collectionID, partitionID UniqueID) *confirmGCStep {
|
|
return &confirmGCStep{
|
|
baseStep: baseStep{core: core},
|
|
collectionID: collectionID,
|
|
partitionID: partitionID,
|
|
lastScheduledTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
func (b *confirmGCStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
if time.Since(b.lastScheduledTime) < confirmGCInterval {
|
|
return nil, fmt.Errorf("wait for reschedule to confirm GC, collection: %d, partition: %d, last scheduled time: %s, now: %s",
|
|
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
|
|
}
|
|
|
|
finished := b.core.broker.GcConfirm(ctx, b.collectionID, b.partitionID)
|
|
if finished {
|
|
return nil, nil
|
|
}
|
|
|
|
b.lastScheduledTime = time.Now()
|
|
|
|
return nil, fmt.Errorf("GC is not finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
|
|
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
|
|
}
|
|
|
|
func (b *confirmGCStep) Desc() string {
|
|
return fmt.Sprintf("wait for GC finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
|
|
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
|
|
}
|
|
|
|
func (b *confirmGCStep) Weight() stepPriority {
|
|
return stepPriorityLow
|
|
}
|
|
|
|
type simpleStep struct {
|
|
desc string
|
|
weight stepPriority
|
|
executeFunc func(ctx context.Context) ([]nestedStep, error)
|
|
}
|
|
|
|
func NewSimpleStep(desc string, executeFunc func(ctx context.Context) ([]nestedStep, error)) nestedStep {
|
|
return &simpleStep{
|
|
desc: desc,
|
|
weight: stepPriorityNormal,
|
|
executeFunc: executeFunc,
|
|
}
|
|
}
|
|
|
|
func (s *simpleStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
|
return s.executeFunc(ctx)
|
|
}
|
|
|
|
func (s *simpleStep) Desc() string {
|
|
return s.desc
|
|
}
|
|
|
|
func (s *simpleStep) Weight() stepPriority {
|
|
return s.weight
|
|
}
|