Make ddl follow tt mechanism (#19279)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Jiquan Long 2022-09-21 15:46:51 +08:00 committed by GitHub
parent 97aa2bd84f
commit 42d371fd41
26 changed files with 1306 additions and 341 deletions

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -13,7 +12,6 @@ import (
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"go.uber.org/zap"
)
@ -52,11 +50,6 @@ func newServerBroker(s *Core) *ServerBroker {
func (b *ServerBroker) ReleaseCollection(ctx context.Context, collectionID UniqueID) error {
log.Info("releasing collection", zap.Int64("collection", collectionID))
if err := funcutil.WaitForComponentHealthy(ctx, b.s.queryCoord, "QueryCoord", 100, time.Millisecond*200); err != nil {
log.Error("failed to release collection, querycoord not healthy", zap.Error(err), zap.Int64("collection", collectionID))
return err
}
resp, err := b.s.queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_ReleaseCollection},
CollectionID: collectionID,
@ -100,10 +93,6 @@ func toKeyDataPairs(m map[string][]byte) []*commonpb.KeyDataPair {
func (b *ServerBroker) WatchChannels(ctx context.Context, info *watchInfo) error {
log.Info("watching channels", zap.Uint64("ts", info.ts), zap.Int64("collection", info.collectionID), zap.Strings("vChannels", info.vChannels))
if err := funcutil.WaitForComponentHealthy(ctx, b.s.dataCoord, "DataCoord", 100, time.Millisecond*200); err != nil {
return err
}
resp, err := b.s.dataCoord.WatchChannels(ctx, &datapb.WatchChannelsRequest{
CollectionID: info.collectionID,
ChannelNames: info.vChannels,
@ -193,9 +182,6 @@ func (b *ServerBroker) Import(ctx context.Context, req *datapb.ImportTaskRequest
}
func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID) error {
if err := funcutil.WaitForComponentHealthy(ctx, b.s.indexCoord, "IndexCoord", 100, time.Millisecond*100); err != nil {
return err
}
rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{
CollectionID: collID,
IndexName: "",

View File

@ -12,14 +12,6 @@ import (
)
func TestServerBroker_ReleaseCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withUnhealthyQueryCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.ReleaseCollection(ctx, 1)
assert.Error(t, err)
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withInvalidQueryCoord())
b := newServerBroker(c)
@ -74,14 +66,6 @@ func TestServerBroker_GetSegmentInfo(t *testing.T) {
}
func TestServerBroker_WatchChannels(t *testing.T) {
t.Run("unhealthy", func(t *testing.T) {
c := newTestCore(withUnhealthyDataCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.WatchChannels(ctx, &watchInfo{})
assert.Error(t, err)
})
t.Run("failed to execute", func(t *testing.T) {
defer cleanTestEnv()
@ -228,14 +212,6 @@ func TestServerBroker_Import(t *testing.T) {
}
func TestServerBroker_DropCollectionIndex(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withUnhealthyIndexCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.DropCollectionIndex(ctx, 1)
assert.Error(t, err)
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withInvalidIndexCoord())
b := newServerBroker(c)

View File

@ -257,20 +257,20 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
return nil
}
undoTask := newBaseUndoTask()
undoTask.AddStep(&NullStep{}, &RemoveDmlChannelsStep{
undoTask := newBaseUndoTask(t.core.stepExecutor)
undoTask.AddStep(&nullStep{}, &removeDmlChannelsStep{
baseStep: baseStep{core: t.core},
pchannels: chanNames,
pChannels: chanNames,
}) // remove dml channels if any error occurs.
undoTask.AddStep(&AddCollectionMetaStep{
undoTask.AddStep(&addCollectionMetaStep{
baseStep: baseStep{core: t.core},
coll: &collInfo,
}, &DeleteCollectionMetaStep{
}, &deleteCollectionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
ts: ts,
})
undoTask.AddStep(&WatchChannelsStep{
undoTask.AddStep(&watchChannelsStep{
baseStep: baseStep{core: t.core},
info: &watchInfo{
ts: ts,
@ -278,17 +278,17 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
vChannels: t.channels.virtualChannels,
startPositions: toKeyDataPairs(startPositions),
},
}, &UnwatchChannelsStep{
}, &unwatchChannelsStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
channels: t.channels,
})
undoTask.AddStep(&ChangeCollectionStateStep{
undoTask.AddStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
state: pb.CollectionState_CollectionCreated,
ts: ts,
}, &NullStep{}) // We'll remove the whole collection anyway.
}, &nullStep{}) // We'll remove the whole collection anyway.
return undoTask.Execute(ctx)
}

View File

@ -53,17 +53,17 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
State: pb.PartitionState_PartitionCreated,
}
undoTask := newBaseUndoTask()
undoTask.AddStep(&ExpireCacheStep{
undoTask := newBaseUndoTask(t.core.stepExecutor)
undoTask.AddStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: []string{t.Req.GetCollectionName()},
collectionID: t.collMeta.CollectionID,
ts: t.GetTs(),
}, &NullStep{})
undoTask.AddStep(&AddPartitionMetaStep{
}, &nullStep{})
undoTask.AddStep(&addPartitionMetaStep{
baseStep: baseStep{core: t.core},
partition: partition,
}, &NullStep{}) // adding partition is atomic enough.
}, &nullStep{}) // adding partition is atomic enough.
return undoTask.Execute(ctx)
}

View File

@ -0,0 +1,64 @@
package rootcoord
import (
"sync"
"github.com/milvus-io/milvus/internal/tso"
"go.uber.org/atomic"
)
type DdlTsLockManagerV2 interface {
GetMinDdlTs() Timestamp
AddRefCnt(delta int32)
Lock()
Unlock()
UpdateLastTs(ts Timestamp)
}
type ddlTsLockManagerV2 struct {
lastTs atomic.Uint64
inProgressCnt atomic.Int32
tsoAllocator tso.Allocator
mu sync.Mutex
}
func (c *ddlTsLockManagerV2) GetMinDdlTs() Timestamp {
// In fact, `TryLock` could replace `inProgressCnt`, but that is not recommended.
if c.inProgressCnt.Load() > 0 {
return c.lastTs.Load()
}
c.Lock()
defer c.Unlock()
ts, err := c.tsoAllocator.GenerateTSO(1)
if err != nil {
return c.lastTs.Load()
}
c.UpdateLastTs(ts)
return ts
}
func (c *ddlTsLockManagerV2) AddRefCnt(delta int32) {
c.inProgressCnt.Add(delta)
}
func (c *ddlTsLockManagerV2) Lock() {
c.mu.Lock()
}
func (c *ddlTsLockManagerV2) Unlock() {
c.mu.Unlock()
}
func (c *ddlTsLockManagerV2) UpdateLastTs(ts Timestamp) {
c.lastTs.Store(ts)
}
func newDdlTsLockManagerV2(tsoAllocator tso.Allocator) *ddlTsLockManagerV2 {
return &ddlTsLockManagerV2{
lastTs: *atomic.NewUint64(0),
inProgressCnt: *atomic.NewInt32(0),
tsoAllocator: tsoAllocator,
mu: sync.Mutex{},
}
}
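
A minimal, hypothetical sketch (not part of this commit) of the calling pattern the lock manager is built for, mirroring GcCollectionData later in this diff: take the lock and bump the in-progress count while the DDL message is produced, then record the allocated timestamp so that GetMinDdlTs never hands out a value newer than an unfinished DDL.

// runDdlWithTsLock is an illustrative helper only; the name and the produce
// callback are assumptions, not identifiers from this commit.
func runDdlWithTsLock(m DdlTsLockManagerV2, produce func() (Timestamp, error)) (Timestamp, error) {
	m.Lock()
	m.AddRefCnt(1) // while the count is > 0, GetMinDdlTs falls back to lastTs.
	defer m.AddRefCnt(-1)
	defer m.Unlock()
	ts, err := produce() // e.g. broadcast a DDL message carrying ts.
	if err != nil {
		return 0, err
	}
	m.UpdateLastTs(ts) // later min-ddl-ts queries start from this value.
	return ts, nil
}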

View File

@ -0,0 +1,43 @@
package rootcoord
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_ddlTsLockManager_GetMinDdlTs(t *testing.T) {
t.Run("there are in-progress tasks", func(t *testing.T) {
m := newDdlTsLockManagerV2(nil)
m.UpdateLastTs(100)
m.inProgressCnt.Store(9999)
ts := m.GetMinDdlTs()
assert.Equal(t, Timestamp(100), ts)
})
t.Run("failed to generate ts", func(t *testing.T) {
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 0, errors.New("error mock GenerateTSO")
}
m := newDdlTsLockManagerV2(tsoAllocator)
m.UpdateLastTs(101)
m.inProgressCnt.Store(0)
ts := m.GetMinDdlTs()
assert.Equal(t, Timestamp(101), ts)
})
t.Run("normal case", func(t *testing.T) {
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 102, nil
}
m := newDdlTsLockManagerV2(tsoAllocator)
m.UpdateLastTs(101)
m.inProgressCnt.Store(0)
ts := m.GetMinDdlTs()
assert.Equal(t, Timestamp(102), ts)
assert.Equal(t, Timestamp(102), m.lastTs.Load())
})
}

View File

@ -52,39 +52,38 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
ts := t.GetTs()
redoTask := newBaseRedoTask()
redoTask := newBaseRedoTask(t.core.stepExecutor)
redoTask.AddSyncStep(&ExpireCacheStep{
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
ts: ts,
})
redoTask.AddSyncStep(&ChangeCollectionStateStep{
redoTask.AddSyncStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
state: pb.CollectionState_CollectionDropping,
ts: ts,
})
redoTask.AddAsyncStep(&ReleaseCollectionStep{
redoTask.AddAsyncStep(&releaseCollectionStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
})
redoTask.AddAsyncStep(&DropIndexStep{
redoTask.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: t.core},
collID: collMeta.CollectionID,
})
redoTask.AddAsyncStep(&DeleteCollectionDataStep{
redoTask.AddAsyncStep(&deleteCollectionDataStep{
baseStep: baseStep{core: t.core},
coll: collMeta,
ts: ts,
})
redoTask.AddAsyncStep(&RemoveDmlChannelsStep{
redoTask.AddAsyncStep(&removeDmlChannelsStep{
baseStep: baseStep{core: t.core},
pchannels: collMeta.PhysicalChannelNames,
pChannels: collMeta.PhysicalChannelNames,
})
redoTask.AddAsyncStep(&DeleteCollectionMetaStep{
redoTask.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
ts: ts,

View File

@ -5,8 +5,6 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/metastore/model"
@ -176,10 +174,10 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
gc := newMockGarbageCollector()
deleteCollectionCalled := false
deleteCollectionChan := make(chan struct{}, 1)
gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection) (Timestamp, error) {
deleteCollectionCalled = true
deleteCollectionChan <- struct{}{}
return nil
return 0, nil
}
core := newTestCore(

View File

@ -53,14 +53,14 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
return nil
}
redoTask := newBaseRedoTask()
redoTask.AddSyncStep(&ExpireCacheStep{
redoTask := newBaseRedoTask(t.core.stepExecutor)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: []string{t.Req.GetCollectionName()},
collectionID: t.collMeta.CollectionID,
ts: t.GetTs(),
})
redoTask.AddSyncStep(&ChangePartitionStateStep{
redoTask.AddSyncStep(&changePartitionStateStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
partitionID: partID,
@ -69,7 +69,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
})
// TODO: release partition when query coord is ready.
redoTask.AddAsyncStep(&DeletePartitionDataStep{
redoTask.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: t.core},
pchans: t.collMeta.PhysicalChannelNames,
partition: &model.Partition{
@ -77,9 +77,8 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
PartitionName: t.Req.GetPartitionName(),
CollectionID: t.collMeta.CollectionID,
},
ts: t.GetTs(),
})
redoTask.AddAsyncStep(&RemovePartitionMetaStep{
redoTask.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
partitionID: partID,

View File

@ -4,8 +4,6 @@ import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/metastore/model"
@ -146,10 +144,10 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
gc := newMockGarbageCollector()
deletePartitionCalled := false
deletePartitionChan := make(chan struct{}, 1)
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition, ts typeutil.Timestamp) error {
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) {
deletePartitionChan <- struct{}{}
deletePartitionCalled = true
return nil
return 0, nil
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc))

View File

@ -2,101 +2,111 @@ package rootcoord
import (
"context"
"time"
"github.com/milvus-io/milvus/api/commonpb"
ms "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/model"
"go.uber.org/zap"
)
type GarbageCollector interface {
ReDropCollection(collMeta *model.Collection, ts Timestamp)
RemoveCreatingCollection(collMeta *model.Collection)
ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp)
GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error
GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error
GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error)
GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
}
type GarbageCollectorCtx struct {
type bgGarbageCollector struct {
s *Core
}
func newGarbageCollectorCtx(s *Core) *GarbageCollectorCtx {
return &GarbageCollectorCtx{s: s}
func newBgGarbageCollector(s *Core) *bgGarbageCollector {
return &bgGarbageCollector{s: s}
}
func (c *GarbageCollectorCtx) ReDropCollection(collMeta *model.Collection, ts Timestamp) {
func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Timestamp) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(collMeta.PhysicalChannelNames...)
defer c.s.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
redo := newBaseRedoTask(c.s.stepExecutor)
redo.AddAsyncStep(&releaseCollectionStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
})
redo.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: c.s},
collID: collMeta.CollectionID,
})
redo.AddAsyncStep(&deleteCollectionDataStep{
baseStep: baseStep{core: c.s},
coll: collMeta,
})
redo.AddAsyncStep(&removeDmlChannelsStep{
baseStep: baseStep{core: c.s},
pChannels: collMeta.PhysicalChannelNames,
})
redo.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
ts: ts,
})
if err := c.s.broker.ReleaseCollection(ctx, collMeta.CollectionID); err != nil {
log.Error("failed to release collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID))
return
}
if err := c.s.broker.DropCollectionIndex(ctx, collMeta.CollectionID); err != nil {
log.Error("failed to drop collection index when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID))
return
}
if err := c.GcCollectionData(ctx, collMeta, ts); err != nil {
log.Error("failed to notify datacoord to gc collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID))
return
}
if err := c.s.meta.RemoveCollection(ctx, collMeta.CollectionID, ts); err != nil {
log.Error("failed to remove collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID))
}
// err is ignored since no sync steps will be executed.
_ = redo.Execute(context.Background())
}
func (c *GarbageCollectorCtx) RemoveCreatingCollection(collMeta *model.Collection) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if err := c.s.broker.UnwatchChannels(ctx, &watchInfo{collectionID: collMeta.CollectionID, vChannels: collMeta.VirtualChannelNames}); err != nil {
log.Error("failed to unwatch channels when recovery",
zap.Error(err),
zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID),
zap.Strings("vchans", collMeta.VirtualChannelNames), zap.Strings("pchans", collMeta.PhysicalChannelNames))
return
}
if err := c.s.meta.RemoveCollection(ctx, collMeta.CollectionID, collMeta.CreateTime); err != nil {
log.Error("failed to remove collection when recovery", zap.Error(err), zap.String("collection", collMeta.Name), zap.Int64("collection id", collMeta.CollectionID))
}
func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) {
redo := newBaseRedoTask(c.s.stepExecutor)
redo.AddAsyncStep(&unwatchChannelsStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
channels: collectionChannels{
virtualChannels: collMeta.VirtualChannelNames,
physicalChannels: collMeta.PhysicalChannelNames,
},
})
redo.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
ts: collMeta.CreateTime,
})
// err is ignored since no sync steps will be executed.
_ = redo.Execute(context.Background())
}
func (c *GarbageCollectorCtx) ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// TODO: release partition when query coord is ready.
func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(pChannels...)
defer c.s.chanTimeTick.removeDmlChannels(pChannels...)
if err := c.GcPartitionData(ctx, pChannels, partition, ts); err != nil {
log.Error("failed to notify datanodes to gc partition", zap.Error(err))
return
}
redo := newBaseRedoTask(c.s.stepExecutor)
redo.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: c.s},
pchans: pChannels,
partition: partition,
})
redo.AddAsyncStep(&removeDmlChannelsStep{
baseStep: baseStep{core: c.s},
pChannels: pChannels,
})
redo.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: partition.CollectionID,
partitionID: partition.PartitionID,
ts: ts,
})
if err := c.s.meta.RemovePartition(ctx, partition.CollectionID, partition.PartitionID, ts); err != nil {
log.Error("failed to remove partition when recovery", zap.Error(err))
}
// err is ignored since no sync steps will be executed.
_ = redo.Execute(context.Background())
}
func (c *GarbageCollectorCtx) GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
ts, err := c.s.tsoAllocator.GenerateTSO(1)
if err != nil {
return 0, err
}
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -118,15 +128,18 @@ func (c *GarbageCollectorCtx) GcCollectionData(ctx context.Context, coll *model.
}
msgPack.Msgs = append(msgPack.Msgs, msg)
if err := c.s.chanTimeTick.broadcastDmlChannels(coll.PhysicalChannelNames, &msgPack); err != nil {
return err
return 0, err
}
// TODO: remove this after gc can be notified by rpc. Without this tt, DropCollectionMsg cannot be seen by
// datanodes.
return c.s.chanTimeTick.sendTimeTickToChannel(coll.PhysicalChannelNames, ts)
return ts, nil
}
func (c *GarbageCollectorCtx) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error {
func (c *bgGarbageCollector) notifyPartitionGc(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
ts, err := c.s.tsoAllocator.GenerateTSO(1)
if err != nil {
return 0, err
}
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -149,10 +162,36 @@ func (c *GarbageCollectorCtx) GcPartitionData(ctx context.Context, pChannels []s
}
msgPack.Msgs = append(msgPack.Msgs, msg)
if err := c.s.chanTimeTick.broadcastDmlChannels(pChannels, &msgPack); err != nil {
return err
return 0, err
}
// TODO: remove this after gc can be notified by rpc. Without this tt, DropCollectionMsg cannot be seen by
// datanodes.
return c.s.chanTimeTick.sendTimeTickToChannel(pChannels, ts)
return ts, nil
}
func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
defer c.s.ddlTsLockManager.AddRefCnt(-1)
defer c.s.ddlTsLockManager.Unlock()
ddlTs, err = c.notifyCollectionGc(ctx, coll)
if err != nil {
return 0, err
}
c.s.ddlTsLockManager.UpdateLastTs(ddlTs)
return ddlTs, nil
}
func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
defer c.s.ddlTsLockManager.AddRefCnt(-1)
defer c.s.ddlTsLockManager.Unlock()
ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
if err != nil {
return 0, err
}
c.s.ddlTsLockManager.UpdateLastTs(ddlTs)
return ddlTs, nil
}
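
The recovery paths above now share one shape: build a redo task that holds only async steps and hand it to the step executor, which retries failed steps in the background. A condensed, hypothetical sketch of that shape (the helper name is illustrative, not from this commit):

func recoverInBackground(core *Core, steps ...nestedStep) {
	redo := newBaseRedoTask(core.stepExecutor)
	for _, s := range steps {
		redo.AddAsyncStep(s)
	}
	// err is ignored since no sync steps will be executed; the async steps are
	// pushed to the step executor and rescheduled there on failure.
	_ = redo.Execute(context.Background())
}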

View File

@ -18,15 +18,17 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
}
ticker := newTickerWithMockNormalStream()
core := newTestCore(withBroker(broker), withTtSynchronizer(ticker))
gc := newGarbageCollectorCtx(core)
gc := newBgGarbageCollector(core)
gc.ReDropCollection(&model.Collection{}, 1000)
})
t.Run("failed to DropCollectionIndex", func(t *testing.T) {
broker := newMockBroker()
releaseCollectionCalled := false
releaseCollectionChan := make(chan struct{}, 1)
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
releaseCollectionCalled = true
releaseCollectionChan <- struct{}{}
return nil
}
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
@ -34,43 +36,61 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
}
ticker := newTickerWithMockNormalStream()
core := newTestCore(withBroker(broker), withTtSynchronizer(ticker))
gc := newGarbageCollectorCtx(core)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropCollection(&model.Collection{}, 1000)
<-releaseCollectionChan
assert.True(t, releaseCollectionCalled)
})
t.Run("failed to GcCollectionData", func(t *testing.T) {
broker := newMockBroker()
releaseCollectionCalled := false
releaseCollectionChan := make(chan struct{}, 1)
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
releaseCollectionCalled = true
releaseCollectionChan <- struct{}{}
return nil
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
}
ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
core := newTestCore(withBroker(broker), withTtSynchronizer(ticker))
gc := newGarbageCollectorCtx(core)
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withBroker(broker), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
shardsNum := 2
pchans := ticker.getDmlChannelNames(shardsNum)
gc.ReDropCollection(&model.Collection{PhysicalChannelNames: pchans}, 1000)
<-releaseCollectionChan
assert.True(t, releaseCollectionCalled)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
})
t.Run("failed to remove collection", func(t *testing.T) {
broker := newMockBroker()
releaseCollectionCalled := false
releaseCollectionChan := make(chan struct{}, 1)
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
releaseCollectionCalled = true
releaseCollectionChan <- struct{}{}
return nil
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
}
meta := newMockMetaTable()
@ -78,41 +98,66 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
return errors.New("error mock RemoveCollection")
}
ticker := newTickerWithMockNormalStream()
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withBroker(broker),
withTtSynchronizer(ticker),
withTsoAllocator(tsoAllocator),
withMeta(meta))
gc := newGarbageCollectorCtx(core)
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropCollection(&model.Collection{}, 1000)
<-releaseCollectionChan
assert.True(t, releaseCollectionCalled)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
})
t.Run("normal case", func(t *testing.T) {
broker := newMockBroker()
releaseCollectionCalled := false
releaseCollectionChan := make(chan struct{}, 1)
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
releaseCollectionCalled = true
releaseCollectionChan <- struct{}{}
return nil
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
}
meta := newMockMetaTable()
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
removeCollectionCalled = true
removeCollectionChan <- struct{}{}
return nil
}
ticker := newTickerWithMockNormalStream()
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withBroker(broker),
withTtSynchronizer(ticker),
withTsoAllocator(tsoAllocator),
withMeta(meta))
gc := newGarbageCollectorCtx(core)
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropCollection(&model.Collection{}, 1000)
<-releaseCollectionChan
assert.True(t, releaseCollectionCalled)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
<-removeCollectionChan
assert.True(t, removeCollectionCalled)
})
}
@ -124,15 +169,18 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
return errors.New("error mock UnwatchChannels")
}
core := newTestCore(withBroker(broker))
gc := newGarbageCollectorCtx(core)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.RemoveCreatingCollection(&model.Collection{})
})
t.Run("failed to RemoveCollection", func(t *testing.T) {
broker := newMockBroker()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{}
return nil
}
meta := newMockMetaTable()
@ -140,69 +188,96 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
return errors.New("error mock RemoveCollection")
}
core := newTestCore(withBroker(broker), withMeta(meta))
gc := newGarbageCollectorCtx(core)
gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{})
<-unwatchChannelsChan
assert.True(t, unwatchChannelsCalled)
})
t.Run("normal case", func(t *testing.T) {
broker := newMockBroker()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{}
return nil
}
meta := newMockMetaTable()
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
removeCollectionCalled = true
removeCollectionChan <- struct{}{}
return nil
}
core := newTestCore(withBroker(broker), withMeta(meta))
gc := newGarbageCollectorCtx(core)
gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{})
<-unwatchChannelsChan
assert.True(t, unwatchChannelsCalled)
<-removeCollectionChan
assert.True(t, removeCollectionCalled)
})
}
// func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
// t.Run("failed to GcPartitionData", func(t *testing.T) {
// ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
// shardsNum := 2
// pchans := ticker.getDmlChannelNames(shardsNum)
// core := newTestCore(withTtSynchronizer(ticker))
// gc := newGarbageCollectorCtx(core)
// gc.ReDropPartition(pchans, &model.Partition{}, 100000)
// })
//
// t.Run("failed to RemovePartition", func(t *testing.T) {
// ticker := newTickerWithMockNormalStream()
// shardsNum := 2
// pchans := ticker.getDmlChannelNames(shardsNum)
// meta := newMockMetaTable()
// meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
// return errors.New("error mock RemovePartition")
// }
// core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
// gc := newGarbageCollectorCtx(core)
// gc.ReDropPartition(pchans, &model.Partition{}, 100000)
// })
//
// t.Run("normal case", func(t *testing.T) {
// ticker := newTickerWithMockNormalStream()
// shardsNum := 2
// pchans := ticker.getDmlChannelNames(shardsNum)
// meta := newMockMetaTable()
// removePartitionCalled := false
// meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
// removePartitionCalled = true
// return nil
// }
// core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
// gc := newGarbageCollectorCtx(core)
// gc.ReDropPartition(pchans, &model.Partition{}, 100000)
// assert.True(t, removePartitionCalled)
// })
// }
//
func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
t.Run("failed to GcPartitionData", func(t *testing.T) {
ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
shardsNum := 2
pchans := ticker.getDmlChannelNames(shardsNum)
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(pchans, &model.Partition{}, 100000)
})
t.Run("failed to RemovePartition", func(t *testing.T) {
ticker := newTickerWithMockNormalStream()
shardsNum := 2
pchans := ticker.getDmlChannelNames(shardsNum)
meta := newMockMetaTable()
meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
return errors.New("error mock RemovePartition")
}
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(pchans, &model.Partition{}, 100000)
})
t.Run("normal case", func(t *testing.T) {
ticker := newTickerWithMockNormalStream()
shardsNum := 2
pchans := ticker.getDmlChannelNames(shardsNum)
meta := newMockMetaTable()
removePartitionCalled := false
removePartitionChan := make(chan struct{}, 1)
meta.RemovePartitionFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
removePartitionCalled = true
removePartitionChan <- struct{}{}
return nil
}
tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(pchans, &model.Partition{}, 100000)
<-removePartitionChan
assert.True(t, removePartitionCalled)
})
}

View File

@ -259,6 +259,12 @@ func newTestCore(opts ...Opt) *Core {
c := &Core{
session: &sessionutil.Session{ServerID: TestRootCoordID},
}
executor := newMockStepExecutor()
executor.AddStepsFunc = func(s *stepStack) {
// no schedule, execute directly.
s.Execute(context.Background())
}
c.stepExecutor = executor
for _, opt := range opts {
opt(c)
}
@ -647,7 +653,9 @@ func withAbnormalCode() Opt {
type mockScheduler struct {
IScheduler
AddTaskFunc func(t taskV2) error
AddTaskFunc func(t taskV2) error
GetMinDdlTsFunc func() Timestamp
minDdlTs Timestamp
}
func newMockScheduler() *mockScheduler {
@ -661,6 +669,13 @@ func (m mockScheduler) AddTask(t taskV2) error {
return nil
}
func (m mockScheduler) GetMinDdlTs() Timestamp {
if m.GetMinDdlTsFunc != nil {
return m.GetMinDdlTsFunc()
}
return m.minDdlTs
}
func withScheduler(sched IScheduler) Opt {
return func(c *Core) {
c.scheduler = sched
@ -759,16 +774,16 @@ func withBroker(b Broker) Opt {
type mockGarbageCollector struct {
GarbageCollector
GcCollectionDataFunc func(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error
GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error
GcCollectionDataFunc func(ctx context.Context, coll *model.Collection) (Timestamp, error)
GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error)
}
func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
return m.GcCollectionDataFunc(ctx, coll, ts)
func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (Timestamp, error) {
return m.GcCollectionDataFunc(ctx, coll)
}
func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition, ts typeutil.Timestamp) error {
return m.GcPartitionDataFunc(ctx, pChannels, partition, ts)
func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) {
return m.GcPartitionDataFunc(ctx, pChannels, partition)
}
func newMockGarbageCollector() *mockGarbageCollector {
@ -809,6 +824,9 @@ func newTickerWithMockFailStream() *timetickSync {
func newMockNormalStream() *msgstream.MockMsgStream {
stream := msgstream.NewMockMsgStream()
stream.BroadcastFunc = func(pack *msgstream.MsgPack) error {
return nil
}
stream.BroadcastMarkFunc = func(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
return map[string][]msgstream.MessageID{}, nil
}
@ -838,3 +856,63 @@ func newTickerWithFactory(factory msgstream.Factory) *timetickSync {
ticker := newTimeTickSync(ctx, TestRootCoordID, factory, chans)
return ticker
}
type mockDdlTsLockManager struct {
DdlTsLockManagerV2
GetMinDdlTsFunc func() Timestamp
}
func (m mockDdlTsLockManager) GetMinDdlTs() Timestamp {
if m.GetMinDdlTsFunc != nil {
return m.GetMinDdlTsFunc()
}
return 100
}
func newMockDdlTsLockManager() *mockDdlTsLockManager {
return &mockDdlTsLockManager{}
}
func withDdlTsLockManager(m DdlTsLockManagerV2) Opt {
return func(c *Core) {
c.ddlTsLockManager = m
}
}
type mockStepExecutor struct {
StepExecutor
StartFunc func()
StopFunc func()
AddStepsFunc func(s *stepStack)
}
func newMockStepExecutor() *mockStepExecutor {
return &mockStepExecutor{}
}
func (m mockStepExecutor) Start() {
if m.StartFunc != nil {
m.StartFunc()
} else {
}
}
func (m mockStepExecutor) Stop() {
if m.StopFunc != nil {
m.StopFunc()
} else {
}
}
func (m mockStepExecutor) AddSteps(s *stepStack) {
if m.AddStepsFunc != nil {
m.AddStepsFunc(s)
} else {
}
}
func withStepExecutor(executor StepExecutor) Opt {
return func(c *Core) {
c.stepExecutor = executor
}
}

View File

@ -2,53 +2,49 @@ package rootcoord
import (
"context"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type baseRedoTask struct {
syncTodoStep []Step // steps to execute synchronously
asyncTodoStep []Step // steps to execute asynchronously
syncTodoStep []nestedStep // steps to execute synchronously
asyncTodoStep []nestedStep // steps to execute asynchronously
stepExecutor StepExecutor
}
func newBaseRedoTask() *baseRedoTask {
func newBaseRedoTask(stepExecutor StepExecutor) *baseRedoTask {
return &baseRedoTask{
syncTodoStep: make([]Step, 0),
asyncTodoStep: make([]Step, 0),
syncTodoStep: make([]nestedStep, 0),
asyncTodoStep: make([]nestedStep, 0),
stepExecutor: stepExecutor,
}
}
func (b *baseRedoTask) AddSyncStep(step Step) {
func (b *baseRedoTask) AddSyncStep(step nestedStep) {
b.syncTodoStep = append(b.syncTodoStep, step)
}
func (b *baseRedoTask) AddAsyncStep(step Step) {
func (b *baseRedoTask) AddAsyncStep(step nestedStep) {
b.asyncTodoStep = append(b.asyncTodoStep, step)
}
func (b *baseRedoTask) redoAsyncSteps() {
// You cannot just use the ctx of task, since it will be canceled after response is returned.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for i := 0; i < len(b.asyncTodoStep); i++ {
todo := b.asyncTodoStep[i]
if err := todo.Execute(ctx); err != nil {
// You depend on the collection meta to do other gc.
// TODO: add ddl logger after other service can be idempotent enough, then you can do separate steps
// independently.
log.Error("failed to execute step, garbage may be generated", zap.Error(err))
return
}
l := len(b.asyncTodoStep)
steps := make([]nestedStep, 0, l)
for i := l - 1; i >= 0; i-- {
steps = append(steps, b.asyncTodoStep[i])
}
b.asyncTodoStep = nil // let baseRedoTask be garbage collected.
b.stepExecutor.AddSteps(&stepStack{steps: steps})
}
func (b *baseRedoTask) Execute(ctx context.Context) error {
for i := 0; i < len(b.syncTodoStep); i++ {
todo := b.syncTodoStep[i]
if err := todo.Execute(ctx); err != nil {
log.Error("failed to execute step", zap.Error(err))
// no children step in sync steps.
if _, err := todo.Execute(ctx); err != nil {
log.Error("failed to execute step", zap.Error(err), zap.String("desc", todo.Desc()))
return err
}
}
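
A note on ordering, with an illustrative snippet (executor, ctx and the step variables are placeholders): redoAsyncSteps copies the async steps into the stepStack in reverse, and the step executor pops from the tail of that slice, so the steps still run in the order they were added, with any children a step returns executed right after their parent.

redo := newBaseRedoTask(executor)
redo.AddAsyncStep(stepA) // added first
redo.AddAsyncStep(stepB)
redo.AddAsyncStep(stepC) // added last
// redoAsyncSteps builds steps = [stepC, stepB, stepA]; stepStack.Execute pops
// from the tail, so the observed order remains stepA, stepB, stepC.
_ = redo.Execute(ctx)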

View File

@ -9,21 +9,27 @@ import (
)
type mockFailStep struct {
baseStep
calledChan chan struct{}
called bool
err error
}
func newMockFailStep() *mockFailStep {
return &mockFailStep{calledChan: make(chan struct{}, 1), called: false}
}
func (m *mockFailStep) Execute(ctx context.Context) error {
func (m *mockFailStep) Execute(ctx context.Context) ([]nestedStep, error) {
m.called = true
m.calledChan <- struct{}{}
return errors.New("error mock Execute")
if m.err != nil {
return nil, m.err
}
return nil, errors.New("error mock Execute")
}
type mockNormalStep struct {
nestedStep
calledChan chan struct{}
called bool
}
@ -32,16 +38,26 @@ func newMockNormalStep() *mockNormalStep {
return &mockNormalStep{calledChan: make(chan struct{}, 1), called: false}
}
func (m *mockNormalStep) Execute(ctx context.Context) error {
func (m *mockNormalStep) Execute(ctx context.Context) ([]nestedStep, error) {
m.called = true
m.calledChan <- struct{}{}
return nil
return nil, nil
}
func newTestRedoTask() *baseRedoTask {
stepExecutor := newMockStepExecutor()
stepExecutor.AddStepsFunc = func(s *stepStack) {
// no schedule, execute directly.
s.Execute(context.Background())
}
redo := newBaseRedoTask(stepExecutor)
return redo
}
func Test_baseRedoTask_redoAsyncSteps(t *testing.T) {
t.Run("partial error", func(t *testing.T) {
redo := newBaseRedoTask()
steps := []Step{newMockNormalStep(), newMockFailStep(), newMockNormalStep()}
redo := newTestRedoTask()
steps := []nestedStep{newMockNormalStep(), newMockFailStep(), newMockNormalStep()}
for _, step := range steps {
redo.AddAsyncStep(step)
}
@ -51,9 +67,9 @@ func Test_baseRedoTask_redoAsyncSteps(t *testing.T) {
})
t.Run("normal case", func(t *testing.T) {
redo := newBaseRedoTask()
redo := newTestRedoTask()
n := 10
steps := make([]Step, 0, n)
steps := make([]nestedStep, 0, n)
for i := 0; i < n; i++ {
steps = append(steps, newMockNormalStep())
}
@ -69,10 +85,10 @@ func Test_baseRedoTask_redoAsyncSteps(t *testing.T) {
func Test_baseRedoTask_Execute(t *testing.T) {
t.Run("sync not finished, no async task", func(t *testing.T) {
redo := newBaseRedoTask()
syncSteps := []Step{newMockFailStep()}
redo := newTestRedoTask()
syncSteps := []nestedStep{newMockFailStep()}
asyncNum := 10
asyncSteps := make([]Step, 0, asyncNum)
asyncSteps := make([]nestedStep, 0, asyncNum)
for i := 0; i < asyncNum; i++ {
asyncSteps = append(asyncSteps, newMockNormalStep())
}
@ -92,11 +108,11 @@ func Test_baseRedoTask_Execute(t *testing.T) {
// TODO: sync finished, but some async fail.
t.Run("normal case", func(t *testing.T) {
redo := newBaseRedoTask()
redo := newTestRedoTask()
syncNum := 10
syncSteps := make([]Step, 0, syncNum)
syncSteps := make([]nestedStep, 0, syncNum)
asyncNum := 10
asyncSteps := make([]Step, 0, asyncNum)
asyncSteps := make([]nestedStep, 0, asyncNum)
for i := 0; i < syncNum; i++ {
syncSteps = append(syncSteps, newMockNormalStep())
}

View File

@ -99,7 +99,9 @@ type Core struct {
meta IMetaTable
scheduler IScheduler
broker Broker
ddlTsLockManager DdlTsLockManagerV2
garbageCollector GarbageCollector
stepExecutor StepExecutor
metaKVCreator metaKVCreator
@ -175,6 +177,14 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error {
return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
}
func (c *Core) sendMinDdlTsAsTt() {
minDdlTs := c.ddlTsLockManager.GetMinDdlTs()
err := c.sendTimeTick(minDdlTs, "timetick loop")
if err != nil {
log.Warn("failed to send timetick", zap.Error(err))
}
}
func (c *Core) startTimeTickLoop() {
defer c.wg.Done()
ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval)
@ -183,12 +193,7 @@ func (c *Core) startTimeTickLoop() {
case <-c.ctx.Done():
return
case <-ticker.C:
if ts, err := c.tsoAllocator.GenerateTSO(1); err == nil {
err := c.sendTimeTick(ts, "timetick loop")
if err != nil {
log.Warn("failed to send timetick", zap.Error(err))
}
}
c.sendMinDdlTsAsTt()
}
}
}
@ -441,7 +446,9 @@ func (c *Core) initInternal() error {
c.proxyClientManager = newProxyClientManager(c.proxyCreator)
c.broker = newServerBroker(c)
c.garbageCollector = newGarbageCollectorCtx(c)
c.ddlTsLockManager = newDdlTsLockManagerV2(c.tsoAllocator)
c.garbageCollector = newBgGarbageCollector(c)
c.stepExecutor = newBgStepExecutor(c.ctx)
c.proxyManager = newProxyManager(
c.ctx,
@ -615,6 +622,7 @@ func (c *Core) startInternal() error {
}
c.scheduler.Start()
c.stepExecutor.Start()
Params.RootCoordCfg.CreatedTime = time.Now()
Params.RootCoordCfg.UpdatedTime = time.Now()
@ -635,6 +643,9 @@ func (c *Core) Start() error {
func (c *Core) Stop() error {
c.UpdateStateCode(internalpb.StateCode_Abnormal)
c.stepExecutor.Stop()
c.scheduler.Stop()
c.cancel()
c.wg.Wait()
// wait at most one second to revoke

View File

@ -4,6 +4,7 @@ import (
"context"
"math/rand"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@ -858,3 +859,38 @@ func TestCore_Rbac(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
}
func TestCore_sendMinDdlTsAsTt(t *testing.T) {
ticker := newRocksMqTtSynchronizer()
ddlManager := newMockDdlTsLockManager()
ddlManager.GetMinDdlTsFunc = func() Timestamp {
return 100
}
c := newTestCore(
withTtSynchronizer(ticker),
withDdlTsLockManager(ddlManager))
c.sendMinDdlTsAsTt() // no session.
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
c.sendMinDdlTsAsTt()
}
func TestCore_startTimeTickLoop(t *testing.T) {
ticker := newRocksMqTtSynchronizer()
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
ddlManager := newMockDdlTsLockManager()
ddlManager.GetMinDdlTsFunc = func() Timestamp {
return 100
}
c := newTestCore(
withTtSynchronizer(ticker),
withDdlTsLockManager(ddlManager))
ctx, cancel := context.WithCancel(context.Background())
c.ctx = ctx
Params.ProxyCfg.TimeTickInterval = time.Millisecond
c.wg.Add(1)
go c.startTimeTickLoop()
time.Sleep(time.Millisecond * 4)
cancel()
c.wg.Wait()
}

View File

@ -24,6 +24,8 @@ type scheduler struct {
tsoAllocator tso.Allocator
taskChan chan taskV2
lock sync.Mutex
}
func newScheduler(ctx context.Context, idAllocator allocator.GIDAllocator, tsoAllocator tso.Allocator) *scheduler {
@ -49,6 +51,15 @@ func (s *scheduler) Stop() {
s.wg.Wait()
}
func (s *scheduler) execute(task taskV2) {
if err := task.Prepare(task.GetCtx()); err != nil {
task.NotifyDone(err)
return
}
err := task.Execute(task.GetCtx())
task.NotifyDone(err)
}
func (s *scheduler) taskLoop() {
defer s.wg.Done()
for {
@ -56,12 +67,7 @@ func (s *scheduler) taskLoop() {
case <-s.ctx.Done():
return
case task := <-s.taskChan:
if err := task.Prepare(task.GetCtx()); err != nil {
task.NotifyDone(err)
continue
}
err := task.Execute(task.GetCtx())
task.NotifyDone(err)
s.execute(task)
}
}
}
@ -89,6 +95,10 @@ func (s *scheduler) enqueue(task taskV2) {
}
func (s *scheduler) AddTask(task taskV2) error {
// make sure that setting ts and enqueue is atomic.
s.lock.Lock()
defer s.lock.Unlock()
if err := s.setID(task); err != nil {
return err
}
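
The new lock makes timestamp allocation and enqueueing atomic; the ts-allocation call itself sits in the part of AddTask elided from this hunk. A self-contained toy (not Milvus code, assumes `import "sync"`) of the invariant being preserved: when allocation and enqueue happen under one mutex, the order of tasks in the channel matches timestamp order, so a consumer tracking the smallest pending timestamp never observes them out of order.

type tsQueue struct {
	mu   sync.Mutex
	next uint64
	ch   chan uint64 // drained by a worker goroutine, like s.taskChan
}

func (q *tsQueue) add() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.next++       // stands in for GenerateTSO
	q.ch <- q.next // enqueue while still holding the lock
}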

View File

@ -3,8 +3,12 @@ package rootcoord
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"
"go.uber.org/atomic"
"github.com/stretchr/testify/assert"
)
@ -103,7 +107,7 @@ func Test_scheduler_failed_to_set_ts(t *testing.T) {
assert.Error(t, err)
}
func Test_scheduler_enqueu_normal_case(t *testing.T) {
func Test_scheduler_enqueue_normal_case(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
idAlloc.AllocOneF = func() (UniqueID, error) {
@ -166,3 +170,55 @@ func Test_scheduler_bg(t *testing.T) {
s.Stop()
}
func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
tso := atomic.NewUint64(100)
idAlloc.AllocOneF = func() (UniqueID, error) {
return 100, nil
}
tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) {
got := tso.Inc()
return got, nil
}
ctx := context.Background()
s := newScheduler(ctx, idAlloc, tsoAlloc)
Params.InitOnce()
Params.ProxyCfg.TimeTickInterval = time.Millisecond
s.Start()
time.Sleep(time.Millisecond * 4)
// add task to queue.
n := 10
for i := 0; i < n; i++ {
task := newMockNormalTask()
err := s.AddTask(task)
assert.NoError(t, err)
}
time.Sleep(time.Millisecond * 4)
s.Stop()
})
t.Run("invalid tso", func(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
idAlloc.AllocOneF = func() (UniqueID, error) {
return 100, nil
}
tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) {
return 0, fmt.Errorf("error mock GenerateTSO")
}
ctx := context.Background()
s := newScheduler(ctx, idAlloc, tsoAlloc)
Params.InitOnce()
Params.ProxyCfg.TimeTickInterval = time.Millisecond
s.Start()
time.Sleep(time.Millisecond * 4)
s.Stop()
})
}

View File

@ -2,139 +2,285 @@ package rootcoord
import (
"context"
"fmt"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/metastore/model"
)
type Step interface {
Execute(ctx context.Context) error
type stepPriority int
const (
stepPriorityLow = iota
stepPriorityNormal
stepPriorityImportant
stepPriorityUrgent
)
type nestedStep interface {
Execute(ctx context.Context) ([]nestedStep, error)
Desc() string
Weight() stepPriority
}
type baseStep struct {
core *Core
}
type AddCollectionMetaStep struct {
func (s baseStep) Desc() string {
return ""
}
func (s baseStep) Weight() stepPriority {
return stepPriorityLow
}
type addCollectionMetaStep struct {
baseStep
coll *model.Collection
}
func (s *AddCollectionMetaStep) Execute(ctx context.Context) error {
return s.core.meta.AddCollection(ctx, s.coll)
func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.AddCollection(ctx, s.coll)
return nil, err
}
type DeleteCollectionMetaStep struct {
func (s *addCollectionMetaStep) Desc() string {
return fmt.Sprintf("add collection to meta table, name: %s, id: %d, ts: %d", s.coll.Name, s.coll.CollectionID, s.coll.CreateTime)
}
type deleteCollectionMetaStep struct {
baseStep
collectionID UniqueID
ts Timestamp
}
func (s *DeleteCollectionMetaStep) Execute(ctx context.Context) error {
return s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts)
func (s *deleteCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts)
return nil, err
}
type RemoveDmlChannelsStep struct {
func (s *deleteCollectionMetaStep) Desc() string {
return fmt.Sprintf("delete collection from meta table, id: %d, ts: %d", s.collectionID, s.ts)
}
func (s *deleteCollectionMetaStep) Weight() stepPriority {
return stepPriorityNormal
}
type removeDmlChannelsStep struct {
baseStep
pchannels []string
pChannels []string
}
func (s *RemoveDmlChannelsStep) Execute(ctx context.Context) error {
s.core.chanTimeTick.removeDmlChannels(s.pchannels...)
return nil
func (s *removeDmlChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
s.core.chanTimeTick.removeDmlChannels(s.pChannels...)
return nil, nil
}
type WatchChannelsStep struct {
func (s *removeDmlChannelsStep) Desc() string {
// this shouldn't be called.
return fmt.Sprintf("remove dml channels: %v", s.pChannels)
}
func (s *removeDmlChannelsStep) Weight() stepPriority {
// avoid too frequent tt.
return stepPriorityUrgent
}
type watchChannelsStep struct {
baseStep
info *watchInfo
}
func (s *WatchChannelsStep) Execute(ctx context.Context) error {
return s.core.broker.WatchChannels(ctx, s.info)
func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.WatchChannels(ctx, s.info)
return nil, err
}
type UnwatchChannelsStep struct {
func (s *watchChannelsStep) Desc() string {
return fmt.Sprintf("watch channels, ts: %d, collection: %d, partition: %d, vChannels: %v",
s.info.ts, s.info.collectionID, s.info.partitionID, s.info.vChannels)
}
type unwatchChannelsStep struct {
baseStep
collectionID UniqueID
channels collectionChannels
}
func (s *UnwatchChannelsStep) Execute(ctx context.Context) error {
return s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels})
func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels})
return nil, err
}
type ChangeCollectionStateStep struct {
func (s *unwatchChannelsStep) Desc() string {
return fmt.Sprintf("unwatch channels, collection: %d, pChannels: %v, vChannels: %v",
s.collectionID, s.channels.physicalChannels, s.channels.virtualChannels)
}
func (s *unwatchChannelsStep) Weight() stepPriority {
return stepPriorityNormal
}
type changeCollectionStateStep struct {
baseStep
collectionID UniqueID
state pb.CollectionState
ts Timestamp
}
func (s *ChangeCollectionStateStep) Execute(ctx context.Context) error {
return s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts)
func (s *changeCollectionStateStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.ChangeCollectionState(ctx, s.collectionID, s.state, s.ts)
return nil, err
}
type ExpireCacheStep struct {
func (s *changeCollectionStateStep) Desc() string {
return fmt.Sprintf("change collection state, collection: %d, ts: %d, state: %s",
s.collectionID, s.ts, s.state.String())
}
type expireCacheStep struct {
baseStep
collectionNames []string
collectionID UniqueID
ts Timestamp
}
func (s *ExpireCacheStep) Execute(ctx context.Context) error {
return s.core.ExpireMetaCache(ctx, s.collectionNames, s.collectionID, s.ts)
func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.ExpireMetaCache(ctx, s.collectionNames, s.collectionID, s.ts)
return nil, err
}
type DeleteCollectionDataStep struct {
func (s *expireCacheStep) Desc() string {
return fmt.Sprintf("expire cache, collection id: %d, collection names: %s, ts: %d",
s.collectionID, s.collectionNames, s.ts)
}
type deleteCollectionDataStep struct {
baseStep
coll *model.Collection
ts Timestamp
}
func (s *DeleteCollectionDataStep) Execute(ctx context.Context) error {
return s.core.garbageCollector.GcCollectionData(ctx, s.coll, s.ts)
func (s *deleteCollectionDataStep) Execute(ctx context.Context) ([]nestedStep, error) {
ddlTs, err := s.core.garbageCollector.GcCollectionData(ctx, s.coll)
if err != nil {
return nil, err
}
// wait for ts synced.
children := make([]nestedStep, 0, len(s.coll.PhysicalChannelNames))
for _, channel := range s.coll.PhysicalChannelNames {
children = append(children, &waitForTsSyncedStep{
baseStep: baseStep{core: s.core},
ts: ddlTs,
channel: channel,
})
}
return children, nil
}
type DeletePartitionDataStep struct {
func (s *deleteCollectionDataStep) Desc() string {
return fmt.Sprintf("delete collection data, collection: %d", s.coll.CollectionID)
}
func (s *deleteCollectionDataStep) Weight() stepPriority {
return stepPriorityImportant
}
// waitForTsSyncedStep child step of deleteCollectionDataStep.
type waitForTsSyncedStep struct {
baseStep
ts Timestamp
channel string
}
func (s *waitForTsSyncedStep) Execute(ctx context.Context) ([]nestedStep, error) {
syncedTs := s.core.chanTimeTick.getSyncedTimeTick(s.channel)
if syncedTs < s.ts {
return nil, fmt.Errorf("ts not synced yet, channel: %s, synced: %d, want: %d", s.channel, syncedTs, s.ts)
}
return nil, nil
}
func (s *waitForTsSyncedStep) Desc() string {
return fmt.Sprintf("wait for ts synced, channel: %s, want: %d", s.channel, s.ts)
}
func (s *waitForTsSyncedStep) Weight() stepPriority {
return stepPriorityNormal
}
type deletePartitionDataStep struct {
baseStep
pchans []string
partition *model.Partition
ts Timestamp
}
func (s *DeletePartitionDataStep) Execute(ctx context.Context) error {
return s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition, s.ts)
func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, error) {
_, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition)
return nil, err
}
type ReleaseCollectionStep struct {
func (s *deletePartitionDataStep) Desc() string {
return fmt.Sprintf("delete partition data, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
}
func (s *deletePartitionDataStep) Weight() stepPriority {
return stepPriorityImportant
}
type releaseCollectionStep struct {
baseStep
collectionID UniqueID
}
func (s *ReleaseCollectionStep) Execute(ctx context.Context) error {
return s.core.broker.ReleaseCollection(ctx, s.collectionID)
func (s *releaseCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.ReleaseCollection(ctx, s.collectionID)
return nil, err
}
type DropIndexStep struct {
func (s *releaseCollectionStep) Desc() string {
return fmt.Sprintf("release collection: %d", s.collectionID)
}
func (s *releaseCollectionStep) Weight() stepPriority {
return stepPriorityUrgent
}
type dropIndexStep struct {
baseStep
collID UniqueID
}
func (s *DropIndexStep) Execute(ctx context.Context) error {
return s.core.broker.DropCollectionIndex(ctx, s.collID)
func (s *dropIndexStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.DropCollectionIndex(ctx, s.collID)
return nil, err
}
type AddPartitionMetaStep struct {
func (s *dropIndexStep) Desc() string {
return fmt.Sprintf("drop collection index: %d", s.collID)
}
func (s *dropIndexStep) Weight() stepPriority {
return stepPriorityNormal
}
type addPartitionMetaStep struct {
baseStep
partition *model.Partition
}
func (s *AddPartitionMetaStep) Execute(ctx context.Context) error {
return s.core.meta.AddPartition(ctx, s.partition)
func (s *addPartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.AddPartition(ctx, s.partition)
return nil, err
}
type ChangePartitionStateStep struct {
func (s *addPartitionMetaStep) Desc() string {
return fmt.Sprintf("add partition to meta table, collection: %d, partition: %d", s.partition.CollectionID, s.partition.PartitionID)
}
type changePartitionStateStep struct {
baseStep
collectionID UniqueID
partitionID UniqueID
@ -142,24 +288,47 @@ type ChangePartitionStateStep struct {
ts Timestamp
}
func (s *ChangePartitionStateStep) Execute(ctx context.Context) error {
return s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts)
func (s *changePartitionStateStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.ChangePartitionState(ctx, s.collectionID, s.partitionID, s.state, s.ts)
return nil, err
}
type RemovePartitionMetaStep struct {
func (s *changePartitionStateStep) Desc() string {
return fmt.Sprintf("change partition step, collection: %d, partition: %d, state: %s, ts: %d",
s.collectionID, s.partitionID, s.state.String(), s.ts)
}
type removePartitionMetaStep struct {
baseStep
collectionID UniqueID
partitionID UniqueID
ts Timestamp
}
func (s *RemovePartitionMetaStep) Execute(ctx context.Context) error {
return s.core.meta.RemovePartition(ctx, s.collectionID, s.partitionID, s.ts)
func (s *removePartitionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.meta.RemovePartition(ctx, s.collectionID, s.partitionID, s.ts)
return nil, err
}
type NullStep struct {
func (s *removePartitionMetaStep) Desc() string {
return fmt.Sprintf("remove partition meta, collection: %d, partition: %d, ts: %d", s.collectionID, s.partitionID, s.ts)
}
func (s *NullStep) Execute(ctx context.Context) error {
return nil
func (s *removePartitionMetaStep) Weight() stepPriority {
return stepPriorityNormal
}
type nullStep struct {
}
func (s *nullStep) Execute(ctx context.Context) ([]nestedStep, error) {
return nil, nil
}
func (s *nullStep) Desc() string {
return ""
}
func (s *nullStep) Weight() stepPriority {
return stepPriorityLow
}
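// Editor's sketch, not part of this change: judging by the methods implemented
// above, the nestedStep contract these step types satisfy looks roughly like
// the interface below; the authoritative definition lives elsewhere in this
// package and may differ in detail.
type nestedStepSketch interface {
	Execute(ctx context.Context) ([]nestedStep, error)
	Desc() string
	Weight() stepPriority
}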

View File

@ -0,0 +1,195 @@
package rootcoord
import (
"context"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)
const (
defaultBgExecutingParallel = 4
defaultBgExecutingInterval = time.Second
)
type StepExecutor interface {
Start()
Stop()
AddSteps(s *stepStack)
}
type stepStack struct {
steps []nestedStep
}
func (s *stepStack) Execute(ctx context.Context) *stepStack {
steps := s.steps
for len(steps) > 0 {
l := len(steps)
todo := steps[l-1]
childSteps, err := todo.Execute(ctx)
if retry.IsUnRecoverable(err) {
log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
return nil
}
if err != nil {
s.steps = nil // allow s to be garbage-collected.
log.Warn("failed to execute step, waiting for reschedule", zap.Error(err), zap.String("step", todo.Desc()))
return &stepStack{steps: steps}
}
// this step is done.
steps = steps[:l-1]
steps = append(steps, childSteps...)
}
// everything is done.
return nil
}
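// Editor's sketch, not part of this change (assumes the rootcoord package):
// stepStack pops from the tail, so the most recently appended step runs first,
// and any children it returns are pushed back and executed before the remaining
// steps. "echoStep" and "exampleStepStack" are hypothetical names.
type echoStep struct {
	name     string
	children []nestedStep
}

func (e *echoStep) Execute(ctx context.Context) ([]nestedStep, error) {
	log.Info("executing example step", zap.String("name", e.name))
	return e.children, nil
}

func (e *echoStep) Desc() string         { return e.name }
func (e *echoStep) Weight() stepPriority { return stepPriorityLow }

func exampleStepStack(ctx context.Context) {
	s := &stepStack{steps: []nestedStep{
		&echoStep{name: "appended first, runs last"},
		&echoStep{name: "runs first", children: []nestedStep{
			&echoStep{name: "child, runs second"},
		}},
	}}
	// a nil return means every step, children included, completed.
	_ = s.Execute(ctx)
}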
type selectStepPolicy func(map[*stepStack]struct{}) []*stepStack
func randomSelect(parallel int, m map[*stepStack]struct{}) []*stepStack {
if parallel <= 0 {
parallel = defaultBgExecutingParallel
}
res := make([]*stepStack, 0, parallel)
for s := range m {
if len(res) >= parallel {
break
}
res = append(res, s)
}
return res
}
func randomSelectPolicy(parallel int) selectStepPolicy {
return func(m map[*stepStack]struct{}) []*stepStack {
return randomSelect(parallel, m)
}
}
func defaultSelectPolicy() selectStepPolicy {
return randomSelectPolicy(defaultBgExecutingParallel)
}
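// Editor's sketch, not part of this change: selectStepPolicy is pluggable, so a
// caller could, for example, drain every buffered stack per round instead of the
// bounded random pick above. "selectAllPolicy" is a hypothetical name.
func selectAllPolicy() selectStepPolicy {
	return func(m map[*stepStack]struct{}) []*stepStack {
		res := make([]*stepStack, 0, len(m))
		for s := range m {
			res = append(res, s)
		}
		return res
	}
}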
type bgOpt func(*bgStepExecutor)
func withSelectStepPolicy(policy selectStepPolicy) bgOpt {
return func(bg *bgStepExecutor) {
bg.selector = policy
}
}
func withBgInterval(interval time.Duration) bgOpt {
return func(bg *bgStepExecutor) {
bg.interval = interval
}
}
type bgStepExecutor struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
bufferedSteps map[*stepStack]struct{}
selector selectStepPolicy
mu sync.Mutex
notify chan struct{}
interval time.Duration
}
func newBgStepExecutor(ctx context.Context, opts ...bgOpt) *bgStepExecutor {
ctx1, cancel := context.WithCancel(ctx)
bg := &bgStepExecutor{
ctx: ctx1,
cancel: cancel,
wg: sync.WaitGroup{},
bufferedSteps: make(map[*stepStack]struct{}),
selector: defaultSelectPolicy(),
mu: sync.Mutex{},
notify: make(chan struct{}, 1),
interval: defaultBgExecutingInterval,
}
for _, opt := range opts {
opt(bg)
}
return bg
}
func (bg *bgStepExecutor) Start() {
bg.wg.Add(1)
go bg.scheduleLoop()
}
func (bg *bgStepExecutor) Stop() {
bg.cancel()
bg.wg.Wait()
}
func (bg *bgStepExecutor) AddSteps(s *stepStack) {
bg.mu.Lock()
bg.addStepsInternal(s)
bg.mu.Unlock()
select {
case bg.notify <- struct{}{}:
default:
}
}
func (bg *bgStepExecutor) process(steps []*stepStack) {
wg := sync.WaitGroup{}
for i := range steps {
s := steps[i]
if s == nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
child := s.Execute(bg.ctx)
if child != nil {
bg.AddSteps(child)
}
}()
}
wg.Wait()
}
func (bg *bgStepExecutor) schedule() {
bg.mu.Lock()
selected := bg.selector(bg.bufferedSteps)
for _, s := range selected {
bg.removeStepsInternal(s)
}
bg.mu.Unlock()
bg.process(selected)
}
func (bg *bgStepExecutor) scheduleLoop() {
defer bg.wg.Done()
ticker := time.NewTicker(bg.interval)
defer ticker.Stop()
for {
select {
case <-bg.ctx.Done():
return
case <-bg.notify:
bg.schedule()
case <-ticker.C:
bg.schedule()
}
}
}
func (bg *bgStepExecutor) addStepsInternal(s *stepStack) {
bg.bufferedSteps[s] = struct{}{}
}
func (bg *bgStepExecutor) removeStepsInternal(s *stepStack) {
delete(bg.bufferedSteps, s)
}
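// Editor's usage sketch, not part of this change ("runBackgroundUndo" and the
// pending channel are hypothetical): the executor re-buffers whatever a stack's
// Execute returns, so callers enqueue each stack only once and failed steps are
// retried on the next notification or ticker round.
func runBackgroundUndo(ctx context.Context, pending <-chan *stepStack) {
	exec := newBgStepExecutor(ctx,
		withSelectStepPolicy(randomSelectPolicy(2)), // at most 2 stacks per round
		withBgInterval(time.Second),                 // retry cadence when no AddSteps notification arrives
	)
	exec.Start()
	defer exec.Stop() // cancels the executor's ctx and waits for scheduleLoop to exit

	for s := range pending {
		exec.AddSteps(s) // wakes scheduleLoop via the notify channel
	}
}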

View File

@ -0,0 +1,175 @@
package rootcoord
import (
"context"
"errors"
"math/rand"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
type mockChildStep struct {
}
func (m *mockChildStep) Execute(ctx context.Context) ([]nestedStep, error) {
return nil, nil
}
func (m *mockChildStep) Desc() string {
return "mock child step"
}
func (m *mockChildStep) Weight() stepPriority {
return stepPriorityLow
}
func newMockChildStep() *mockChildStep {
return &mockChildStep{}
}
type mockStepWithChild struct {
}
func (m *mockStepWithChild) Execute(ctx context.Context) ([]nestedStep, error) {
return []nestedStep{newMockChildStep()}, nil
}
func (m *mockStepWithChild) Desc() string {
return "mock step with child"
}
func (m *mockStepWithChild) Weight() stepPriority {
return stepPriorityLow
}
func newMockStepWithChild() *mockStepWithChild {
return &mockStepWithChild{}
}
func Test_stepStack_Execute(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
steps := []nestedStep{
newMockStepWithChild(),
newMockChildStep(),
}
s := &stepStack{steps: steps}
unfinished := s.Execute(context.Background())
assert.Nil(t, unfinished)
})
t.Run("error case", func(t *testing.T) {
steps := []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockFailStep(),
newMockNormalStep(),
}
s := &stepStack{steps: steps}
unfinished := s.Execute(context.Background())
assert.Equal(t, 3, len(unfinished.steps))
})
t.Run("Unrecoverable", func(t *testing.T) {
failStep := newMockFailStep()
failStep.err = retry.Unrecoverable(errors.New("error mock Execute"))
steps := []nestedStep{
failStep,
}
s := &stepStack{steps: steps}
unfinished := s.Execute(context.Background())
assert.Nil(t, unfinished)
})
}
func Test_randomSelect(t *testing.T) {
s0 := &stepStack{steps: []nestedStep{}}
s1 := &stepStack{steps: []nestedStep{
newMockNormalStep(),
}}
s2 := &stepStack{steps: []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
}}
s3 := &stepStack{steps: []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockNormalStep(),
}}
s4 := &stepStack{steps: []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockNormalStep(),
newMockNormalStep(),
}}
m := map[*stepStack]struct{}{
s0: {},
s1: {},
s2: {},
s3: {},
s4: {},
}
selected := randomSelect(0, m)
assert.Equal(t, defaultBgExecutingParallel, len(selected))
for _, s := range selected {
_, ok := m[s]
assert.True(t, ok)
}
selected = randomSelect(2, m)
assert.Equal(t, 2, len(selected))
for _, s := range selected {
_, ok := m[s]
assert.True(t, ok)
}
}
func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
bg := newBgStepExecutor(context.Background(),
withSelectStepPolicy(defaultSelectPolicy()),
withBgInterval(time.Millisecond*10))
bg.Start()
n := 20
records := make([]int, 0, n)
steps := make([]*stepStack, 0, n)
for i := 0; i < n; i++ {
var s *stepStack
r := rand.Int() % 3
records = append(records, r)
switch r {
case 0:
s = nil
case 1:
failStep := newMockFailStep()
failStep.err = retry.Unrecoverable(errors.New("error mock Execute"))
s = &stepStack{steps: []nestedStep{
newMockNormalStep(),
failStep,
newMockNormalStep(),
}}
case 2:
s = &stepStack{steps: []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockNormalStep(),
}}
default:
}
steps = append(steps, s)
bg.AddSteps(s)
}
for i, r := range records {
switch r {
case 0:
assert.Nil(t, steps[i])
case 1:
<-steps[i].steps[1].(*mockFailStep).calledChan
assert.True(t, steps[i].steps[1].(*mockFailStep).called)
case 2:
default:
}
}
bg.Stop()
}

View File

@ -42,7 +42,32 @@ var (
ttCheckerWarnMsg = fmt.Sprintf("RootCoord hasn't synchronized the time tick for %f minutes", timeTickSyncTtInterval.Minutes())
)
// TODO: better to accept ctx in timetickSync-related methods so that DDL requests can be traced.
type ttHistogram struct {
sync.Map
}
func newTtHistogram() *ttHistogram {
return &ttHistogram{}
}
func (h *ttHistogram) update(channel string, ts Timestamp) {
h.Store(channel, ts)
}
func (h *ttHistogram) get(channel string) Timestamp {
ts, ok := h.Load(channel)
if !ok {
return typeutil.ZeroTimestamp
}
return ts.(Timestamp)
}
func (h *ttHistogram) remove(channels ...string) {
for _, channel := range channels {
h.Delete(channel)
}
}
type timetickSync struct {
ctx context.Context
sourceID typeutil.UniqueID
@ -52,6 +77,8 @@ type timetickSync struct {
lock sync.Mutex
sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg
sendChan chan map[typeutil.UniqueID]*chanTsMsg
syncedTtHistogram *ttHistogram
}
type chanTsMsg struct {
@ -98,6 +125,8 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
lock: sync.Mutex{},
sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
syncedTtHistogram: newTtHistogram(),
}
}
@ -247,6 +276,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil {
log.Warn("SendTimeTickToChannel fail", zap.Error(err))
}
t.syncedTtHistogram.update(chanName, mints)
wg.Done()
}(chanName, ts)
}
@ -326,6 +356,7 @@ func (t *timetickSync) addDmlChannels(names ...string) {
// RemoveDmlChannels remove dml channels
func (t *timetickSync) removeDmlChannels(names ...string) {
t.dmlChannels.removeChannels(names...)
// t.syncedTtHistogram.remove(names...) // intentionally kept commented out: a channel's synced ts should never go backwards.
log.Info("remove dml channels", zap.Strings("channels", names))
}
@ -339,6 +370,10 @@ func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstr
return t.dmlChannels.broadcastMark(chanNames, pack)
}
func (t *timetickSync) getSyncedTimeTick(channel string) Timestamp {
return t.syncedTtHistogram.get(channel)
}
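// Editor's sketch, not part of this change ("ddlTsSynced" is a hypothetical
// helper): this is the comparison waitForTsSyncedStep performs to gate DDL
// garbage collection behind the tt mechanism.
func ddlTsSynced(t *timetickSync, channel string, ddlTs Timestamp) bool {
	return t.getSyncedTimeTick(channel) >= ddlTs
}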
func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {
var ret typeutil.Timestamp
for _, t := range tt {

View File

@ -21,6 +21,8 @@ import (
"sync"
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/api/commonpb"
@ -104,3 +106,15 @@ func TestTimetickSync(t *testing.T) {
})
wg.Wait()
}
func Test_ttHistogram_get(t *testing.T) {
h := newTtHistogram()
assert.Equal(t, typeutil.ZeroTimestamp, h.get("not_exist"))
h.update("ch1", 100)
assert.Equal(t, Timestamp(100), h.get("ch1"))
h.update("ch2", 1000)
assert.Equal(t, Timestamp(1000), h.get("ch2"))
h.remove("ch1", "ch2", "not_exist")
assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch1"))
assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch2"))
}

View File

@ -3,55 +3,42 @@ package rootcoord
import (
"context"
"fmt"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type baseUndoTask struct {
todoStep []Step // steps to execute
undoStep []Step // steps to undo
todoStep []nestedStep // steps to execute
undoStep []nestedStep // steps to undo
stepExecutor StepExecutor
}
func newBaseUndoTask() *baseUndoTask {
func newBaseUndoTask(stepExecutor StepExecutor) *baseUndoTask {
return &baseUndoTask{
todoStep: make([]Step, 0),
undoStep: make([]Step, 0),
todoStep: make([]nestedStep, 0),
undoStep: make([]nestedStep, 0),
stepExecutor: stepExecutor,
}
}
func (b *baseUndoTask) AddStep(todoStep, undoStep Step) {
func (b *baseUndoTask) AddStep(todoStep, undoStep nestedStep) {
b.todoStep = append(b.todoStep, todoStep)
b.undoStep = append(b.undoStep, undoStep)
}
func (b *baseUndoTask) undoFromLastFinished(lastFinished int) {
// You cannot just use the ctx of task, since it will be canceled after response is returned.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for i := lastFinished; i >= 0; i-- {
undo := b.undoStep[i]
if err := undo.Execute(ctx); err != nil {
// You depend on the collection meta to do other gc.
// TODO: add ddl logger after other service can be idempotent enough, then you can do separate steps
// independently.
log.Error("failed to execute step, garbage may be generated", zap.Error(err))
return
}
}
}
func (b *baseUndoTask) Execute(ctx context.Context) error {
if len(b.todoStep) != len(b.undoStep) {
return fmt.Errorf("todo step and undo step length not equal")
}
for i := 0; i < len(b.todoStep); i++ {
todoStep := b.todoStep[i]
err := todoStep.Execute(ctx)
if err != nil {
go b.undoFromLastFinished(i - 1)
log.Warn("failed to execute step, trying to undo", zap.Error(err))
// no child steps are returned in the normal case.
if _, err := todoStep.Execute(ctx); err != nil {
log.Warn("failed to execute step, trying to undo", zap.Error(err), zap.String("desc", todoStep.Desc()))
undoSteps := b.undoStep[:i]
b.undoStep = nil // allow baseUndoTask to be garbage-collected.
go b.stepExecutor.AddSteps(&stepStack{undoSteps})
return err
}
}
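// Editor's usage sketch, not part of this change ("createStep" and "undoStep"
// stand for any nestedStep pair and are hypothetical): todo steps run in order,
// and if step i fails, the undo steps recorded for steps 0..i-1 are handed to
// the StepExecutor as a stepStack, which replays them in reverse registration
// order in the background instead of undoing them inline.
func exampleUndoTask(ctx context.Context, exec StepExecutor, createStep, undoStep nestedStep) error {
	t := newBaseUndoTask(exec)
	t.AddStep(createStep, undoStep) // undoStep is only scheduled if a later todo step fails
	return t.Execute(ctx)
}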

View File

@ -7,18 +7,28 @@ import (
"github.com/stretchr/testify/assert"
)
func newTestUndoTask() *baseUndoTask {
stepExecutor := newMockStepExecutor()
stepExecutor.AddStepsFunc = func(s *stepStack) {
// no background scheduling; execute the steps directly.
s.Execute(context.Background())
}
undoTask := newBaseUndoTask(stepExecutor)
return undoTask
}
func Test_baseUndoTask_Execute(t *testing.T) {
t.Run("should not happen", func(t *testing.T) {
undoTask := newBaseUndoTask()
undoTask := newTestUndoTask()
undoTask.todoStep = append(undoTask.todoStep, newMockNormalStep())
err := undoTask.Execute(context.Background())
assert.Error(t, err)
})
t.Run("normal case, no undo step will be called", func(t *testing.T) {
undoTask := newBaseUndoTask()
undoTask := newTestUndoTask()
n := 10
todoSteps, undoSteps := make([]Step, 0, n), make([]Step, 0, n)
todoSteps, undoSteps := make([]nestedStep, 0, n), make([]nestedStep, 0, n)
for i := 0; i < n; i++ {
normalTodoStep := newMockNormalStep()
normalUndoStep := newMockNormalStep()
@ -37,13 +47,13 @@ func Test_baseUndoTask_Execute(t *testing.T) {
})
t.Run("partial error, undo from last finished", func(t *testing.T) {
undoTask := newBaseUndoTask()
todoSteps := []Step{
undoTask := newTestUndoTask()
todoSteps := []nestedStep{
newMockNormalStep(),
newMockFailStep(),
newMockNormalStep(),
}
undoSteps := []Step{
undoSteps := []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockNormalStep(),
@ -65,13 +75,13 @@ func Test_baseUndoTask_Execute(t *testing.T) {
})
t.Run("partial error, undo meet error also", func(t *testing.T) {
undoTask := newBaseUndoTask()
todoSteps := []Step{
undoTask := newTestUndoTask()
todoSteps := []nestedStep{
newMockNormalStep(),
newMockNormalStep(),
newMockFailStep(),
}
undoSteps := []Step{
undoSteps := []nestedStep{
newMockNormalStep(),
newMockFailStep(),
newMockNormalStep(),