Simplify Dml-DeltaChannel mapping logic (#18702)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-08-18 14:24:50 +08:00 committed by GitHub
parent 6ae6b6a922
commit b62cb82ebe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 103 deletions

View File

@ -179,14 +179,27 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
return d return d
} }
func (d *dmlChannels) getChannelName() string { func (d *dmlChannels) getChannelNames(count int) []string {
if count > len(d.channelsHeap) {
return nil
}
d.mut.Lock() d.mut.Lock()
defer d.mut.Unlock() defer d.mut.Unlock()
// get first item from heap // get next count items from heap
item := d.channelsHeap[0] items := make([]*dmlMsgStream, 0, count)
item.BookUsage() result := make([]string, 0, count)
heap.Fix(&d.channelsHeap, 0) for i := 0; i < count; i++ {
return genChannelName(d.namePrefix, item.idx) item := heap.Pop(&d.channelsHeap).(*dmlMsgStream)
item.BookUsage()
items = append(items, item)
result = append(result, genChannelName(d.namePrefix, item.idx))
}
for _, item := range items {
heap.Push(&d.channelsHeap, item)
}
return result
} }
func (d *dmlChannels) listChannels() []string { func (d *dmlChannels) listChannels() []string {

View File

@ -141,27 +141,18 @@ func TestDmlChannels(t *testing.T) {
assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) }) assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) })
assert.Panics(t, func() { dml.removeChannels(randStr) }) assert.Panics(t, func() { dml.removeChannels(randStr) })
// dml_xxx_0 => {chanName0, chanName2} chans0 := dml.getChannelNames(2)
// dml_xxx_1 => {chanName1} dml.addChannels(chans0...)
chanName0 := dml.getChannelName()
dml.addChannels(chanName0)
assert.Equal(t, 1, dml.getChannelNum())
chanName1 := dml.getChannelName()
dml.addChannels(chanName1)
assert.Equal(t, 2, dml.getChannelNum()) assert.Equal(t, 2, dml.getChannelNum())
chanName2 := dml.getChannelName() chans1 := dml.getChannelNames(1)
dml.addChannels(chanName2) dml.addChannels(chans1...)
assert.Equal(t, 2, dml.getChannelNum()) assert.Equal(t, 2, dml.getChannelNum())
dml.removeChannels(chanName0) dml.removeChannels(chans1...)
assert.Equal(t, 2, dml.getChannelNum()) assert.Equal(t, 2, dml.getChannelNum())
dml.removeChannels(chanName1) dml.removeChannels(chans0...)
assert.Equal(t, 1, dml.getChannelNum())
dml.removeChannels(chanName0)
assert.Equal(t, 0, dml.getChannelNum()) assert.Equal(t, 0, dml.getChannelNum())
} }
@ -179,7 +170,7 @@ func TestDmChannelsFailure(t *testing.T) {
defer wg.Done() defer wg.Done()
mockFactory := &FailMessageStreamFactory{errBroadcast: true} mockFactory := &FailMessageStreamFactory{errBroadcast: true}
dml := newDmlChannels(context.TODO(), mockFactory, "test-newdmlchannel-root", 1) dml := newDmlChannels(context.TODO(), mockFactory, "test-newdmlchannel-root", 1)
chanName0 := dml.getChannelName() chanName0 := dml.getChannelNames(1)[0]
dml.addChannels(chanName0) dml.addChannels(chanName0)
require.Equal(t, 1, dml.getChannelNum()) require.Equal(t, 1, dml.getChannelNum())

View File

@ -534,10 +534,9 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
} }
vchanNames := make([]string, t.ShardsNum) vchanNames := make([]string, t.ShardsNum)
chanNames := make([]string, t.ShardsNum) chanNames := core.chanTimeTick.getDmlChannelNames(int(t.ShardsNum))
for i := int32(0); i < t.ShardsNum; i++ { for i := int32(0); i < t.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.chanTimeTick.getDmlChannelName(), collID, i) vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], collID, i)
chanNames[i] = funcutil.ToPhysicalChannel(vchanNames[i])
} }
collInfo := model.Collection{ collInfo := model.Collection{
@ -2271,15 +2270,11 @@ func TestRootCoord_Base(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
cn0 := core.chanTimeTick.getDmlChannelName() cns := core.chanTimeTick.getDmlChannelNames(3)
cn1 := core.chanTimeTick.getDmlChannelName() cn0 := cns[0]
cn2 := core.chanTimeTick.getDmlChannelName() cn1 := cns[1]
core.chanTimeTick.addDmlChannels(cn0, cn1, cn2) cn2 := cns[2]
core.chanTimeTick.addDmlChannels(cns...)
dn0 := core.chanTimeTick.getDeltaChannelName()
dn1 := core.chanTimeTick.getDeltaChannelName()
dn2 := core.chanTimeTick.getDeltaChannelName()
core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2)
// wait for local channel reported // wait for local channel reported
for { for {

View File

@ -156,26 +156,18 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
zap.Int64("default partition id", partID)) zap.Int64("default partition id", partID))
vchanNames := make([]string, t.Req.ShardsNum) vchanNames := make([]string, t.Req.ShardsNum)
chanNames := make([]string, t.Req.ShardsNum)
deltaChanNames := make([]string, t.Req.ShardsNum) deltaChanNames := make([]string, t.Req.ShardsNum)
for i := int32(0); i < t.Req.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.chanTimeTick.getDmlChannelName(), collID, i)
chanNames[i] = funcutil.ToPhysicalChannel(vchanNames[i])
deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName() //physical channel names
deltaChanName, err1 := funcutil.ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) chanNames := t.core.chanTimeTick.getDmlChannelNames(int(t.Req.ShardsNum))
if err1 != nil || deltaChanName != deltaChanNames[i] { for i := int32(0); i < t.Req.ShardsNum; i++ {
err1Msg := "" vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], collID, i)
if err1 != nil { deltaChanNames[i], err = funcutil.ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
err1Msg = err1.Error() if err != nil {
} log.Warn("failed to generate delta channel name",
log.Debug("dmlChanName deltaChanName mismatch detail", zap.Int32("i", i), zap.String("dmlChannelName", chanNames[i]),
zap.String("vchanName", vchanNames[i]), zap.Error(err))
zap.String("phsicalChanName", chanNames[i]), return fmt.Errorf("failed to generate delta channel name from %s, %w", chanNames[i], err)
zap.String("deltaChanName", deltaChanNames[i]),
zap.String("converted_deltaChanName", deltaChanName),
zap.String("err", err1Msg))
return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i])
} }
} }
@ -247,9 +239,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
// add dml channel before send dd msg // add dml channel before send dd msg
t.core.chanTimeTick.addDmlChannels(chanNames...) t.core.chanTimeTick.addDmlChannels(chanNames...)
// also add delta channels
t.core.chanTimeTick.addDeltaChannels(deltaChanNames...)
ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames) ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil { if err != nil {
return fmt.Errorf("send dd create collection req failed, error = %w", err) return fmt.Errorf("send dd create collection req failed, error = %w", err)
@ -264,7 +253,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
// update meta table after send dd operation // update meta table after send dd operation
if err = t.core.MetaTable.AddCollection(&collInfo, ts, ddOpStr); err != nil { if err = t.core.MetaTable.AddCollection(&collInfo, ts, ddOpStr); err != nil {
t.core.chanTimeTick.removeDmlChannels(chanNames...) t.core.chanTimeTick.removeDmlChannels(chanNames...)
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic // it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
return fmt.Errorf("meta table add collection failed,error = %w", err) return fmt.Errorf("meta table add collection failed,error = %w", err)
} }
@ -390,14 +378,6 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// remove dml channel after send dd msg // remove dml channel after send dd msg
t.core.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...) t.core.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...)
// remove delta channels
deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames))
for i, chanName := range collMeta.PhysicalChannelNames {
if deltaChanNames[i], err = funcutil.ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta); err != nil {
return err
}
}
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
return nil return nil
} }

