From b62cb82ebe1bea525b0ebbf8749e1ebf3faa9db7 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 18 Aug 2022 14:24:50 +0800 Subject: [PATCH] Simplify Dml-DeltaChannel mapping logic (#18702) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/rootcoord/dml_channels.go | 25 ++++++++++---- internal/rootcoord/dml_channels_test.go | 23 ++++--------- internal/rootcoord/root_coord_test.go | 19 ++++------- internal/rootcoord/task.go | 40 ++++++---------------- internal/rootcoord/timeticksync.go | 45 ++++--------------------- 5 files changed, 49 insertions(+), 103 deletions(-) diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 37e5a00edf..f22c673803 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -179,14 +179,27 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref return d } -func (d *dmlChannels) getChannelName() string { +func (d *dmlChannels) getChannelNames(count int) []string { + if count > len(d.channelsHeap) { + return nil + } d.mut.Lock() defer d.mut.Unlock() - // get first item from heap - item := d.channelsHeap[0] - item.BookUsage() - heap.Fix(&d.channelsHeap, 0) - return genChannelName(d.namePrefix, item.idx) + // get next count items from heap + items := make([]*dmlMsgStream, 0, count) + result := make([]string, 0, count) + for i := 0; i < count; i++ { + item := heap.Pop(&d.channelsHeap).(*dmlMsgStream) + item.BookUsage() + items = append(items, item) + result = append(result, genChannelName(d.namePrefix, item.idx)) + } + + for _, item := range items { + heap.Push(&d.channelsHeap, item) + } + + return result } func (d *dmlChannels) listChannels() []string { diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 65a05f8c72..679385c135 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -141,27 +141,18 @@ func TestDmlChannels(t *testing.T) { assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) }) assert.Panics(t, func() { dml.removeChannels(randStr) }) - // dml_xxx_0 => {chanName0, chanName2} - // dml_xxx_1 => {chanName1} - chanName0 := dml.getChannelName() - dml.addChannels(chanName0) - assert.Equal(t, 1, dml.getChannelNum()) - - chanName1 := dml.getChannelName() - dml.addChannels(chanName1) + chans0 := dml.getChannelNames(2) + dml.addChannels(chans0...) assert.Equal(t, 2, dml.getChannelNum()) - chanName2 := dml.getChannelName() - dml.addChannels(chanName2) + chans1 := dml.getChannelNames(1) + dml.addChannels(chans1...) assert.Equal(t, 2, dml.getChannelNum()) - dml.removeChannels(chanName0) + dml.removeChannels(chans1...) assert.Equal(t, 2, dml.getChannelNum()) - dml.removeChannels(chanName1) - assert.Equal(t, 1, dml.getChannelNum()) - - dml.removeChannels(chanName0) + dml.removeChannels(chans0...) assert.Equal(t, 0, dml.getChannelNum()) } @@ -179,7 +170,7 @@ func TestDmChannelsFailure(t *testing.T) { defer wg.Done() mockFactory := &FailMessageStreamFactory{errBroadcast: true} dml := newDmlChannels(context.TODO(), mockFactory, "test-newdmlchannel-root", 1) - chanName0 := dml.getChannelName() + chanName0 := dml.getChannelNames(1)[0] dml.addChannels(chanName0) require.Equal(t, 1, dml.getChannelNum()) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 371b35a3f5..4d2a3e8c47 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -534,10 +534,9 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 } vchanNames := make([]string, t.ShardsNum) - chanNames := make([]string, t.ShardsNum) + chanNames := core.chanTimeTick.getDmlChannelNames(int(t.ShardsNum)) for i := int32(0); i < t.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.chanTimeTick.getDmlChannelName(), collID, i) - chanNames[i] = funcutil.ToPhysicalChannel(vchanNames[i]) + vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], collID, i) } collInfo := model.Collection{ @@ -2271,15 +2270,11 @@ func TestRootCoord_Base(t *testing.T) { assert.NoError(t, err) time.Sleep(100 * time.Millisecond) - cn0 := core.chanTimeTick.getDmlChannelName() - cn1 := core.chanTimeTick.getDmlChannelName() - cn2 := core.chanTimeTick.getDmlChannelName() - core.chanTimeTick.addDmlChannels(cn0, cn1, cn2) - - dn0 := core.chanTimeTick.getDeltaChannelName() - dn1 := core.chanTimeTick.getDeltaChannelName() - dn2 := core.chanTimeTick.getDeltaChannelName() - core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2) + cns := core.chanTimeTick.getDmlChannelNames(3) + cn0 := cns[0] + cn1 := cns[1] + cn2 := cns[2] + core.chanTimeTick.addDmlChannels(cns...) // wait for local channel reported for { diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index ab2a1dde3a..b568c49d69 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -156,26 +156,18 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { zap.Int64("default partition id", partID)) vchanNames := make([]string, t.Req.ShardsNum) - chanNames := make([]string, t.Req.ShardsNum) deltaChanNames := make([]string, t.Req.ShardsNum) - for i := int32(0); i < t.Req.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.chanTimeTick.getDmlChannelName(), collID, i) - chanNames[i] = funcutil.ToPhysicalChannel(vchanNames[i]) - deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName() - deltaChanName, err1 := funcutil.ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) - if err1 != nil || deltaChanName != deltaChanNames[i] { - err1Msg := "" - if err1 != nil { - err1Msg = err1.Error() - } - log.Debug("dmlChanName deltaChanName mismatch detail", zap.Int32("i", i), - zap.String("vchanName", vchanNames[i]), - zap.String("phsicalChanName", chanNames[i]), - zap.String("deltaChanName", deltaChanNames[i]), - zap.String("converted_deltaChanName", deltaChanName), - zap.String("err", err1Msg)) - return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i]) + //physical channel names + chanNames := t.core.chanTimeTick.getDmlChannelNames(int(t.Req.ShardsNum)) + for i := int32(0); i < t.Req.ShardsNum; i++ { + vchanNames[i] = fmt.Sprintf("%s_%dv%d", chanNames[i], collID, i) + deltaChanNames[i], err = funcutil.ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) + if err != nil { + log.Warn("failed to generate delta channel name", + zap.String("dmlChannelName", chanNames[i]), + zap.Error(err)) + return fmt.Errorf("failed to generate delta channel name from %s, %w", chanNames[i], err) } } @@ -247,9 +239,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { // add dml channel before send dd msg t.core.chanTimeTick.addDmlChannels(chanNames...) - // also add delta channels - t.core.chanTimeTick.addDeltaChannels(deltaChanNames...) - ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames) if err != nil { return fmt.Errorf("send dd create collection req failed, error = %w", err) @@ -264,7 +253,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { // update meta table after send dd operation if err = t.core.MetaTable.AddCollection(&collInfo, ts, ddOpStr); err != nil { t.core.chanTimeTick.removeDmlChannels(chanNames...) - t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...) // it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic return fmt.Errorf("meta table add collection failed,error = %w", err) } @@ -390,14 +378,6 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // remove dml channel after send dd msg t.core.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...) - // remove delta channels - deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames)) - for i, chanName := range collMeta.PhysicalChannelNames { - if deltaChanNames[i], err = funcutil.ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta); err != nil { - return err - } - } - t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...) return nil } diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index f3e438afb8..9fd206095b 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -48,8 +47,7 @@ type timetickSync struct { ctx context.Context sourceID typeutil.UniqueID - dmlChannels *dmlChannels // used for insert - deltaChannels *dmlChannels // used for delete + dmlChannels *dmlChannels // used for insert lock sync.Mutex sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg @@ -89,33 +87,18 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { // initialize dml channels used for insert dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum) - // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels - deltaChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum) // recover physical channels for all collections for collID, chanNames := range chanMap { dmlChannels.addChannels(chanNames...) - log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames)) - - var err error - deltaChanNames := make([]string, len(chanNames)) - for i, chanName := range chanNames { - deltaChanNames[i], err = funcutil.ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) - if err != nil { - log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName)) - panic("invalid dml channel name " + chanName) - } - } - deltaChannels.addChannels(deltaChanNames...) - log.Debug("recover delta channels", zap.Int64("collID", collID), zap.Any("delta channels", deltaChanNames)) + log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Strings("physical channels", chanNames)) } return &timetickSync{ ctx: ctx, sourceID: sourceID, - dmlChannels: dmlChannels, - deltaChannels: deltaChannels, + dmlChannels: dmlChannels, lock: sync.Mutex{}, sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg), @@ -384,9 +367,9 @@ func (t *timetickSync) getSessionNum() int { } /////////////////////////////////////////////////////////////////////////////// -// GetDmlChannelName return a valid dml channel name -func (t *timetickSync) getDmlChannelName() string { - return t.dmlChannels.getChannelName() +// getDmlChannelNames returns list of channel names. +func (t *timetickSync) getDmlChannelNames(count int) []string { + return t.dmlChannels.getChannelNames(count) } // GetDmlChannelNum return the num of dml channels @@ -419,22 +402,6 @@ func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstr return t.dmlChannels.broadcastMark(chanNames, pack) } -/////////////////////////////////////////////////////////////////////////////// -// GetDeltaChannelName return a valid delta channel name -func (t *timetickSync) getDeltaChannelName() string { - return t.deltaChannels.getChannelName() -} - -// AddDeltaChannels add delta channels -func (t *timetickSync) addDeltaChannels(names ...string) { - t.deltaChannels.addChannels(names...) -} - -// RemoveDeltaChannels remove delta channels -func (t *timetickSync) removeDeltaChannels(names ...string) { - t.deltaChannels.removeChannels(names...) -} - func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp { var ret typeutil.Timestamp for _, t := range tt {