diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index 6aaa0b2554..2f8c1ecf78 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "time" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -13,7 +12,6 @@ import ( "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/util/funcutil" "go.uber.org/zap" ) @@ -52,11 +50,6 @@ func newServerBroker(s *Core) *ServerBroker { func (b *ServerBroker) ReleaseCollection(ctx context.Context, collectionID UniqueID) error { log.Info("releasing collection", zap.Int64("collection", collectionID)) - if err := funcutil.WaitForComponentHealthy(ctx, b.s.queryCoord, "QueryCoord", 100, time.Millisecond*200); err != nil { - log.Error("failed to release collection, querycoord not healthy", zap.Error(err), zap.Int64("collection", collectionID)) - return err - } - resp, err := b.s.queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_ReleaseCollection}, CollectionID: collectionID, @@ -100,10 +93,6 @@ func toKeyDataPairs(m map[string][]byte) []*commonpb.KeyDataPair { func (b *ServerBroker) WatchChannels(ctx context.Context, info *watchInfo) error { log.Info("watching channels", zap.Uint64("ts", info.ts), zap.Int64("collection", info.collectionID), zap.Strings("vChannels", info.vChannels)) - if err := funcutil.WaitForComponentHealthy(ctx, b.s.dataCoord, "DataCoord", 100, time.Millisecond*200); err != nil { - return err - } - resp, err := b.s.dataCoord.WatchChannels(ctx, &datapb.WatchChannelsRequest{ CollectionID: info.collectionID, ChannelNames: info.vChannels, @@ -193,9 +182,6 @@ func (b *ServerBroker) Import(ctx context.Context, req *datapb.ImportTaskRequest } func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID) error { - if err := funcutil.WaitForComponentHealthy(ctx, b.s.indexCoord, "IndexCoord", 100, time.Millisecond*100); err != nil { - return err - } rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{ CollectionID: collID, IndexName: "", diff --git a/internal/rootcoord/broker_test.go b/internal/rootcoord/broker_test.go index 22600f4be0..324153a6fd 100644 --- a/internal/rootcoord/broker_test.go +++ b/internal/rootcoord/broker_test.go @@ -12,14 +12,6 @@ import ( ) func TestServerBroker_ReleaseCollection(t *testing.T) { - t.Run("not healthy", func(t *testing.T) { - c := newTestCore(withUnhealthyQueryCoord()) - b := newServerBroker(c) - ctx := context.Background() - err := b.ReleaseCollection(ctx, 1) - assert.Error(t, err) - }) - t.Run("failed to execute", func(t *testing.T) { c := newTestCore(withInvalidQueryCoord()) b := newServerBroker(c) @@ -74,14 +66,6 @@ func TestServerBroker_GetSegmentInfo(t *testing.T) { } func TestServerBroker_WatchChannels(t *testing.T) { - t.Run("unhealthy", func(t *testing.T) { - c := newTestCore(withUnhealthyDataCoord()) - b := newServerBroker(c) - ctx := context.Background() - err := b.WatchChannels(ctx, &watchInfo{}) - assert.Error(t, err) - }) - t.Run("failed to execute", func(t *testing.T) { defer cleanTestEnv() @@ -228,14 +212,6 @@ func TestServerBroker_Import(t *testing.T) { } func TestServerBroker_DropCollectionIndex(t *testing.T) { - t.Run("not healthy", func(t *testing.T) { - c := newTestCore(withUnhealthyIndexCoord()) - b := newServerBroker(c) - ctx := context.Background() - err := 
b.DropCollectionIndex(ctx, 1) - assert.Error(t, err) - }) - t.Run("failed to execute", func(t *testing.T) { c := newTestCore(withInvalidIndexCoord()) b := newServerBroker(c) diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index ffdb71029d..5ae41c8d2a 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -257,20 +257,20 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { return nil } - undoTask := newBaseUndoTask() - undoTask.AddStep(&NullStep{}, &RemoveDmlChannelsStep{ + undoTask := newBaseUndoTask(t.core.stepExecutor) + undoTask.AddStep(&nullStep{}, &removeDmlChannelsStep{ baseStep: baseStep{core: t.core}, - pchannels: chanNames, + pChannels: chanNames, }) // remove dml channels if any error occurs. - undoTask.AddStep(&AddCollectionMetaStep{ + undoTask.AddStep(&addCollectionMetaStep{ baseStep: baseStep{core: t.core}, coll: &collInfo, - }, &DeleteCollectionMetaStep{ + }, &deleteCollectionMetaStep{ baseStep: baseStep{core: t.core}, collectionID: collID, ts: ts, }) - undoTask.AddStep(&WatchChannelsStep{ + undoTask.AddStep(&watchChannelsStep{ baseStep: baseStep{core: t.core}, info: &watchInfo{ ts: ts, @@ -278,17 +278,17 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { vChannels: t.channels.virtualChannels, startPositions: toKeyDataPairs(startPositions), }, - }, &UnwatchChannelsStep{ + }, &unwatchChannelsStep{ baseStep: baseStep{core: t.core}, collectionID: collID, channels: t.channels, }) - undoTask.AddStep(&ChangeCollectionStateStep{ + undoTask.AddStep(&changeCollectionStateStep{ baseStep: baseStep{core: t.core}, collectionID: collID, state: pb.CollectionState_CollectionCreated, ts: ts, - }, &NullStep{}) // We'll remove the whole collection anyway. + }, &nullStep{}) // We'll remove the whole collection anyway. return undoTask.Execute(ctx) } diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index 09540be79c..34996e7126 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -53,17 +53,17 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { State: pb.PartitionState_PartitionCreated, } - undoTask := newBaseUndoTask() - undoTask.AddStep(&ExpireCacheStep{ + undoTask := newBaseUndoTask(t.core.stepExecutor) + undoTask.AddStep(&expireCacheStep{ baseStep: baseStep{core: t.core}, collectionNames: []string{t.Req.GetCollectionName()}, collectionID: t.collMeta.CollectionID, ts: t.GetTs(), - }, &NullStep{}) - undoTask.AddStep(&AddPartitionMetaStep{ + }, &nullStep{}) + undoTask.AddStep(&addPartitionMetaStep{ baseStep: baseStep{core: t.core}, partition: partition, - }, &NullStep{}) // adding partition is atomic enough. + }, &nullStep{}) // adding partition is atomic enough. 
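Both creation tasks above are built on the same undo pattern: every forward step is registered together with a compensating step, and when a forward step fails, the compensations recorded so far are replayed so that partially applied side effects (meta writes, registered DML channels) are rolled back. A minimal, self-contained sketch of that pattern, with simplified names rather than the actual Milvus types:

package sketch

import "context"

// step is a simplified stand-in for rootcoord's nestedStep.
type step interface {
	Execute(ctx context.Context) error
}

// undoTask pairs each forward step with a compensating step.
type undoTask struct {
	todo []step
	undo []step
}

func (t *undoTask) AddStep(forward, compensation step) {
	t.todo = append(t.todo, forward)
	t.undo = append(t.undo, compensation)
}

func (t *undoTask) Execute(ctx context.Context) error {
	for i, s := range t.todo {
		if err := s.Execute(ctx); err != nil {
			// Replay compensations for the steps that already
			// succeeded, most recent first (best effort).
			for j := i - 1; j >= 0; j-- {
				_ = t.undo[j].Execute(ctx)
			}
			return err
		}
	}
	return nil
}

This is also why nullStep exists: it fills whichever side of a pair is a no-op, for example a forward action that needs no compensation because the whole collection is removed on failure anyway.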
return undoTask.Execute(ctx) } diff --git a/internal/rootcoord/ddl_ts_lock_manager.go b/internal/rootcoord/ddl_ts_lock_manager.go new file mode 100644 index 0000000000..0e63ff5465 --- /dev/null +++ b/internal/rootcoord/ddl_ts_lock_manager.go @@ -0,0 +1,64 @@ +package rootcoord + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/tso" + + "go.uber.org/atomic" +) + +type DdlTsLockManagerV2 interface { + GetMinDdlTs() Timestamp + AddRefCnt(delta int32) + Lock() + Unlock() + UpdateLastTs(ts Timestamp) +} + +type ddlTsLockManagerV2 struct { + lastTs atomic.Uint64 + inProgressCnt atomic.Int32 + tsoAllocator tso.Allocator + mu sync.Mutex +} + +func (c *ddlTsLockManagerV2) GetMinDdlTs() Timestamp { + // In fact, `TryLock` can replace the `inProgressCnt` but it's not recommended. + if c.inProgressCnt.Load() > 0 { + return c.lastTs.Load() + } + c.Lock() + defer c.Unlock() + ts, err := c.tsoAllocator.GenerateTSO(1) + if err != nil { + return c.lastTs.Load() + } + c.UpdateLastTs(ts) + return ts +} + +func (c *ddlTsLockManagerV2) AddRefCnt(delta int32) { + c.inProgressCnt.Add(delta) +} + +func (c *ddlTsLockManagerV2) Lock() { + c.mu.Lock() +} + +func (c *ddlTsLockManagerV2) Unlock() { + c.mu.Unlock() +} + +func (c *ddlTsLockManagerV2) UpdateLastTs(ts Timestamp) { + c.lastTs.Store(ts) +} + +func newDdlTsLockManagerV2(tsoAllocator tso.Allocator) *ddlTsLockManagerV2 { + return &ddlTsLockManagerV2{ + lastTs: *atomic.NewUint64(0), + inProgressCnt: *atomic.NewInt32(0), + tsoAllocator: tsoAllocator, + mu: sync.Mutex{}, + } +} diff --git a/internal/rootcoord/ddl_ts_lock_manager_test.go b/internal/rootcoord/ddl_ts_lock_manager_test.go new file mode 100644 index 0000000000..a84d7e2542 --- /dev/null +++ b/internal/rootcoord/ddl_ts_lock_manager_test.go @@ -0,0 +1,43 @@ +package rootcoord + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_ddlTsLockManager_GetMinDdlTs(t *testing.T) { + t.Run("there are in-progress tasks", func(t *testing.T) { + m := newDdlTsLockManagerV2(nil) + m.UpdateLastTs(100) + m.inProgressCnt.Store(9999) + ts := m.GetMinDdlTs() + assert.Equal(t, Timestamp(100), ts) + }) + + t.Run("failed to generate ts", func(t *testing.T) { + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 0, errors.New("error mock GenerateTSO") + } + m := newDdlTsLockManagerV2(tsoAllocator) + m.UpdateLastTs(101) + m.inProgressCnt.Store(0) + ts := m.GetMinDdlTs() + assert.Equal(t, Timestamp(101), ts) + }) + + t.Run("normal case", func(t *testing.T) { + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 102, nil + } + m := newDdlTsLockManagerV2(tsoAllocator) + m.UpdateLastTs(101) + m.inProgressCnt.Store(0) + ts := m.GetMinDdlTs() + assert.Equal(t, Timestamp(102), ts) + assert.Equal(t, Timestamp(102), m.lastTs.Load()) + }) +} diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index c3c4dab7de..4954194458 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -52,39 +52,38 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { ts := t.GetTs() - redoTask := newBaseRedoTask() + redoTask := newBaseRedoTask(t.core.stepExecutor) - redoTask.AddSyncStep(&ExpireCacheStep{ + redoTask.AddSyncStep(&expireCacheStep{ baseStep: baseStep{core: t.core}, collectionNames: append(aliases, collMeta.Name), collectionID: collMeta.CollectionID, ts: ts, }) - 
redoTask.AddSyncStep(&ChangeCollectionStateStep{ + redoTask.AddSyncStep(&changeCollectionStateStep{ baseStep: baseStep{core: t.core}, collectionID: collMeta.CollectionID, state: pb.CollectionState_CollectionDropping, ts: ts, }) - redoTask.AddAsyncStep(&ReleaseCollectionStep{ + redoTask.AddAsyncStep(&releaseCollectionStep{ baseStep: baseStep{core: t.core}, collectionID: collMeta.CollectionID, }) - redoTask.AddAsyncStep(&DropIndexStep{ + redoTask.AddAsyncStep(&dropIndexStep{ baseStep: baseStep{core: t.core}, collID: collMeta.CollectionID, }) - redoTask.AddAsyncStep(&DeleteCollectionDataStep{ + redoTask.AddAsyncStep(&deleteCollectionDataStep{ baseStep: baseStep{core: t.core}, coll: collMeta, - ts: ts, }) - redoTask.AddAsyncStep(&RemoveDmlChannelsStep{ + redoTask.AddAsyncStep(&removeDmlChannelsStep{ baseStep: baseStep{core: t.core}, - pchannels: collMeta.PhysicalChannelNames, + pChannels: collMeta.PhysicalChannelNames, }) - redoTask.AddAsyncStep(&DeleteCollectionMetaStep{ + redoTask.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: t.core}, collectionID: collMeta.CollectionID, ts: ts, diff --git a/internal/rootcoord/drop_collection_task_test.go b/internal/rootcoord/drop_collection_task_test.go index 6ecb53a909..45c11fc8ce 100644 --- a/internal/rootcoord/drop_collection_task_test.go +++ b/internal/rootcoord/drop_collection_task_test.go @@ -5,8 +5,6 @@ import ( "errors" "testing" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/metastore/model" @@ -176,10 +174,10 @@ func Test_dropCollectionTask_Execute(t *testing.T) { gc := newMockGarbageCollector() deleteCollectionCalled := false deleteCollectionChan := make(chan struct{}, 1) - gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error { + gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection) (Timestamp, error) { deleteCollectionCalled = true deleteCollectionChan <- struct{}{} - return nil + return 0, nil } core := newTestCore( diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index 4d2d1c6c2d..b0342b88d5 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -53,14 +53,14 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { return nil } - redoTask := newBaseRedoTask() - redoTask.AddSyncStep(&ExpireCacheStep{ + redoTask := newBaseRedoTask(t.core.stepExecutor) + redoTask.AddSyncStep(&expireCacheStep{ baseStep: baseStep{core: t.core}, collectionNames: []string{t.Req.GetCollectionName()}, collectionID: t.collMeta.CollectionID, ts: t.GetTs(), }) - redoTask.AddSyncStep(&ChangePartitionStateStep{ + redoTask.AddSyncStep(&changePartitionStateStep{ baseStep: baseStep{core: t.core}, collectionID: t.collMeta.CollectionID, partitionID: partID, @@ -69,7 +69,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { }) // TODO: release partition when query coord is ready. 
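The drop tasks here follow the complementary redo pattern: synchronous steps gate the client response, and the remaining cleanup is only handed to a background executor once every sync step has succeeded. A hedged, self-contained sketch of that split (simplified names, not the real baseRedoTask):

package sketch

import "context"

// step is a simplified stand-in for rootcoord's nestedStep.
type step interface {
	Execute(ctx context.Context) error
}

// executor retries enqueued steps in the background until they succeed.
type executor interface {
	AddSteps(steps []step)
}

type redoTask struct {
	syncSteps  []step
	asyncSteps []step
	exec       executor
}

func (t *redoTask) Execute(ctx context.Context) error {
	// Sync steps run inline; any failure fails the DDL request itself.
	for _, s := range t.syncSteps {
		if err := s.Execute(ctx); err != nil {
			return err
		}
	}
	// Async steps are only scheduled after all sync steps succeeded;
	// the executor owns retries from here on.
	t.exec.AddSteps(t.asyncSteps)
	return nil
}

Dropping the ts field from the data-GC steps fits this model: a step may now run (and be retried) long after the task returned, so the garbage collector allocates a fresh timestamp at execution time instead of pinning the one the task was issued with.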
- redoTask.AddAsyncStep(&DeletePartitionDataStep{ + redoTask.AddAsyncStep(&deletePartitionDataStep{ baseStep: baseStep{core: t.core}, pchans: t.collMeta.PhysicalChannelNames, partition: &model.Partition{ @@ -77,9 +77,8 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { PartitionName: t.Req.GetPartitionName(), CollectionID: t.collMeta.CollectionID, }, - ts: t.GetTs(), }) - redoTask.AddAsyncStep(&RemovePartitionMetaStep{ + redoTask.AddAsyncStep(&removePartitionMetaStep{ baseStep: baseStep{core: t.core}, collectionID: t.collMeta.CollectionID, partitionID: partID, diff --git a/internal/rootcoord/drop_partition_task_test.go b/internal/rootcoord/drop_partition_task_test.go index 3898b0c06d..6029a4f622 100644 --- a/internal/rootcoord/drop_partition_task_test.go +++ b/internal/rootcoord/drop_partition_task_test.go @@ -4,8 +4,6 @@ import ( "context" "testing" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/metastore/model" @@ -146,10 +144,10 @@ func Test_dropPartitionTask_Execute(t *testing.T) { gc := newMockGarbageCollector() deletePartitionCalled := false deletePartitionChan := make(chan struct{}, 1) - gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition, ts typeutil.Timestamp) error { + gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) { deletePartitionChan <- struct{}{} deletePartitionCalled = true - return nil + return 0, nil } core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc)) diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index e006a25791..b975c8ec00 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -2,101 +2,111 @@ package rootcoord import ( "context" - "time" "github.com/milvus-io/milvus/api/commonpb" ms "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/typeutil" - - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metastore/model" - "go.uber.org/zap" ) type GarbageCollector interface { ReDropCollection(collMeta *model.Collection, ts Timestamp) RemoveCreatingCollection(collMeta *model.Collection) ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp) - GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error - GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error + GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) + GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) } -type GarbageCollectorCtx struct { +type bgGarbageCollector struct { s *Core } -func newGarbageCollectorCtx(s *Core) *GarbageCollectorCtx { - return &GarbageCollectorCtx{s: s} +func newBgGarbageCollector(s *Core) *bgGarbageCollector { + return &bgGarbageCollector{s: s} } -func (c *GarbageCollectorCtx) ReDropCollection(collMeta *model.Collection, ts Timestamp) { +func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Timestamp) { // TODO: remove this after data gc can be notified by rpc. c.s.chanTimeTick.addDmlChannels(collMeta.PhysicalChannelNames...) 
- defer c.s.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() + redo := newBaseRedoTask(c.s.stepExecutor) + redo.AddAsyncStep(&releaseCollectionStep{ + baseStep: baseStep{core: c.s}, + collectionID: collMeta.CollectionID, + }) + redo.AddAsyncStep(&dropIndexStep{ + baseStep: baseStep{core: c.s}, + collID: collMeta.CollectionID, + }) + redo.AddAsyncStep(&deleteCollectionDataStep{ + baseStep: baseStep{core: c.s}, + coll: collMeta, + }) + redo.AddAsyncStep(&removeDmlChannelsStep{ + baseStep: baseStep{core: c.s}, + pChannels: collMeta.PhysicalChannelNames, + }) + redo.AddAsyncStep(&deleteCollectionMetaStep{ + baseStep: baseStep{core: c.s}, + collectionID: collMeta.CollectionID, + ts: ts, + }) - if err := c.s.broker.ReleaseCollection(ctx, collMeta.CollectionID); err != nil { - log.Error("failed to release collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID)) - return - } - - if err := c.s.broker.DropCollectionIndex(ctx, collMeta.CollectionID); err != nil { - log.Error("failed to drop collection index when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID)) - return - } - - if err := c.GcCollectionData(ctx, collMeta, ts); err != nil { - log.Error("failed to notify datacoord to gc collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID)) - return - } - - if err := c.s.meta.RemoveCollection(ctx, collMeta.CollectionID, ts); err != nil { - log.Error("failed to remove collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID)) - } + // err is ignored since no sync steps will be executed. + _ = redo.Execute(context.Background()) } -func (c *GarbageCollectorCtx) RemoveCreatingCollection(collMeta *model.Collection) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - if err := c.s.broker.UnwatchChannels(ctx, &watchInfo{collectionID: collMeta.CollectionID, vChannels: collMeta.VirtualChannelNames}); err != nil { - log.Error("failed to unwatch channels when recovery", - zap.Error(err), - zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID), - zap.Strings("vchans", collMeta.VirtualChannelNames), zap.Strings("pchans", collMeta.PhysicalChannelNames)) - return - } - - if err := c.s.meta.RemoveCollection(ctx, collMeta.CollectionID, collMeta.CreateTime); err != nil { - log.Error("failed to remove collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID)) - } +func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) { + redo := newBaseRedoTask(c.s.stepExecutor) + redo.AddAsyncStep(&unwatchChannelsStep{ + baseStep: baseStep{core: c.s}, + collectionID: collMeta.CollectionID, + channels: collectionChannels{ + virtualChannels: collMeta.VirtualChannelNames, + physicalChannels: collMeta.PhysicalChannelNames, + }, + }) + redo.AddAsyncStep(&deleteCollectionMetaStep{ + baseStep: baseStep{core: c.s}, + collectionID: collMeta.CollectionID, + ts: collMeta.CreateTime, + }) + // err is ignored since no sync steps will be executed. 
+ _ = redo.Execute(context.Background()) } -func (c *GarbageCollectorCtx) ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - // TODO: release partition when query coord is ready. - +func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp) { // TODO: remove this after data gc can be notified by rpc. c.s.chanTimeTick.addDmlChannels(pChannels...) - defer c.s.chanTimeTick.removeDmlChannels(pChannels...) - if err := c.GcPartitionData(ctx, pChannels, partition, ts); err != nil { - log.Error("failed to notify datanodes to gc partition", zap.Error(err)) - return - } + redo := newBaseRedoTask(c.s.stepExecutor) + redo.AddAsyncStep(&deletePartitionDataStep{ + baseStep: baseStep{core: c.s}, + pchans: pChannels, + partition: partition, + }) + redo.AddAsyncStep(&removeDmlChannelsStep{ + baseStep: baseStep{core: c.s}, + pChannels: pChannels, + }) + redo.AddAsyncStep(&removePartitionMetaStep{ + baseStep: baseStep{core: c.s}, + collectionID: partition.CollectionID, + partitionID: partition.PartitionID, + ts: ts, + }) - if err := c.s.meta.RemovePartition(ctx, partition.CollectionID, partition.PartitionID, ts); err != nil { - log.Error("failed to remove partition when recovery", zap.Error(err)) - } + // err is ignored since no sync steps will be executed. + _ = redo.Execute(context.Background()) } -func (c *GarbageCollectorCtx) GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error { +func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) { + ts, err := c.s.tsoAllocator.GenerateTSO(1) + if err != nil { + return 0, err + } + msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -118,15 +128,18 @@ func (c *GarbageCollectorCtx) GcCollectionData(ctx context.Context, coll *model. } msgPack.Msgs = append(msgPack.Msgs, msg) if err := c.s.chanTimeTick.broadcastDmlChannels(coll.PhysicalChannelNames, &msgPack); err != nil { - return err + return 0, err } - // TODO: remove this after gc can be notified by rpc. Without this tt, DropCollectionMsg cannot be seen by - // datanodes. - return c.s.chanTimeTick.sendTimeTickToChannel(coll.PhysicalChannelNames, ts) + return ts, nil } -func (c *GarbageCollectorCtx) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error { +func (c *bgGarbageCollector) notifyPartitionGc(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) { + ts, err := c.s.tsoAllocator.GenerateTSO(1) + if err != nil { + return 0, err + } + msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -149,10 +162,36 @@ func (c *GarbageCollectorCtx) GcPartitionData(ctx context.Context, pChannels []s } msgPack.Msgs = append(msgPack.Msgs, msg) if err := c.s.chanTimeTick.broadcastDmlChannels(pChannels, &msgPack); err != nil { - return err + return 0, err } - // TODO: remove this after gc can be notified by rpc. Without this tt, DropCollectionMsg cannot be seen by - // datanodes. 
- return c.s.chanTimeTick.sendTimeTickToChannel(pChannels, ts) + return ts, nil +} + +func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) { + c.s.ddlTsLockManager.Lock() + c.s.ddlTsLockManager.AddRefCnt(1) + defer c.s.ddlTsLockManager.AddRefCnt(-1) + defer c.s.ddlTsLockManager.Unlock() + + ddlTs, err = c.notifyCollectionGc(ctx, coll) + if err != nil { + return 0, err + } + c.s.ddlTsLockManager.UpdateLastTs(ddlTs) + return ddlTs, nil +} + +func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) { + c.s.ddlTsLockManager.Lock() + c.s.ddlTsLockManager.AddRefCnt(1) + defer c.s.ddlTsLockManager.AddRefCnt(-1) + defer c.s.ddlTsLockManager.Unlock() + + ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition) + if err != nil { + return 0, err + } + c.s.ddlTsLockManager.UpdateLastTs(ddlTs) + return ddlTs, nil } diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index ad5ce9f5dc..c1d41f22d2 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -18,15 +18,17 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { } ticker := newTickerWithMockNormalStream() core := newTestCore(withBroker(broker), withTtSynchronizer(ticker)) - gc := newGarbageCollectorCtx(core) + gc := newBgGarbageCollector(core) gc.ReDropCollection(&model.Collection{}, 1000) }) t.Run("failed to DropCollectionIndex", func(t *testing.T) { broker := newMockBroker() releaseCollectionCalled := false + releaseCollectionChan := make(chan struct{}, 1) broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { releaseCollectionCalled = true + releaseCollectionChan <- struct{}{} return nil } broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error { @@ -34,43 +36,61 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { } ticker := newTickerWithMockNormalStream() core := newTestCore(withBroker(broker), withTtSynchronizer(ticker)) - gc := newGarbageCollectorCtx(core) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc gc.ReDropCollection(&model.Collection{}, 1000) + <-releaseCollectionChan assert.True(t, releaseCollectionCalled) }) t.Run("failed to GcCollectionData", func(t *testing.T) { broker := newMockBroker() releaseCollectionCalled := false + releaseCollectionChan := make(chan struct{}, 1) broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { releaseCollectionCalled = true + releaseCollectionChan <- struct{}{} return nil } dropCollectionIndexCalled := false + dropCollectionIndexChan := make(chan struct{}, 1) broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error { dropCollectionIndexCalled = true + dropCollectionIndexChan <- struct{}{} return nil } ticker := newTickerWithMockFailStream() // failed to broadcast drop msg. 
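GcCollectionData and GcPartitionData above deliberately hold the DDL-ts lock and bump the ref count around the notification: while either is in effect, a concurrent GetMinDdlTs call (from the timetick loop) keeps returning lastTs rather than allocating a fresh TSO, so the gc message's timestamp is recorded before any tick can move past it. A hedged sketch of that guard, with simplified names:

package sketch

import (
	"sync"
	"sync/atomic"
)

// ddlTsGuard is a simplified stand-in for ddlTsLockManagerV2; refCnt and
// lastTs are the fields GetMinDdlTs consults (see ddl_ts_lock_manager.go).
type ddlTsGuard struct {
	mu     sync.Mutex
	refCnt atomic.Int32
	lastTs atomic.Uint64
}

// withGuard wraps a gc notification the way GcCollectionData does. The
// defers run in reverse order, so the lock is released before the ref
// count drops, exactly as in the original.
func (g *ddlTsGuard) withGuard(notify func() (uint64, error)) (uint64, error) {
	g.mu.Lock()
	g.refCnt.Add(1)
	defer g.refCnt.Add(-1)
	defer g.mu.Unlock()

	ts, err := notify()
	if err != nil {
		return 0, err
	}
	g.lastTs.Store(ts) // future ticks resume from the gc ts
	return ts, nil
}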
- core := newTestCore(withBroker(broker), withTtSynchronizer(ticker)) - gc := newGarbageCollectorCtx(core) + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } + core := newTestCore(withBroker(broker), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator)) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc shardsNum := 2 pchans := ticker.getDmlChannelNames(shardsNum) gc.ReDropCollection(&model.Collection{PhysicalChannelNames: pchans}, 1000) + <-releaseCollectionChan assert.True(t, releaseCollectionCalled) + <-dropCollectionIndexChan assert.True(t, dropCollectionIndexCalled) }) t.Run("failed to remove collection", func(t *testing.T) { broker := newMockBroker() releaseCollectionCalled := false + releaseCollectionChan := make(chan struct{}, 1) broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { releaseCollectionCalled = true + releaseCollectionChan <- struct{}{} return nil } dropCollectionIndexCalled := false + dropCollectionIndexChan := make(chan struct{}, 1) broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error { dropCollectionIndexCalled = true + dropCollectionIndexChan <- struct{}{} return nil } meta := newMockMetaTable() @@ -78,41 +98,66 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { return errors.New("error mock RemoveCollection") } ticker := newTickerWithMockNormalStream() + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } core := newTestCore(withBroker(broker), withTtSynchronizer(ticker), + withTsoAllocator(tsoAllocator), withMeta(meta)) - gc := newGarbageCollectorCtx(core) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc gc.ReDropCollection(&model.Collection{}, 1000) + <-releaseCollectionChan assert.True(t, releaseCollectionCalled) + <-dropCollectionIndexChan assert.True(t, dropCollectionIndexCalled) }) t.Run("normal case", func(t *testing.T) { broker := newMockBroker() releaseCollectionCalled := false + releaseCollectionChan := make(chan struct{}, 1) broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { releaseCollectionCalled = true + releaseCollectionChan <- struct{}{} return nil } dropCollectionIndexCalled := false + dropCollectionIndexChan := make(chan struct{}, 1) broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error { dropCollectionIndexCalled = true + dropCollectionIndexChan <- struct{}{} return nil } meta := newMockMetaTable() removeCollectionCalled := false + removeCollectionChan := make(chan struct{}, 1) meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error { removeCollectionCalled = true + removeCollectionChan <- struct{}{} return nil } ticker := newTickerWithMockNormalStream() + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } core := newTestCore(withBroker(broker), withTtSynchronizer(ticker), + withTsoAllocator(tsoAllocator), withMeta(meta)) - gc := newGarbageCollectorCtx(core) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc gc.ReDropCollection(&model.Collection{}, 1000) + <-releaseCollectionChan assert.True(t, 
releaseCollectionCalled) + <-dropCollectionIndexChan assert.True(t, dropCollectionIndexCalled) + <-removeCollectionChan assert.True(t, removeCollectionCalled) }) } @@ -124,15 +169,18 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { return errors.New("error mock UnwatchChannels") } core := newTestCore(withBroker(broker)) - gc := newGarbageCollectorCtx(core) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc gc.RemoveCreatingCollection(&model.Collection{}) }) t.Run("failed to RemoveCollection", func(t *testing.T) { broker := newMockBroker() unwatchChannelsCalled := false + unwatchChannelsChan := make(chan struct{}, 1) broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { unwatchChannelsCalled = true + unwatchChannelsChan <- struct{}{} return nil } meta := newMockMetaTable() @@ -140,69 +188,96 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { return errors.New("error mock RemoveCollection") } core := newTestCore(withBroker(broker), withMeta(meta)) - gc := newGarbageCollectorCtx(core) + gc := newBgGarbageCollector(core) gc.RemoveCreatingCollection(&model.Collection{}) + <-unwatchChannelsChan assert.True(t, unwatchChannelsCalled) }) t.Run("normal case", func(t *testing.T) { broker := newMockBroker() unwatchChannelsCalled := false + unwatchChannelsChan := make(chan struct{}, 1) broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { unwatchChannelsCalled = true + unwatchChannelsChan <- struct{}{} return nil } meta := newMockMetaTable() removeCollectionCalled := false + removeCollectionChan := make(chan struct{}, 1) meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error { removeCollectionCalled = true + removeCollectionChan <- struct{}{} return nil } core := newTestCore(withBroker(broker), withMeta(meta)) - gc := newGarbageCollectorCtx(core) + gc := newBgGarbageCollector(core) gc.RemoveCreatingCollection(&model.Collection{}) + <-unwatchChannelsChan assert.True(t, unwatchChannelsCalled) + <-removeCollectionChan assert.True(t, removeCollectionCalled) }) } -// func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { -// t.Run("failed to GcPartitionData", func(t *testing.T) { -// ticker := newTickerWithMockFailStream() // failed to broadcast drop msg. 
-// shardsNum := 2 -// pchans := ticker.getDmlChannelNames(shardsNum) -// core := newTestCore(withTtSynchronizer(ticker)) -// gc := newGarbageCollectorCtx(core) -// gc.ReDropPartition(pchans, &model.Partition{}, 100000) -// }) -// -// t.Run("failed to RemovePartition", func(t *testing.T) { -// ticker := newTickerWithMockNormalStream() -// shardsNum := 2 -// pchans := ticker.getDmlChannelNames(shardsNum) -// meta := newMockMetaTable() -// meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { -// return errors.New("error mock RemovePartition") -// } -// core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) -// gc := newGarbageCollectorCtx(core) -// gc.ReDropPartition(pchans, &model.Partition{}, 100000) -// }) -// -// t.Run("normal case", func(t *testing.T) { -// ticker := newTickerWithMockNormalStream() -// shardsNum := 2 -// pchans := ticker.getDmlChannelNames(shardsNum) -// meta := newMockMetaTable() -// removePartitionCalled := false -// meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { -// removePartitionCalled = true -// return nil -// } -// core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) -// gc := newGarbageCollectorCtx(core) -// gc.ReDropPartition(pchans, &model.Partition{}, 100000) -// assert.True(t, removePartitionCalled) -// }) -// } -// +func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { + t.Run("failed to GcPartitionData", func(t *testing.T) { + ticker := newTickerWithMockFailStream() // failed to broadcast drop msg. + shardsNum := 2 + pchans := ticker.getDmlChannelNames(shardsNum) + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } + core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator)) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc + gc.ReDropPartition(pchans, &model.Partition{}, 100000) + }) + + t.Run("failed to RemovePartition", func(t *testing.T) { + ticker := newTickerWithMockNormalStream() + shardsNum := 2 + pchans := ticker.getDmlChannelNames(shardsNum) + meta := newMockMetaTable() + meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { + return errors.New("error mock RemovePartition") + } + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator)) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc + gc.ReDropPartition(pchans, &model.Partition{}, 100000) + }) + + t.Run("normal case", func(t *testing.T) { + ticker := newTickerWithMockNormalStream() + shardsNum := 2 + pchans := ticker.getDmlChannelNames(shardsNum) + meta := newMockMetaTable() + removePartitionCalled := false + removePartitionChan := make(chan struct{}, 1) + meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { + removePartitionCalled = true + removePartitionChan <- struct{}{} + return nil + } + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } + core := newTestCore(withMeta(meta), 
withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator)) + core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator) + gc := newBgGarbageCollector(core) + core.garbageCollector = gc + gc.ReDropPartition(pchans, &model.Partition{}, 100000) + <-removePartitionChan + assert.True(t, removePartitionCalled) + }) +} diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index ae6333dbcb..443e6ec34d 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -259,6 +259,12 @@ func newTestCore(opts ...Opt) *Core { c := &Core{ session: &sessionutil.Session{ServerID: TestRootCoordID}, } + executor := newMockStepExecutor() + executor.AddStepsFunc = func(s *stepStack) { + // no schedule, execute directly. + s.Execute(context.Background()) + } + c.stepExecutor = executor for _, opt := range opts { opt(c) } @@ -647,7 +653,9 @@ func withAbnormalCode() Opt { type mockScheduler struct { IScheduler - AddTaskFunc func(t taskV2) error + AddTaskFunc func(t taskV2) error + GetMinDdlTsFunc func() Timestamp + minDdlTs Timestamp } func newMockScheduler() *mockScheduler { @@ -661,6 +669,13 @@ func (m mockScheduler) AddTask(t taskV2) error { return nil } +func (m mockScheduler) GetMinDdlTs() Timestamp { + if m.GetMinDdlTsFunc != nil { + return m.GetMinDdlTsFunc() + } + return m.minDdlTs +} + func withScheduler(sched IScheduler) Opt { return func(c *Core) { c.scheduler = sched @@ -759,16 +774,16 @@ func withBroker(b Broker) Opt { type mockGarbageCollector struct { GarbageCollector - GcCollectionDataFunc func(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error - GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error + GcCollectionDataFunc func(ctx context.Context, coll *model.Collection) (Timestamp, error) + GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) } -func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error { - return m.GcCollectionDataFunc(ctx, coll, ts) +func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (Timestamp, error) { + return m.GcCollectionDataFunc(ctx, coll) } -func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error { - return m.GcPartitionDataFunc(ctx, pChannels, partition, ts) +func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) { + return m.GcPartitionDataFunc(ctx, pChannels, partition) } func newMockGarbageCollector() *mockGarbageCollector { @@ -809,6 +824,9 @@ func newTickerWithMockFailStream() *timetickSync { func newMockNormalStream() *msgstream.MockMsgStream { stream := msgstream.NewMockMsgStream() + stream.BroadcastFunc = func(pack *msgstream.MsgPack) error { + return nil + } stream.BroadcastMarkFunc = func(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { return map[string][]msgstream.MessageID{}, nil } @@ -838,3 +856,63 @@ func newTickerWithFactory(factory msgstream.Factory) *timetickSync { ticker := newTimeTickSync(ctx, TestRootCoordID, factory, chans) return ticker } + +type mockDdlTsLockManager struct { + DdlTsLockManagerV2 + GetMinDdlTsFunc func() Timestamp +} + +func (m mockDdlTsLockManager) GetMinDdlTs() Timestamp { + if m.GetMinDdlTsFunc != nil { + return m.GetMinDdlTsFunc() + } + 
return 100
+}
+
+func newMockDdlTsLockManager() *mockDdlTsLockManager {
+	return &mockDdlTsLockManager{}
+}
+
+func withDdlTsLockManager(m DdlTsLockManagerV2) Opt {
+	return func(c *Core) {
+		c.ddlTsLockManager = m
+	}
+}
+
+type mockStepExecutor struct {
+	StepExecutor
+	StartFunc    func()
+	StopFunc     func()
+	AddStepsFunc func(s *stepStack)
+}
+
+func newMockStepExecutor() *mockStepExecutor {
+	return &mockStepExecutor{}
+}
+
+func (m mockStepExecutor) Start() {
+	if m.StartFunc != nil {
+		m.StartFunc()
+	} else {
+	}
+}
+
+func (m mockStepExecutor) Stop() {
+	if m.StopFunc != nil {
+		m.StopFunc()
+	} else {
+	}
+}
+
+func (m mockStepExecutor) AddSteps(s *stepStack) {
+	if m.AddStepsFunc != nil {
+		m.AddStepsFunc(s)
+	} else {
+	}
+}
+
+func withStepExecutor(executor StepExecutor) Opt {
+	return func(c *Core) {
+		c.stepExecutor = executor
+	}
+}
diff --git a/internal/rootcoord/redo.go b/internal/rootcoord/redo.go
index a96f2d1db7..d5cdd27f6d 100644
--- a/internal/rootcoord/redo.go
+++ b/internal/rootcoord/redo.go
@@ -2,53 +2,49 @@ package rootcoord
 
 import (
 	"context"
-	"time"
 
 	"github.com/milvus-io/milvus/internal/log"
 
 	"go.uber.org/zap"
 )
 
 type baseRedoTask struct {
-	syncTodoStep  []Step // steps to execute synchronously
-	asyncTodoStep []Step // steps to execute asynchronously
+	syncTodoStep  []nestedStep // steps to execute synchronously
+	asyncTodoStep []nestedStep // steps to execute asynchronously
+	stepExecutor  StepExecutor
 }
 
-func newBaseRedoTask() *baseRedoTask {
+func newBaseRedoTask(stepExecutor StepExecutor) *baseRedoTask {
 	return &baseRedoTask{
-		syncTodoStep:  make([]Step, 0),
-		asyncTodoStep: make([]Step, 0),
+		syncTodoStep:  make([]nestedStep, 0),
+		asyncTodoStep: make([]nestedStep, 0),
+		stepExecutor:  stepExecutor,
	}
 }
 
-func (b *baseRedoTask) AddSyncStep(step Step) {
+func (b *baseRedoTask) AddSyncStep(step nestedStep) {
 	b.syncTodoStep = append(b.syncTodoStep, step)
 }
 
-func (b *baseRedoTask) AddAsyncStep(step Step) {
+func (b *baseRedoTask) AddAsyncStep(step nestedStep) {
 	b.asyncTodoStep = append(b.asyncTodoStep, step)
 }
 
 func (b *baseRedoTask) redoAsyncSteps() {
-	// You cannot just use the ctx of task, since it will be canceled after response is returned.
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer cancel()
-	for i := 0; i < len(b.asyncTodoStep); i++ {
-		todo := b.asyncTodoStep[i]
-		if err := todo.Execute(ctx); err != nil {
-			// You depend on the collection meta to do other gc.
-			// TODO: add ddl logger after other service can be idempotent enough, then you can do separate steps
-			// independently.
-			log.Error("failed to execute step, garbage may be generated", zap.Error(err))
-			return
-		}
+	l := len(b.asyncTodoStep)
+	steps := make([]nestedStep, 0, l)
+	for i := l - 1; i >= 0; i-- {
+		steps = append(steps, b.asyncTodoStep[i])
 	}
+	b.asyncTodoStep = nil // drop the reference so the baseRedoTask can be garbage collected.
+	b.stepExecutor.AddSteps(&stepStack{steps: steps})
 }
 
 func (b *baseRedoTask) Execute(ctx context.Context) error {
 	for i := 0; i < len(b.syncTodoStep); i++ {
 		todo := b.syncTodoStep[i]
-		if err := todo.Execute(ctx); err != nil {
-			log.Error("failed to execute step", zap.Error(err))
+		// sync steps produce no child steps.
+ if _, err := todo.Execute(ctx); err != nil { + log.Error("failed to execute step", zap.Error(err), zap.String("desc", todo.Desc())) return err } } diff --git a/internal/rootcoord/redo_test.go b/internal/rootcoord/redo_test.go index c78bbaadac..2051951ed3 100644 --- a/internal/rootcoord/redo_test.go +++ b/internal/rootcoord/redo_test.go @@ -9,21 +9,27 @@ import ( ) type mockFailStep struct { + baseStep calledChan chan struct{} called bool + err error } func newMockFailStep() *mockFailStep { return &mockFailStep{calledChan: make(chan struct{}, 1), called: false} } -func (m *mockFailStep) Execute(ctx context.Context) error { +func (m *mockFailStep) Execute(ctx context.Context) ([]nestedStep, error) { m.called = true m.calledChan <- struct{}{} - return errors.New("error mock Execute") + if m.err != nil { + return nil, m.err + } + return nil, errors.New("error mock Execute") } type mockNormalStep struct { + nestedStep calledChan chan struct{} called bool } @@ -32,16 +38,26 @@ func newMockNormalStep() *mockNormalStep { return &mockNormalStep{calledChan: make(chan struct{}, 1), called: false} } -func (m *mockNormalStep) Execute(ctx context.Context) error { +func (m *mockNormalStep) Execute(ctx context.Context) ([]nestedStep, error) { m.called = true m.calledChan <- struct{}{} - return nil + return nil, nil +} + +func newTestRedoTask() *baseRedoTask { + stepExecutor := newMockStepExecutor() + stepExecutor.AddStepsFunc = func(s *stepStack) { + // no schedule, execute directly. + s.Execute(context.Background()) + } + redo := newBaseRedoTask(stepExecutor) + return redo } func Test_baseRedoTask_redoAsyncSteps(t *testing.T) { t.Run("partial error", func(t *testing.T) { - redo := newBaseRedoTask() - steps := []Step{newMockNormalStep(), newMockFailStep(), newMockNormalStep()} + redo := newTestRedoTask() + steps := []nestedStep{newMockNormalStep(), newMockFailStep(), newMockNormalStep()} for _, step := range steps { redo.AddAsyncStep(step) } @@ -51,9 +67,9 @@ func Test_baseRedoTask_redoAsyncSteps(t *testing.T) { }) t.Run("normal case", func(t *testing.T) { - redo := newBaseRedoTask() + redo := newTestRedoTask() n := 10 - steps := make([]Step, 0, n) + steps := make([]nestedStep, 0, n) for i := 0; i < n; i++ { steps = append(steps, newMockNormalStep()) } @@ -69,10 +85,10 @@ func Test_baseRedoTask_redoAsyncSteps(t *testing.T) { func Test_baseRedoTask_Execute(t *testing.T) { t.Run("sync not finished, no async task", func(t *testing.T) { - redo := newBaseRedoTask() - syncSteps := []Step{newMockFailStep()} + redo := newTestRedoTask() + syncSteps := []nestedStep{newMockFailStep()} asyncNum := 10 - asyncSteps := make([]Step, 0, asyncNum) + asyncSteps := make([]nestedStep, 0, asyncNum) for i := 0; i < asyncNum; i++ { asyncSteps = append(asyncSteps, newMockNormalStep()) } @@ -92,11 +108,11 @@ func Test_baseRedoTask_Execute(t *testing.T) { // TODO: sync finished, but some async fail. 
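One subtlety in redoAsyncSteps (redo.go above) is the reversed copy: stepStack.Execute in step_executor.go below pops steps from the end of the slice, so reversing before the hand-off preserves the order in which the steps were added. A small runnable illustration of that invariant:

package main

import "fmt"

func main() {
	added := []string{"releaseCollection", "dropIndex", "deleteCollectionData"}
	// Reverse, as redoAsyncSteps does before calling AddSteps.
	stack := make([]string, 0, len(added))
	for i := len(added) - 1; i >= 0; i-- {
		stack = append(stack, added[i])
	}
	// Pop from the end, as stepStack.Execute does.
	for len(stack) > 0 {
		top := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		fmt.Println(top) // prints in the original add order
	}
}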
t.Run("normal case", func(t *testing.T) { - redo := newBaseRedoTask() + redo := newTestRedoTask() syncNum := 10 - syncSteps := make([]Step, 0, syncNum) + syncSteps := make([]nestedStep, 0, syncNum) asyncNum := 10 - asyncSteps := make([]Step, 0, asyncNum) + asyncSteps := make([]nestedStep, 0, asyncNum) for i := 0; i < syncNum; i++ { syncSteps = append(syncSteps, newMockNormalStep()) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ae30f2444f..b574e5f3ea 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -99,7 +99,9 @@ type Core struct { meta IMetaTable scheduler IScheduler broker Broker + ddlTsLockManager DdlTsLockManagerV2 garbageCollector GarbageCollector + stepExecutor StepExecutor metaKVCreator metaKVCreator @@ -175,6 +177,14 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error { return c.chanTimeTick.updateTimeTick(&ttMsg, reason) } +func (c *Core) sendMinDdlTsAsTt() { + minDdlTs := c.ddlTsLockManager.GetMinDdlTs() + err := c.sendTimeTick(minDdlTs, "timetick loop") + if err != nil { + log.Warn("failed to send timetick", zap.Error(err)) + } +} + func (c *Core) startTimeTickLoop() { defer c.wg.Done() ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval) @@ -183,12 +193,7 @@ func (c *Core) startTimeTickLoop() { case <-c.ctx.Done(): return case <-ticker.C: - if ts, err := c.tsoAllocator.GenerateTSO(1); err == nil { - err := c.sendTimeTick(ts, "timetick loop") - if err != nil { - log.Warn("failed to send timetick", zap.Error(err)) - } - } + c.sendMinDdlTsAsTt() } } } @@ -441,7 +446,9 @@ func (c *Core) initInternal() error { c.proxyClientManager = newProxyClientManager(c.proxyCreator) c.broker = newServerBroker(c) - c.garbageCollector = newGarbageCollectorCtx(c) + c.ddlTsLockManager = newDdlTsLockManagerV2(c.tsoAllocator) + c.garbageCollector = newBgGarbageCollector(c) + c.stepExecutor = newBgStepExecutor(c.ctx) c.proxyManager = newProxyManager( c.ctx, @@ -615,6 +622,7 @@ func (c *Core) startInternal() error { } c.scheduler.Start() + c.stepExecutor.Start() Params.RootCoordCfg.CreatedTime = time.Now() Params.RootCoordCfg.UpdatedTime = time.Now() @@ -635,6 +643,9 @@ func (c *Core) Start() error { func (c *Core) Stop() error { c.UpdateStateCode(internalpb.StateCode_Abnormal) + c.stepExecutor.Stop() + c.scheduler.Stop() + c.cancel() c.wg.Wait() // wait at most one second to revoke diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 9dc89cf041..6891def987 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -4,6 +4,7 @@ import ( "context" "math/rand" "testing" + "time" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -858,3 +859,38 @@ func TestCore_Rbac(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) } } + +func TestCore_sendMinDdlTsAsTt(t *testing.T) { + ticker := newRocksMqTtSynchronizer() + ddlManager := newMockDdlTsLockManager() + ddlManager.GetMinDdlTsFunc = func() Timestamp { + return 100 + } + c := newTestCore( + withTtSynchronizer(ticker), + withDdlTsLockManager(ddlManager)) + c.sendMinDdlTsAsTt() // no session. 
+ ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID}) + c.sendMinDdlTsAsTt() +} + +func TestCore_startTimeTickLoop(t *testing.T) { + ticker := newRocksMqTtSynchronizer() + ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID}) + ddlManager := newMockDdlTsLockManager() + ddlManager.GetMinDdlTsFunc = func() Timestamp { + return 100 + } + c := newTestCore( + withTtSynchronizer(ticker), + withDdlTsLockManager(ddlManager)) + ctx, cancel := context.WithCancel(context.Background()) + c.ctx = ctx + Params.ProxyCfg.TimeTickInterval = time.Millisecond + c.wg.Add(1) + go c.startTimeTickLoop() + + time.Sleep(time.Millisecond * 4) + cancel() + c.wg.Wait() +} diff --git a/internal/rootcoord/scheduler.go b/internal/rootcoord/scheduler.go index bffd2bcc5e..9bbceff446 100644 --- a/internal/rootcoord/scheduler.go +++ b/internal/rootcoord/scheduler.go @@ -24,6 +24,8 @@ type scheduler struct { tsoAllocator tso.Allocator taskChan chan taskV2 + + lock sync.Mutex } func newScheduler(ctx context.Context, idAllocator allocator.GIDAllocator, tsoAllocator tso.Allocator) *scheduler { @@ -49,6 +51,15 @@ func (s *scheduler) Stop() { s.wg.Wait() } +func (s *scheduler) execute(task taskV2) { + if err := task.Prepare(task.GetCtx()); err != nil { + task.NotifyDone(err) + return + } + err := task.Execute(task.GetCtx()) + task.NotifyDone(err) +} + func (s *scheduler) taskLoop() { defer s.wg.Done() for { @@ -56,12 +67,7 @@ func (s *scheduler) taskLoop() { case <-s.ctx.Done(): return case task := <-s.taskChan: - if err := task.Prepare(task.GetCtx()); err != nil { - task.NotifyDone(err) - continue - } - err := task.Execute(task.GetCtx()) - task.NotifyDone(err) + s.execute(task) } } } @@ -89,6 +95,10 @@ func (s *scheduler) enqueue(task taskV2) { } func (s *scheduler) AddTask(task taskV2) error { + // make sure that setting ts and enqueue is atomic. + s.lock.Lock() + defer s.lock.Unlock() + if err := s.setID(task); err != nil { return err } diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index dec64303ce..6de0e689d0 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -3,8 +3,12 @@ package rootcoord import ( "context" "errors" + "fmt" "math/rand" "testing" + "time" + + "go.uber.org/atomic" "github.com/stretchr/testify/assert" ) @@ -103,7 +107,7 @@ func Test_scheduler_failed_to_set_ts(t *testing.T) { assert.Error(t, err) } -func Test_scheduler_enqueu_normal_case(t *testing.T) { +func Test_scheduler_enqueue_normal_case(t *testing.T) { idAlloc := newMockIDAllocator() tsoAlloc := newMockTsoAllocator() idAlloc.AllocOneF = func() (UniqueID, error) { @@ -166,3 +170,55 @@ func Test_scheduler_bg(t *testing.T) { s.Stop() } + +func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { + t.Run("normal case", func(t *testing.T) { + idAlloc := newMockIDAllocator() + tsoAlloc := newMockTsoAllocator() + tso := atomic.NewUint64(100) + idAlloc.AllocOneF = func() (UniqueID, error) { + return 100, nil + } + tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) { + got := tso.Inc() + return got, nil + } + ctx := context.Background() + s := newScheduler(ctx, idAlloc, tsoAlloc) + Params.InitOnce() + Params.ProxyCfg.TimeTickInterval = time.Millisecond + s.Start() + + time.Sleep(time.Millisecond * 4) + + // add task to queue. 
+ n := 10 + for i := 0; i < n; i++ { + task := newMockNormalTask() + err := s.AddTask(task) + assert.NoError(t, err) + } + + time.Sleep(time.Millisecond * 4) + s.Stop() + }) + + t.Run("invalid tso", func(t *testing.T) { + idAlloc := newMockIDAllocator() + tsoAlloc := newMockTsoAllocator() + idAlloc.AllocOneF = func() (UniqueID, error) { + return 100, nil + } + tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) { + return 0, fmt.Errorf("error mock GenerateTSO") + } + ctx := context.Background() + s := newScheduler(ctx, idAlloc, tsoAlloc) + Params.InitOnce() + Params.ProxyCfg.TimeTickInterval = time.Millisecond + s.Start() + + time.Sleep(time.Millisecond * 4) + s.Stop() + }) +} diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index ec8f842f02..07ada29aae 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -2,139 +2,285 @@ package rootcoord import ( "context" + "fmt" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/metastore/model" ) -type Step interface { - Execute(ctx context.Context) error +type stepPriority int + +const ( + stepPriorityLow = iota + stepPriorityNormal + stepPriorityImportant + stepPriorityUrgent +) + +type nestedStep interface { + Execute(ctx context.Context) ([]nestedStep, error) + Desc() string + Weight() stepPriority } type baseStep struct { core *Core } -type AddCollectionMetaStep struct { +func (s baseStep) Desc() string { + return "" +} + +func (s baseStep) Weight() stepPriority { + return stepPriorityLow +} + +type addCollectionMetaStep struct { baseStep coll *model.Collection } -func (s *AddCollectionMetaStep) Execute(ctx context.Context) error { - return s.core.meta.AddCollection(ctx, s.coll) +func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.meta.AddCollection(ctx, s.coll) + return nil, err } -type DeleteCollectionMetaStep struct { +func (s *addCollectionMetaStep) Desc() string { + return fmt.Sprintf("add collection to meta table, name: %s, id: %d, ts: %d", s.coll.Name, s.coll.CollectionID, s.coll.CreateTime) +} + +type deleteCollectionMetaStep struct { baseStep collectionID UniqueID ts Timestamp } -func (s *DeleteCollectionMetaStep) Execute(ctx context.Context) error { - return s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts) +func (s *deleteCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts) + return nil, err } -type RemoveDmlChannelsStep struct { +func (s *deleteCollectionMetaStep) Desc() string { + return fmt.Sprintf("delete collection from meta table, id: %d, ts: %d", s.collectionID, s.ts) +} + +func (s *deleteCollectionMetaStep) Weight() stepPriority { + return stepPriorityNormal +} + +type removeDmlChannelsStep struct { baseStep - pchannels []string + pChannels []string } -func (s *RemoveDmlChannelsStep) Execute(ctx context.Context) error { - s.core.chanTimeTick.removeDmlChannels(s.pchannels...) - return nil +func (s *removeDmlChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { + s.core.chanTimeTick.removeDmlChannels(s.pChannels...) + return nil, nil } -type WatchChannelsStep struct { +func (s *removeDmlChannelsStep) Desc() string { + // this shouldn't be called. + return fmt.Sprintf("remove dml channels: %v", s.pChannels) +} + +func (s *removeDmlChannelsStep) Weight() stepPriority { + // avoid too frequent tt. 
+ return stepPriorityUrgent +} + +type watchChannelsStep struct { baseStep info *watchInfo } -func (s *WatchChannelsStep) Execute(ctx context.Context) error { - return s.core.broker.WatchChannels(ctx, s.info) +func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.broker.WatchChannels(ctx, s.info) + return nil, err } -type UnwatchChannelsStep struct { +func (s *watchChannelsStep) Desc() string { + return fmt.Sprintf("watch channels, ts: %d, collection: %d, partition: %d, vChannels: %v", + s.info.ts, s.info.collectionID, s.info.partitionID, s.info.vChannels) +} + +type unwatchChannelsStep struct { baseStep collectionID UniqueID channels collectionChannels } -func (s *UnwatchChannelsStep) Execute(ctx context.Context) error { - return s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels}) +func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels}) + return nil, err } -type ChangeCollectionStateStep struct { +func (s *unwatchChannelsStep) Desc() string { + return fmt.Sprintf("unwatch channels, collection: %d, pChannels: %v, vChannels: %v", + s.collectionID, s.channels.physicalChannels, s.channels.virtualChannels) +} + +func (s *unwatchChannelsStep) Weight() stepPriority { + return stepPriorityNormal +} + +type changeCollectionStateStep struct { baseStep collectionID UniqueID state pb.CollectionState ts Timestamp } -func (s *ChangeCollectionStateStep) Execute(ctx context.Context) error { - return s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts) +func (s *changeCollectionStateStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts) + return nil, err } -type ExpireCacheStep struct { +func (s *changeCollectionStateStep) Desc() string { + return fmt.Sprintf("change collection state, collection: %d, ts: %d, state: %s", + s.collectionID, s.ts, s.state.String()) +} + +type expireCacheStep struct { baseStep collectionNames []string collectionID UniqueID ts Timestamp } -func (s *ExpireCacheStep) Execute(ctx context.Context) error { - return s.core.ExpireMetaCache(ctx, s.collectionNames, s.collectionID, s.ts) +func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.ExpireMetaCache(ctx, s.collectionNames, s.collectionID, s.ts) + return nil, err } -type DeleteCollectionDataStep struct { +func (s *expireCacheStep) Desc() string { + return fmt.Sprintf("expire cache, collection id: %d, collection names: %s, ts: %d", + s.collectionID, s.collectionNames, s.ts) +} + +type deleteCollectionDataStep struct { baseStep coll *model.Collection - ts Timestamp } -func (s *DeleteCollectionDataStep) Execute(ctx context.Context) error { - return s.core.garbageCollector.GcCollectionData(ctx, s.coll, s.ts) +func (s *deleteCollectionDataStep) Execute(ctx context.Context) ([]nestedStep, error) { + ddlTs, err := s.core.garbageCollector.GcCollectionData(ctx, s.coll) + if err != nil { + return nil, err + } + // wait for ts synced. 
+	children := make([]nestedStep, 0, len(s.coll.PhysicalChannelNames))
+	for _, channel := range s.coll.PhysicalChannelNames {
+		children = append(children, &waitForTsSyncedStep{
+			baseStep: baseStep{core: s.core},
+			ts:       ddlTs,
+			channel:  channel,
+		})
+	}
+	return children, nil
 }

-type DeletePartitionDataStep struct {
+func (s *deleteCollectionDataStep) Desc() string {
+	return fmt.Sprintf("delete collection data, collection: %d", s.coll.CollectionID)
+}
+
+func (s *deleteCollectionDataStep) Weight() stepPriority {
+	return stepPriorityImportant
+}
+
+// waitForTsSyncedStep is a child step of deleteCollectionDataStep.
+type waitForTsSyncedStep struct {
+	baseStep
+	ts      Timestamp
+	channel string
+}
+
+func (s *waitForTsSyncedStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	syncedTs := s.core.chanTimeTick.getSyncedTimeTick(s.channel)
+	if syncedTs < s.ts {
+		return nil, fmt.Errorf("ts not synced yet, channel: %s, synced: %d, want: %d", s.channel, syncedTs, s.ts)
+	}
+	return nil, nil
+}
+
+func (s *waitForTsSyncedStep) Desc() string {
+	return fmt.Sprintf("wait for ts synced, channel: %s, want: %d", s.channel, s.ts)
+}
+
+func (s *waitForTsSyncedStep) Weight() stepPriority {
+	return stepPriorityNormal
+}
+
+type deletePartitionDataStep struct {
 	baseStep
 	pchans    []string
 	partition *model.Partition
-	ts        Timestamp
 }

-func (s *DeletePartitionDataStep) Execute(ctx context.Context) error {
-	return s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition, s.ts)
+func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	_, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition)
+	return nil, err
 }

-type ReleaseCollectionStep struct {
+func (s *deletePartitionDataStep) Desc() string {
+	return fmt.Sprintf("delete partition data, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
+}
+
+func (s *deletePartitionDataStep) Weight() stepPriority {
+	return stepPriorityImportant
+}
+
+type releaseCollectionStep struct {
 	baseStep
 	collectionID UniqueID
 }

-func (s *ReleaseCollectionStep) Execute(ctx context.Context) error {
-	return s.core.broker.ReleaseCollection(ctx, s.collectionID)
+func (s *releaseCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	err := s.core.broker.ReleaseCollection(ctx, s.collectionID)
+	return nil, err
 }

-type DropIndexStep struct {
+func (s *releaseCollectionStep) Desc() string {
+	return fmt.Sprintf("release collection: %d", s.collectionID)
+}
+
+func (s *releaseCollectionStep) Weight() stepPriority {
+	return stepPriorityUrgent
+}
+
+type dropIndexStep struct {
 	baseStep
 	collID UniqueID
 }

-func (s *DropIndexStep) Execute(ctx context.Context) error {
-	return s.core.broker.DropCollectionIndex(ctx, s.collID)
+func (s *dropIndexStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	err := s.core.broker.DropCollectionIndex(ctx, s.collID)
+	return nil, err
 }

-type AddPartitionMetaStep struct {
+func (s *dropIndexStep) Desc() string {
+	return fmt.Sprintf("drop collection index: %d", s.collID)
+}
+
+func (s *dropIndexStep) Weight() stepPriority {
+	return stepPriorityNormal
+}
+
+type addPartitionMetaStep struct {
 	baseStep
 	partition *model.Partition
 }

-func (s *AddPartitionMetaStep) Execute(ctx context.Context) error {
-	return s.core.meta.AddPartition(ctx, s.partition)
+func (s *addPartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	err := s.core.meta.AddPartition(ctx, s.partition)
+	return nil, err
 }

-type ChangePartitionStateStep struct {
+func (s *addPartitionMetaStep) Desc() string {
+	return fmt.Sprintf("add partition to meta table, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
+}
+
+type changePartitionStateStep struct {
 	baseStep
 	collectionID UniqueID
 	partitionID  UniqueID
@@ -142,24 +288,47 @@ type ChangePartitionStateStep struct {
 	ts Timestamp
 }

-func (s *ChangePartitionStateStep) Execute(ctx context.Context) error {
-	return s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts)
+func (s *changePartitionStateStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	err := s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts)
+	return nil, err
 }

-type RemovePartitionMetaStep struct {
+func (s *changePartitionStateStep) Desc() string {
+	return fmt.Sprintf("change partition state, collection: %d, partition: %d, state: %s, ts: %d",
+		s.collectionID, s.partitionID, s.state.String(), s.ts)
+}
+
+type removePartitionMetaStep struct {
 	baseStep
 	collectionID UniqueID
 	partitionID  UniqueID
 	ts           Timestamp
 }

-func (s *RemovePartitionMetaStep) Execute(ctx context.Context) error {
-	return s.core.meta.RemovePartition(ctx, s.collectionID, s.partitionID, s.ts)
+func (s *removePartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	err := s.core.meta.RemovePartition(ctx, s.collectionID, s.partitionID, s.ts)
+	return nil, err
 }

-type NullStep struct {
+func (s *removePartitionMetaStep) Desc() string {
+	return fmt.Sprintf("remove partition meta, collection: %d, partition: %d, ts: %d", s.collectionID, s.partitionID, s.ts)
 }

-func (s *NullStep) Execute(ctx context.Context) error {
-	return nil
+func (s *removePartitionMetaStep) Weight() stepPriority {
+	return stepPriorityNormal
+}
+
+type nullStep struct {
+}
+
+func (s *nullStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	return nil, nil
+}
+
+func (s *nullStep) Desc() string {
+	return ""
+}
+
+func (s *nullStep) Weight() stepPriority {
+	return stepPriorityLow
 }
diff --git a/internal/rootcoord/step_executor.go b/internal/rootcoord/step_executor.go
new file mode 100644
index 0000000000..611000ee15
--- /dev/null
+++ b/internal/rootcoord/step_executor.go
@@ -0,0 +1,195 @@
+package rootcoord
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/util/retry"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultBgExecutingParallel = 4
+	defaultBgExecutingInterval = time.Second
+)
+
+type StepExecutor interface {
+	Start()
+	Stop()
+	AddSteps(s *stepStack)
+}
+
+type stepStack struct {
+	steps []nestedStep
+}
+
+func (s *stepStack) Execute(ctx context.Context) *stepStack {
+	steps := s.steps
+	for len(steps) > 0 {
+		l := len(steps)
+		todo := steps[l-1]
+		childSteps, err := todo.Execute(ctx)
+		if retry.IsUnRecoverable(err) {
+			log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
+			return nil
+		}
+		if err != nil {
+			s.steps = nil // drop the reference so s can be garbage collected.
+			log.Warn("failed to execute step, wait for reschedule", zap.Error(err), zap.String("step", todo.Desc()))
+			return &stepStack{steps: steps}
+		}
+		// this step is done.
+		steps = steps[:l-1]
+		steps = append(steps, childSteps...)
+	}
+	// everything is done.
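+	// a nil return tells the caller that nothing is left to reschedule.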
+	return nil
+}
+
+type selectStepPolicy func(map[*stepStack]struct{}) []*stepStack
+
+func randomSelect(parallel int, m map[*stepStack]struct{}) []*stepStack {
+	// the "random" order comes from Go's randomized map iteration.
+	if parallel <= 0 {
+		parallel = defaultBgExecutingParallel
+	}
+	res := make([]*stepStack, 0, parallel)
+	for s := range m {
+		if len(res) >= parallel {
+			break
+		}
+		res = append(res, s)
+	}
+	return res
+}
+
+func randomSelectPolicy(parallel int) selectStepPolicy {
+	return func(m map[*stepStack]struct{}) []*stepStack {
+		return randomSelect(parallel, m)
+	}
+}
+
+func defaultSelectPolicy() selectStepPolicy {
+	return randomSelectPolicy(defaultBgExecutingParallel)
+}
+
+type bgOpt func(*bgStepExecutor)
+
+func withSelectStepPolicy(policy selectStepPolicy) bgOpt {
+	return func(bg *bgStepExecutor) {
+		bg.selector = policy
+	}
+}
+
+func withBgInterval(interval time.Duration) bgOpt {
+	return func(bg *bgStepExecutor) {
+		bg.interval = interval
+	}
+}
+
+type bgStepExecutor struct {
+	ctx           context.Context
+	cancel        context.CancelFunc
+	wg            sync.WaitGroup
+	bufferedSteps map[*stepStack]struct{}
+	selector      selectStepPolicy
+	mu            sync.Mutex
+	notify        chan struct{}
+	interval      time.Duration
+}
+
+func newBgStepExecutor(ctx context.Context, opts ...bgOpt) *bgStepExecutor {
+	ctx1, cancel := context.WithCancel(ctx)
+	bg := &bgStepExecutor{
+		ctx:           ctx1,
+		cancel:        cancel,
+		wg:            sync.WaitGroup{},
+		bufferedSteps: make(map[*stepStack]struct{}),
+		selector:      defaultSelectPolicy(),
+		mu:            sync.Mutex{},
+		notify:        make(chan struct{}, 1),
+		interval:      defaultBgExecutingInterval,
+	}
+	for _, opt := range opts {
+		opt(bg)
+	}
+	return bg
+}
+
+func (bg *bgStepExecutor) Start() {
+	bg.wg.Add(1)
+	go bg.scheduleLoop()
+}
+
+func (bg *bgStepExecutor) Stop() {
+	bg.cancel()
+	bg.wg.Wait()
+}
+
+func (bg *bgStepExecutor) AddSteps(s *stepStack) {
+	bg.mu.Lock()
+	bg.addStepsInternal(s)
+	bg.mu.Unlock()
+
+	select {
+	case bg.notify <- struct{}{}:
+	default:
+	}
+}
+
+func (bg *bgStepExecutor) process(steps []*stepStack) {
+	wg := sync.WaitGroup{}
+	for i := range steps {
+		s := steps[i]
+		if s == nil {
+			continue
+		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			child := s.Execute(bg.ctx)
+			if child != nil {
+				bg.AddSteps(child)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func (bg *bgStepExecutor) schedule() {
+	bg.mu.Lock()
+	selected := bg.selector(bg.bufferedSteps)
+	for _, s := range selected {
+		bg.removeStepsInternal(s)
+	}
+	bg.mu.Unlock()
+
+	bg.process(selected)
+}
+
+func (bg *bgStepExecutor) scheduleLoop() {
+	defer bg.wg.Done()

+	ticker := time.NewTicker(bg.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-bg.ctx.Done():
+			return
+		case <-bg.notify:
+			bg.schedule()
+		case <-ticker.C:
+			bg.schedule()
+		}
+	}
+}
+
+func (bg *bgStepExecutor) addStepsInternal(s *stepStack) {
+	bg.bufferedSteps[s] = struct{}{}
+}
+
+func (bg *bgStepExecutor) removeStepsInternal(s *stepStack) {
+	delete(bg.bufferedSteps, s)
+}
diff --git a/internal/rootcoord/step_executor_test.go b/internal/rootcoord/step_executor_test.go
new file mode 100644
index 0000000000..ad12534e33
--- /dev/null
+++ b/internal/rootcoord/step_executor_test.go
@@ -0,0 +1,175 @@
+package rootcoord
+
+import (
+	"context"
+	"errors"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/milvus-io/milvus/internal/util/retry"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type mockChildStep struct {
+}
+
+func (m *mockChildStep) Execute(ctx context.Context) ([]nestedStep, error) {
+	return nil, nil
+}
+
+func (m *mockChildStep) Desc() string {
+	return "mock child step"
+}
+
+func (m *mockChildStep) Weight() stepPriority {
+	return stepPriorityLow
+}
+
+func newMockChildStep() *mockChildStep {
+	return &mockChildStep{}
+}
+
+type mockStepWithChild struct {
+}
+
+func (m *mockStepWithChild) Execute(ctx context.Context) ([]nestedStep, error) {
+	return []nestedStep{newMockChildStep()}, nil
+}
+
+func (m *mockStepWithChild) Desc() string {
+	return "mock step with child"
+}
+
+func (m *mockStepWithChild) Weight() stepPriority {
+	return stepPriorityLow
+}
+
+func newMockStepWithChild() *mockStepWithChild {
+	return &mockStepWithChild{}
+}
+
+func Test_stepStack_Execute(t *testing.T) {
+	t.Run("normal case", func(t *testing.T) {
+		steps := []nestedStep{
+			newMockStepWithChild(),
+			newMockChildStep(),
+		}
+		s := &stepStack{steps: steps}
+		unfinished := s.Execute(context.Background())
+		assert.Nil(t, unfinished)
+	})
+
+	t.Run("error case", func(t *testing.T) {
+		steps := []nestedStep{
+			newMockNormalStep(),
+			newMockNormalStep(),
+			newMockFailStep(),
+			newMockNormalStep(),
+		}
+		s := &stepStack{steps: steps}
+		unfinished := s.Execute(context.Background())
+		assert.Equal(t, 3, len(unfinished.steps))
+	})
+
+	t.Run("Unrecoverable", func(t *testing.T) {
+		failStep := newMockFailStep()
+		failStep.err = retry.Unrecoverable(errors.New("error mock Execute"))
+		steps := []nestedStep{
+			failStep,
+		}
+		s := &stepStack{steps: steps}
+		unfinished := s.Execute(context.Background())
+		assert.Nil(t, unfinished)
+	})
+}
+
+func Test_randomSelect(t *testing.T) {
+	s0 := &stepStack{steps: []nestedStep{}}
+	s1 := &stepStack{steps: []nestedStep{
+		newMockNormalStep(),
+	}}
+	s2 := &stepStack{steps: []nestedStep{
+		newMockNormalStep(),
+		newMockNormalStep(),
+	}}
+	s3 := &stepStack{steps: []nestedStep{
+		newMockNormalStep(),
+		newMockNormalStep(),
+		newMockNormalStep(),
+	}}
+	s4 := &stepStack{steps: []nestedStep{
+		newMockNormalStep(),
+		newMockNormalStep(),
+		newMockNormalStep(),
+		newMockNormalStep(),
+	}}
+	m := map[*stepStack]struct{}{
+		s0: {},
+		s1: {},
+		s2: {},
+		s3: {},
+		s4: {},
+	}
+	selected := randomSelect(0, m)
+	assert.Equal(t, defaultBgExecutingParallel, len(selected))
+	for _, s := range selected {
+		_, ok := m[s]
+		assert.True(t, ok)
+	}
+	selected = randomSelect(2, m)
+	assert.Equal(t, 2, len(selected))
+	for _, s := range selected {
+		_, ok := m[s]
+		assert.True(t, ok)
+	}
+}
+
+func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
+	bg := newBgStepExecutor(context.Background(),
+		withSelectStepPolicy(defaultSelectPolicy()),
+		withBgInterval(time.Millisecond*10))
+	bg.Start()
+	n := 20
+	records := make([]int, 0, n)
+	steps := make([]*stepStack, 0, n)
+	for i := 0; i < n; i++ {
+		var s *stepStack
+		r := rand.Int() % 3
+		records = append(records, r)
+		switch r {
+		case 0:
+			s = nil
+		case 1:
+			failStep := newMockFailStep()
+			failStep.err = retry.Unrecoverable(errors.New("error mock Execute"))
+			s = &stepStack{steps: []nestedStep{
+				newMockNormalStep(),
+				failStep,
+				newMockNormalStep(),
+			}}
+		case 2:
+			s = &stepStack{steps: []nestedStep{
+				newMockNormalStep(),
+				newMockNormalStep(),
+				newMockNormalStep(),
+			}}
+		default:
+		}
+		steps = append(steps, s)
+		bg.AddSteps(s)
+	}
+	for i, r := range records {
+		switch r {
+		case 0:
+			assert.Nil(t, steps[i])
+		case 1:
+			<-steps[i].steps[1].(*mockFailStep).calledChan
+			assert.True(t, steps[i].steps[1].(*mockFailStep).called)
+		case 2:
+		default:
+		}
+	}
+	bg.Stop()
+}
diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go
index 13347345b7..842e298e93 100644
--- a/internal/rootcoord/timeticksync.go
+++ b/internal/rootcoord/timeticksync.go
@@ -42,7 +42,32 @@ var (
 	ttCheckerWarnMsg       = fmt.Sprintf("RootCoord haven't synchronized the time tick for %f minutes", timeTickSyncTtInterval.Minutes())
 )

-// TODO: better to accept ctx for timetickSync-related method, which can trace the ddl.
+type ttHistogram struct {
+	sync.Map
+}
+
+func newTtHistogram() *ttHistogram {
+	return &ttHistogram{}
+}
+
+func (h *ttHistogram) update(channel string, ts Timestamp) {
+	h.Store(channel, ts)
+}
+
+func (h *ttHistogram) get(channel string) Timestamp {
+	ts, ok := h.Load(channel)
+	if !ok {
+		return typeutil.ZeroTimestamp
+	}
+	return ts.(Timestamp)
+}
+
+func (h *ttHistogram) remove(channels ...string) {
+	for _, channel := range channels {
+		h.Delete(channel)
+	}
+}
+
 type timetickSync struct {
 	ctx      context.Context
 	sourceID typeutil.UniqueID
@@ -52,6 +77,8 @@ type timetickSync struct {
 	lock           sync.Mutex
 	sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg
 	sendChan       chan map[typeutil.UniqueID]*chanTsMsg
+
+	syncedTtHistogram *ttHistogram
 }

 type chanTsMsg struct {
@@ -98,6 +125,8 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
 		lock:           sync.Mutex{},
 		sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
 		sendChan:       make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
+
+		syncedTtHistogram: newTtHistogram(),
 	}
 }

@@ -247,6 +276,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
 				if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil {
 					log.Warn("SendTimeTickToChannel fail", zap.Error(err))
 				}
+				t.syncedTtHistogram.update(chanName, mints)
 				wg.Done()
 			}(chanName, ts)
 		}
@@ -326,6 +356,7 @@ func (t *timetickSync) addDmlChannels(names ...string) {
 // RemoveDmlChannels remove dml channels
 func (t *timetickSync) removeDmlChannels(names ...string) {
 	t.dmlChannels.removeChannels(names...)
+	// t.syncedTtHistogram.remove(names...) // channel ts shouldn't go back.
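+	// the entries are kept on purpose: removing them would make getSyncedTimeTick
+	// fall back to typeutil.ZeroTimestamp, i.e. the channel ts would go back.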
 	log.Info("remove dml channels", zap.Strings("channels", names))
 }

@@ -339,6 +370,10 @@ func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstr
 	return t.dmlChannels.broadcastMark(chanNames, pack)
 }

+func (t *timetickSync) getSyncedTimeTick(channel string) Timestamp {
+	return t.syncedTtHistogram.get(channel)
+}
+
 func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {
 	var ret typeutil.Timestamp
 	for _, t := range tt {
diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go
index 5bd5097292..99b34a1dd6 100644
--- a/internal/rootcoord/timeticksync_test.go
+++ b/internal/rootcoord/timeticksync_test.go
@@ -21,6 +21,8 @@ import (
 	"sync"
 	"testing"

+	"github.com/milvus-io/milvus/internal/util/typeutil"
+
 	"github.com/stretchr/testify/assert"

 	"github.com/milvus-io/milvus/api/commonpb"
@@ -104,3 +106,15 @@ func TestTimetickSync(t *testing.T) {
 	})
 	wg.Wait()
 }
+
+func Test_ttHistogram_get(t *testing.T) {
+	h := newTtHistogram()
+	assert.Equal(t, typeutil.ZeroTimestamp, h.get("not_exist"))
+	h.update("ch1", 100)
+	assert.Equal(t, Timestamp(100), h.get("ch1"))
+	h.update("ch2", 1000)
+	assert.Equal(t, Timestamp(1000), h.get("ch2"))
+	h.remove("ch1", "ch2", "not_exist")
+	assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch1"))
+	assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch2"))
+}
diff --git a/internal/rootcoord/undo.go b/internal/rootcoord/undo.go
index 0550ccb71e..5d573968e2 100644
--- a/internal/rootcoord/undo.go
+++ b/internal/rootcoord/undo.go
@@ -3,55 +3,42 @@ package rootcoord
 import (
 	"context"
 	"fmt"
-	"time"

 	"github.com/milvus-io/milvus/internal/log"
 	"go.uber.org/zap"
 )

 type baseUndoTask struct {
-	todoStep []Step // steps to execute
-	undoStep []Step // steps to undo
+	todoStep     []nestedStep // steps to execute
+	undoStep     []nestedStep // steps to undo
+	stepExecutor StepExecutor
 }

-func newBaseUndoTask() *baseUndoTask {
+func newBaseUndoTask(stepExecutor StepExecutor) *baseUndoTask {
 	return &baseUndoTask{
-		todoStep: make([]Step, 0),
-		undoStep: make([]Step, 0),
+		todoStep:     make([]nestedStep, 0),
+		undoStep:     make([]nestedStep, 0),
+		stepExecutor: stepExecutor,
 	}
 }

-func (b *baseUndoTask) AddStep(todoStep, undoStep Step) {
+func (b *baseUndoTask) AddStep(todoStep, undoStep nestedStep) {
 	b.todoStep = append(b.todoStep, todoStep)
 	b.undoStep = append(b.undoStep, undoStep)
 }

-func (b *baseUndoTask) undoFromLastFinished(lastFinished int) {
-	// You cannot just use the ctx of task, since it will be canceled after response is returned.
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer cancel()
-	for i := lastFinished; i >= 0; i-- {
-		undo := b.undoStep[i]
-		if err := undo.Execute(ctx); err != nil {
-			// You depend on the collection meta to do other gc.
-			// TODO: add ddl logger after other service can be idempotent enough, then you can do separate steps
-			// independently.
-			log.Error("failed to execute step, garbage may be generated", zap.Error(err))
-			return
-		}
-	}
-}
-
 func (b *baseUndoTask) Execute(ctx context.Context) error {
 	if len(b.todoStep) != len(b.undoStep) {
 		return fmt.Errorf("todo step and undo step length not equal")
 	}
 	for i := 0; i < len(b.todoStep); i++ {
 		todoStep := b.todoStep[i]
-		err := todoStep.Execute(ctx)
-		if err != nil {
-			go b.undoFromLastFinished(i - 1)
-			log.Warn("failed to execute step, trying to undo", zap.Error(err))
+		// todo steps don't produce child steps in the normal case.
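+		// on failure, the undo steps of everything already finished are handed
+		// to the step executor, which retries them in the background.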
+		if _, err := todoStep.Execute(ctx); err != nil {
+			log.Warn("failed to execute step, trying to undo", zap.Error(err), zap.String("desc", todoStep.Desc()))
+			undoSteps := b.undoStep[:i]
+			b.undoStep = nil // drop the reference so baseUndoTask can be garbage collected.
+			go b.stepExecutor.AddSteps(&stepStack{steps: undoSteps})
 			return err
 		}
 	}
diff --git a/internal/rootcoord/undo_test.go b/internal/rootcoord/undo_test.go
index 235a5f321b..af2713789a 100644
--- a/internal/rootcoord/undo_test.go
+++ b/internal/rootcoord/undo_test.go
@@ -7,18 +7,28 @@ import (
 	"github.com/stretchr/testify/assert"
 )

+func newTestUndoTask() *baseUndoTask {
+	stepExecutor := newMockStepExecutor()
+	stepExecutor.AddStepsFunc = func(s *stepStack) {
+		// no schedule, execute directly.
+		s.Execute(context.Background())
+	}
+	undoTask := newBaseUndoTask(stepExecutor)
+	return undoTask
+}
+
 func Test_baseUndoTask_Execute(t *testing.T) {
 	t.Run("should not happen", func(t *testing.T) {
-		undoTask := newBaseUndoTask()
+		undoTask := newTestUndoTask()
 		undoTask.todoStep = append(undoTask.todoStep, newMockNormalStep())
 		err := undoTask.Execute(context.Background())
 		assert.Error(t, err)
 	})

 	t.Run("normal case, no undo step will be called", func(t *testing.T) {
-		undoTask := newBaseUndoTask()
+		undoTask := newTestUndoTask()
 		n := 10
-		todoSteps, undoSteps := make([]Step, 0, n), make([]Step, 0, n)
+		todoSteps, undoSteps := make([]nestedStep, 0, n), make([]nestedStep, 0, n)
 		for i := 0; i < n; i++ {
 			normalTodoStep := newMockNormalStep()
 			normalUndoStep := newMockNormalStep()
@@ -37,13 +47,13 @@ func Test_baseUndoTask_Execute(t *testing.T) {
 	})

 	t.Run("partial error, undo from last finished", func(t *testing.T) {
-		undoTask := newBaseUndoTask()
-		todoSteps := []Step{
+		undoTask := newTestUndoTask()
+		todoSteps := []nestedStep{
 			newMockNormalStep(),
 			newMockFailStep(),
 			newMockNormalStep(),
 		}
-		undoSteps := []Step{
+		undoSteps := []nestedStep{
 			newMockNormalStep(),
 			newMockNormalStep(),
 			newMockNormalStep(),
@@ -65,13 +75,13 @@ func Test_baseUndoTask_Execute(t *testing.T) {
 	})

 	t.Run("partial error, undo meet error also", func(t *testing.T) {
-		undoTask := newBaseUndoTask()
-		todoSteps := []Step{
+		undoTask := newTestUndoTask()
+		todoSteps := []nestedStep{
 			newMockNormalStep(),
 			newMockNormalStep(),
 			newMockFailStep(),
 		}
-		undoSteps := []Step{
+		undoSteps := []nestedStep{
 			newMockNormalStep(),
 			newMockFailStep(),
 			newMockNormalStep(),