View File

@ -28,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/tsoutil"
@ -48,8 +47,7 @@ type timetickSync struct {
ctx context.Context ctx context.Context
sourceID typeutil.UniqueID sourceID typeutil.UniqueID
dmlChannels *dmlChannels // used for insert dmlChannels *dmlChannels // used for insert
deltaChannels *dmlChannels // used for delete
lock sync.Mutex lock sync.Mutex
sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg
@ -89,33 +87,18 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// initialize dml channels used for insert // initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum) dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
deltaChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum)
// recover physical channels for all collections // recover physical channels for all collections
for collID, chanNames := range chanMap { for collID, chanNames := range chanMap {
dmlChannels.addChannels(chanNames...) dmlChannels.addChannels(chanNames...)
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames)) log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Strings("physical channels", chanNames))
var err error
deltaChanNames := make([]string, len(chanNames))
for i, chanName := range chanNames {
deltaChanNames[i], err = funcutil.ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
panic("invalid dml channel name " + chanName)
}
}
deltaChannels.addChannels(deltaChanNames...)
log.Debug("recover delta channels", zap.Int64("collID", collID), zap.Any("delta channels", deltaChanNames))
} }
return &timetickSync{ return &timetickSync{
ctx: ctx, ctx: ctx,
sourceID: sourceID, sourceID: sourceID,
dmlChannels: dmlChannels, dmlChannels: dmlChannels,
deltaChannels: deltaChannels,
lock: sync.Mutex{}, lock: sync.Mutex{},
sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg), sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
@ -384,9 +367,9 @@ func (t *timetickSync) getSessionNum() int {
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// GetDmlChannelName return a valid dml channel name // getDmlChannelNames returns list of channel names.
func (t *timetickSync) getDmlChannelName() string { func (t *timetickSync) getDmlChannelNames(count int) []string {
return t.dmlChannels.getChannelName() return t.dmlChannels.getChannelNames(count)
} }
// GetDmlChannelNum return the num of dml channels // GetDmlChannelNum return the num of dml channels
@ -419,22 +402,6 @@ func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstr
return t.dmlChannels.broadcastMark(chanNames, pack) return t.dmlChannels.broadcastMark(chanNames, pack)
} }
///////////////////////////////////////////////////////////////////////////////
// GetDeltaChannelName return a valid delta channel name
func (t *timetickSync) getDeltaChannelName() string {
return t.deltaChannels.getChannelName()
}
// AddDeltaChannels add delta channels
func (t *timetickSync) addDeltaChannels(names ...string) {
t.deltaChannels.addChannels(names...)
}
// RemoveDeltaChannels remove delta channels
func (t *timetickSync) removeDeltaChannels(names ...string) {
t.deltaChannels.removeChannels(names...)
}
func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp { func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {
var ret typeutil.Timestamp var ret typeutil.Timestamp
for _, t := range tt { for _, t := range tt {