// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rootcoord import ( "context" "fmt" "time" "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/v2/log" pb "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" ) type stepPriority int const ( stepPriorityLow = 0 stepPriorityNormal = 1 stepPriorityImportant = 10 stepPriorityUrgent = 1000 ) type nestedStep interface { Execute(ctx context.Context) ([]nestedStep, error) Desc() string Weight() stepPriority } type baseStep struct { core *Core } func (s baseStep) Desc() string { return "" } func (s baseStep) Weight() stepPriority { return stepPriorityLow } type addCollectionMetaStep struct { baseStep coll *model.Collection } func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.AddCollection(ctx, s.coll) return nil, err } func (s *addCollectionMetaStep) Desc() string { return fmt.Sprintf("add collection to meta table, name: %s, id: %d, ts: %d", s.coll.Name, s.coll.CollectionID, s.coll.CreateTime) } type deleteCollectionMetaStep struct { baseStep collectionID UniqueID ts Timestamp } func (s *deleteCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts) return nil, err } func (s *deleteCollectionMetaStep) Desc() string { return fmt.Sprintf("delete collection from meta table, id: %d, ts: %d", s.collectionID, s.ts) } func (s *deleteCollectionMetaStep) Weight() stepPriority { return stepPriorityNormal } type deleteDatabaseMetaStep struct { baseStep databaseName string ts Timestamp } func (s *deleteDatabaseMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.DropDatabase(ctx, s.databaseName, s.ts) return nil, err } func (s *deleteDatabaseMetaStep) Desc() string { return fmt.Sprintf("delete database from meta table, name: %s, ts: %d", s.databaseName, s.ts) } type removeDmlChannelsStep struct { baseStep pChannels []string } func (s *removeDmlChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { s.core.chanTimeTick.removeDmlChannels(s.pChannels...) return nil, nil } func (s *removeDmlChannelsStep) Desc() string { // this shouldn't be called. return fmt.Sprintf("remove dml channels: %v", s.pChannels) } func (s *removeDmlChannelsStep) Weight() stepPriority { // avoid too frequent tt. return stepPriorityUrgent } type watchChannelsStep struct { baseStep info *watchInfo } func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.broker.WatchChannels(ctx, s.info) return nil, err } func (s *watchChannelsStep) Desc() string { return fmt.Sprintf("watch channels, ts: %d, collection: %d, partition: %d, vChannels: %v", s.info.ts, s.info.collectionID, s.info.partitionID, s.info.vChannels) } type unwatchChannelsStep struct { baseStep collectionID UniqueID channels collectionChannels isSkip bool } func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { unwatchByDropMsg := &deleteCollectionDataStep{ baseStep: baseStep{core: s.core}, coll: &model.Collection{CollectionID: s.collectionID, PhysicalChannelNames: s.channels.physicalChannels}, isSkip: s.isSkip, } return unwatchByDropMsg.Execute(ctx) } func (s *unwatchChannelsStep) Desc() string { return fmt.Sprintf("unwatch channels, collection: %d, pChannels: %v, vChannels: %v", s.collectionID, s.channels.physicalChannels, s.channels.virtualChannels) } func (s *unwatchChannelsStep) Weight() stepPriority { return stepPriorityNormal } type changeCollectionStateStep struct { baseStep collectionID UniqueID state pb.CollectionState ts Timestamp } func (s *changeCollectionStateStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts) return nil, err } func (s *changeCollectionStateStep) Desc() string { return fmt.Sprintf("change collection state, collection: %d, ts: %d, state: %s", s.collectionID, s.ts, s.state.String()) } type expireCacheStep struct { baseStep dbName string collectionNames []string collectionID UniqueID partitionName string ts Timestamp opts []proxyutil.ExpireCacheOpt } func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.partitionName, s.ts, s.opts...) return nil, err } func (s *expireCacheStep) Desc() string { return fmt.Sprintf("expire cache, collection id: %d, collection names: %s, ts: %d", s.collectionID, s.collectionNames, s.ts) } type deleteCollectionDataStep struct { baseStep coll *model.Collection isSkip bool } func (s *deleteCollectionDataStep) Execute(ctx context.Context) ([]nestedStep, error) { if s.isSkip { return nil, nil } if _, err := s.core.garbageCollector.GcCollectionData(ctx, s.coll); err != nil { return nil, err } return nil, nil } func (s *deleteCollectionDataStep) Desc() string { return fmt.Sprintf("delete collection data, collection: %d", s.coll.CollectionID) } func (s *deleteCollectionDataStep) Weight() stepPriority { return stepPriorityImportant } type deletePartitionDataStep struct { baseStep pchans []string vchans []string partition *model.Partition isSkip bool } func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, error) { if s.isSkip { return nil, nil } _, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.vchans, s.partition) return nil, err } func (s *deletePartitionDataStep) Desc() string { return fmt.Sprintf("delete partition data, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID) } func (s *deletePartitionDataStep) Weight() stepPriority { return stepPriorityImportant } type releaseCollectionStep struct { baseStep collectionID UniqueID } func (s *releaseCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.broker.ReleaseCollection(ctx, s.collectionID) log.Ctx(ctx).Info("release collection done", zap.Int64("collectionID", s.collectionID)) return nil, err } func (s *releaseCollectionStep) Desc() string { return fmt.Sprintf("release collection: %d", s.collectionID) } func (s *releaseCollectionStep) Weight() stepPriority { return stepPriorityUrgent } type releasePartitionsStep struct { baseStep collectionID UniqueID partitionIDs []UniqueID } func (s *releasePartitionsStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.broker.ReleasePartitions(ctx, s.collectionID, s.partitionIDs...) return nil, err } func (s *releasePartitionsStep) Desc() string { return fmt.Sprintf("release partitions, collectionID=%d, partitionIDs=%v", s.collectionID, s.partitionIDs) } func (s *releasePartitionsStep) Weight() stepPriority { return stepPriorityUrgent } type dropIndexStep struct { baseStep collID UniqueID partIDs []UniqueID } func (s *dropIndexStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.broker.DropCollectionIndex(ctx, s.collID, s.partIDs) return nil, err } func (s *dropIndexStep) Desc() string { return fmt.Sprintf("drop collection index: %d", s.collID) } func (s *dropIndexStep) Weight() stepPriority { return stepPriorityNormal } type addPartitionMetaStep struct { baseStep partition *model.Partition } func (s *addPartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.AddPartition(ctx, s.partition) return nil, err } func (s *addPartitionMetaStep) Desc() string { return fmt.Sprintf("add partition to meta table, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID) } type broadcastCreatePartitionMsgStep struct { baseStep vchannels []string partition *model.Partition ts Timestamp } func (s *broadcastCreatePartitionMsgStep) Execute(ctx context.Context) ([]nestedStep, error) { req := &msgpb.CreatePartitionRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_CreatePartition), commonpbutil.WithTimeStamp(0), // ts is given by streamingnode. ), PartitionName: s.partition.PartitionName, CollectionID: s.partition.CollectionID, PartitionID: s.partition.PartitionID, } msgs := make([]message.MutableMessage, 0, len(s.vchannels)) for _, vchannel := range s.vchannels { msg, err := message.NewCreatePartitionMessageBuilderV1(). WithVChannel(vchannel). WithHeader(&message.CreatePartitionMessageHeader{ CollectionId: s.partition.CollectionID, PartitionId: s.partition.PartitionID, }). WithBody(req). BuildMutable() if err != nil { return nil, err } msgs = append(msgs, msg) } if err := streaming.WAL().AppendMessagesWithOption(ctx, streaming.AppendOption{ BarrierTimeTick: s.ts, }, msgs...).UnwrapFirstError(); err != nil { return nil, err } return nil, nil } func (s *broadcastCreatePartitionMsgStep) Desc() string { return fmt.Sprintf("broadcast create partition message to mq, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID) } type changePartitionStateStep struct { baseStep collectionID UniqueID partitionID UniqueID state pb.PartitionState ts Timestamp } func (s *changePartitionStateStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts) return nil, err } func (s *changePartitionStateStep) Desc() string { return fmt.Sprintf("change partition step, collection: %d, partition: %d, state: %s, ts: %d", s.collectionID, s.partitionID, s.state.String(), s.ts) } type removePartitionMetaStep struct { baseStep dbID UniqueID collectionID UniqueID partitionID UniqueID ts Timestamp } func (s *removePartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { err := s.core.meta.RemovePartition(ctx, s.dbID, s.collectionID, s.partitionID, s.ts) return nil, err } func (s *removePartitionMetaStep) Desc() string { return fmt.Sprintf("remove partition meta, collection: %d, partition: %d, ts: %d", s.collectionID, s.partitionID, s.ts) } func (s *removePartitionMetaStep) Weight() stepPriority { return stepPriorityNormal } type nullStep struct{} func (s *nullStep) Execute(ctx context.Context) ([]nestedStep, error) { return nil, nil } func (s *nullStep) Desc() string { return "" } func (s *nullStep) Weight() stepPriority { return stepPriorityLow } type AlterCollectionStep struct { baseStep oldColl *model.Collection newColl *model.Collection ts Timestamp fieldModify bool } func (a *AlterCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) { err := a.core.meta.AlterCollection(ctx, a.oldColl, a.newColl, a.ts, a.fieldModify) return nil, err } func (a *AlterCollectionStep) Desc() string { return fmt.Sprintf("alter collection, collectionID: %d, ts: %d", a.oldColl.CollectionID, a.ts) } type BroadcastAlteredCollectionStep struct { baseStep req *milvuspb.AlterCollectionRequest core *Core } func (b *BroadcastAlteredCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) { // TODO: support online schema change mechanism // It only broadcast collection properties to DataCoord service err := b.core.broker.BroadcastAlteredCollection(ctx, b.req) return nil, err } func (b *BroadcastAlteredCollectionStep) Desc() string { return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID) } type AddCollectionFieldStep struct { baseStep oldColl *model.Collection updatedCollection *model.Collection newField *model.Field ts Timestamp } func (a *AddCollectionFieldStep) Execute(ctx context.Context) ([]nestedStep, error) { // newColl := a.oldColl.Clone() // newColl.Fields = append(newColl.Fields, a.newField) err := a.core.meta.AlterCollection(ctx, a.oldColl, a.updatedCollection, a.updatedCollection.UpdateTimestamp, true) log.Ctx(ctx).Info("add field done", zap.Int64("collectionID", a.oldColl.CollectionID), zap.Any("new field", a.newField)) return nil, err } func (a *AddCollectionFieldStep) Desc() string { return fmt.Sprintf("add field, collectionID: %d, fieldID: %d, ts: %d", a.oldColl.CollectionID, a.newField.FieldID, a.ts) } type WriteSchemaChangeWALStep struct { baseStep collection *model.Collection ts Timestamp } func (s *WriteSchemaChangeWALStep) Execute(ctx context.Context) ([]nestedStep, error) { vchannels := s.collection.VirtualChannelNames schema := &schemapb.CollectionSchema{ Name: s.collection.Name, Description: s.collection.Description, AutoID: s.collection.AutoID, Fields: model.MarshalFieldModels(s.collection.Fields), StructArrayFields: model.MarshalStructArrayFieldModels(s.collection.StructArrayFields), Functions: model.MarshalFunctionModels(s.collection.Functions), EnableDynamicField: s.collection.EnableDynamicField, Properties: s.collection.Properties, } schemaMsg, err := message.NewSchemaChangeMessageBuilderV2(). WithBroadcast(vchannels). WithHeader(&message.SchemaChangeMessageHeader{ CollectionId: s.collection.CollectionID, }). WithBody(&message.SchemaChangeMessageBody{ Schema: schema, }).BuildBroadcast() if err != nil { return nil, err } resp, err := streaming.WAL().Broadcast().Append(ctx, schemaMsg) if err != nil { return nil, err } // use broadcast max msg timestamp as update timestamp here s.collection.UpdateTimestamp = lo.Max(lo.Map(vchannels, func(channelName string, _ int) uint64 { return resp.GetAppendResult(channelName).TimeTick })) log.Ctx(ctx).Info( "broadcast schema change success", zap.Uint64("broadcastID", resp.BroadcastID), zap.Uint64("WALUpdateTimestamp", s.collection.UpdateTimestamp), zap.Any("appendResults", resp.AppendResults), ) return nil, nil } func (s *WriteSchemaChangeWALStep) Desc() string { return fmt.Sprintf("write schema change WALcollectionID: %d, ts: %d", s.collection.CollectionID, s.ts) } type AlterDatabaseStep struct { baseStep oldDB *model.Database newDB *model.Database ts Timestamp } func (a *AlterDatabaseStep) Execute(ctx context.Context) ([]nestedStep, error) { err := a.core.meta.AlterDatabase(ctx, a.oldDB, a.newDB, a.ts) return nil, err } func (a *AlterDatabaseStep) Desc() string { return fmt.Sprintf("alter database, databaseID: %d, databaseName: %s, ts: %d", a.oldDB.ID, a.oldDB.Name, a.ts) } var ( confirmGCInterval = time.Minute * 20 allPartition UniqueID = -1 ) type confirmGCStep struct { baseStep collectionID UniqueID partitionID UniqueID lastScheduledTime time.Time } func newConfirmGCStep(core *Core, collectionID, partitionID UniqueID) *confirmGCStep { return &confirmGCStep{ baseStep: baseStep{core: core}, collectionID: collectionID, partitionID: partitionID, lastScheduledTime: time.Now(), } } func (b *confirmGCStep) Execute(ctx context.Context) ([]nestedStep, error) { if time.Since(b.lastScheduledTime) < confirmGCInterval { return nil, fmt.Errorf("wait for reschedule to confirm GC, collection: %d, partition: %d, last scheduled time: %s, now: %s", b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String()) } finished := b.core.broker.GcConfirm(ctx, b.collectionID, b.partitionID) if finished { return nil, nil } b.lastScheduledTime = time.Now() return nil, fmt.Errorf("GC is not finished, collection: %d, partition: %d, last scheduled time: %s, now: %s", b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String()) } func (b *confirmGCStep) Desc() string { return fmt.Sprintf("wait for GC finished, collection: %d, partition: %d, last scheduled time: %s, now: %s", b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String()) } func (b *confirmGCStep) Weight() stepPriority { return stepPriorityLow } type simpleStep struct { desc string weight stepPriority executeFunc func(ctx context.Context) ([]nestedStep, error) } func NewSimpleStep(desc string, executeFunc func(ctx context.Context) ([]nestedStep, error)) nestedStep { return &simpleStep{ desc: desc, weight: stepPriorityNormal, executeFunc: executeFunc, } } func (s *simpleStep) Execute(ctx context.Context) ([]nestedStep, error) { return s.executeFunc(ctx) } func (s *simpleStep) Desc() string { return s.desc } func (s *simpleStep) Weight() stepPriority { return s.weight }