diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 3f5cc37d0b..fabbf9298b 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -12,6 +12,7 @@ package rootcoord import ( + "context" "fmt" "sync" @@ -29,16 +30,18 @@ type dmlMsgStream struct { } type dmlChannels struct { - core *Core + ctx context.Context + factory msgstream.Factory namePrefix string capacity int64 idx *atomic.Int64 pool sync.Map } -func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels { +func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefix string, chanNum int64) *dmlChannels { d := &dmlChannels{ - core: c, + ctx: ctx, + factory: factory, namePrefix: chanNamePrefix, capacity: chanNum, idx: atomic.NewInt64(0), @@ -46,8 +49,8 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels } for i := int64(0); i < chanNum; i++ { - name := getDmlChannelName(d.namePrefix, i) - ms, err := c.msFactory.NewMsgStream(c.ctx) + name := genChannelName(d.namePrefix, i) + ms, err := factory.NewMsgStream(ctx) if err != nil { log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err)) panic("Failed to add msgstream") @@ -62,13 +65,12 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels return d } -func (d *dmlChannels) GetDmlMsgStreamName() string { +func (d *dmlChannels) getChannelName() string { cnt := d.idx.Inc() - return getDmlChannelName(d.namePrefix, (cnt-1)%d.capacity) + return genChannelName(d.namePrefix, (cnt-1)%d.capacity) } -// ListPhysicalChannels lists all dml channel names -func (d *dmlChannels) ListPhysicalChannels() []string { +func (d *dmlChannels) listChannels() []string { var chanNames []string d.pool.Range( func(k, v interface{}) bool { @@ -83,13 +85,11 @@ func (d *dmlChannels) ListPhysicalChannels() []string { return chanNames } -// GetNumChannels get current dml channel count -func (d *dmlChannels) GetPhysicalChannelNum() int { - return len(d.ListPhysicalChannels()) +func (d *dmlChannels) getChannelNum() int { + return len(d.listChannels()) } -// Broadcast broadcasts msg pack into specified channel -func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error { +func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error { for _, chanName := range chanNames { v, ok := d.pool.Load(chanName) if !ok { @@ -110,8 +110,7 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err return nil } -// BroadcastMark broadcasts msg pack into specified channel and returns related message id -func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) { +func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) { result := make(map[string][]byte) for _, chanName := range chanNames { v, ok := d.pool.Load(chanName) @@ -140,8 +139,7 @@ func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack) return result, nil } -// AddProducerChannels add named channels as producer -func (d *dmlChannels) AddProducerChannels(names ...string) { +func (d *dmlChannels) addChannels(names ...string) { for _, name := range names { v, ok := d.pool.Load(name) if !ok { @@ -159,8 +157,7 @@ func (d *dmlChannels) AddProducerChannels(names ...string) { } } -// RemoveProducerChannels removes specified channels -func (d *dmlChannels) RemoveProducerChannels(names ...string) { +func (d *dmlChannels) removeChannels(names ...string) { for _, name := range names { v, ok := d.pool.Load(name) if !ok { @@ -180,6 +177,6 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) { } } -func getDmlChannelName(prefix string, idx int64) string { +func genChannelName(prefix string, idx int64) string { return fmt.Sprintf("%s_%d", prefix, idx) } diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 4f1e5a946b..a5a64510ac 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -38,39 +38,36 @@ func TestDmlChannels(t *testing.T) { err := factory.SetParams(m) assert.Nil(t, err) - core, err := NewCore(ctx, factory) - assert.Nil(t, err) - - dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum) - chanNames := dml.ListPhysicalChannels() + dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum) + chanNames := dml.listChannels() assert.Equal(t, 0, len(chanNames)) randStr := funcutil.RandomString(8) - assert.Panics(t, func() { dml.AddProducerChannels(randStr) }) - assert.Panics(t, func() { dml.Broadcast([]string{randStr}, nil) }) - assert.Panics(t, func() { dml.BroadcastMark([]string{randStr}, nil) }) - assert.Panics(t, func() { dml.RemoveProducerChannels(randStr) }) + assert.Panics(t, func() { dml.addChannels(randStr) }) + assert.Panics(t, func() { dml.broadcast([]string{randStr}, nil) }) + assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) }) + assert.Panics(t, func() { dml.removeChannels(randStr) }) // dml_xxx_0 => {chanName0, chanName2} // dml_xxx_1 => {chanName1} - chanName0 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName0) - assert.Equal(t, 1, dml.GetPhysicalChannelNum()) + chanName0 := dml.getChannelName() + dml.addChannels(chanName0) + assert.Equal(t, 1, dml.getChannelNum()) - chanName1 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName1) - assert.Equal(t, 2, dml.GetPhysicalChannelNum()) + chanName1 := dml.getChannelName() + dml.addChannels(chanName1) + assert.Equal(t, 2, dml.getChannelNum()) - chanName2 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName2) - assert.Equal(t, 2, dml.GetPhysicalChannelNum()) + chanName2 := dml.getChannelName() + dml.addChannels(chanName2) + assert.Equal(t, 2, dml.getChannelNum()) - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 2, dml.GetPhysicalChannelNum()) + dml.removeChannels(chanName0) + assert.Equal(t, 2, dml.getChannelNum()) - dml.RemoveProducerChannels(chanName1) - assert.Equal(t, 1, dml.GetPhysicalChannelNum()) + dml.removeChannels(chanName1) + assert.Equal(t, 1, dml.getChannelNum()) - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 0, dml.GetPhysicalChannelNum()) + dml.removeChannels(chanName0) + assert.Equal(t, 0, dml.getChannelNum()) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index e1fea8985c..7f77226548 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -129,12 +129,6 @@ type Core struct { CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error - // dml channels used for insert - dmlChannels *dmlChannels - - // delta channels used for delete - deltaChannels *dmlChannels - //Proxy manager proxyManager *proxyManager @@ -482,8 +476,7 @@ func (c *Core) setMsgStreams() error { } metrics.RootCoordDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t))) - //c.dmlChannels.BroadcastAll(&msgPack) - pc := c.dmlChannels.ListPhysicalChannels() + pc := c.chanTimeTick.listDmlChannels() pt := make([]uint64, len(pc)) for i := 0; i < len(pt); i++ { pt[i] = t @@ -503,7 +496,7 @@ func (c *Core) setMsgStreams() error { // zap.Any("DefaultTs", t), // zap.Any("sourceID", c.session.ServerID), // zap.Any("reason", reason)) - return c.chanTimeTick.UpdateTimeTick(&ttMsg, reason) + return c.chanTimeTick.updateTimeTick(&ttMsg, reason) } c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error) { @@ -519,7 +512,7 @@ func (c *Core) setMsgStreams() error { CreateCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.BroadcastMark(channelNames, &msgPack) + return c.chanTimeTick.broadcastMarkDmlChannels(channelNames, &msgPack) } c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error { @@ -535,7 +528,7 @@ func (c *Core) setMsgStreams() error { DropCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack) } c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error { @@ -551,7 +544,7 @@ func (c *Core) setMsgStreams() error { CreatePartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack) } c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error { @@ -567,7 +560,7 @@ func (c *Core) setMsgStreams() error { DropPartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack) } return nil @@ -1007,46 +1000,23 @@ func (c *Core) Init() error { return } - // initialize dml channels used for insert - c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum) - - // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels - c.deltaChannels = newDmlChannels(c, Params.DeltaChannelName, Params.DmlChannelNum) - - // recover physical channels for all collections chanMap := c.MetaTable.ListCollectionPhysicalChannels() - for collID, chanNames := range chanMap { - c.dmlChannels.AddProducerChannels(chanNames...) - log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames)) - - // TODO: convert physical channel name to delta channel name - for _, chanName := range chanNames { - deltaChanName, err := ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName) - if err != nil { - log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName)) - return - } - c.deltaChannels.AddProducerChannels(deltaChanName) - log.Debug("recover delta channels", zap.Int64("collID", collID), zap.String("deltaChanName", deltaChanName)) - } - } - - c.chanTimeTick = newTimeTickSync(c) - c.chanTimeTick.AddProxy(c.session) + c.chanTimeTick = newTimeTickSync(c.ctx, c.session, c.msFactory, chanMap) + c.chanTimeTick.addProxy(c.session) c.proxyClientManager = newProxyClientManager(c) log.Debug("RootCoord, set proxy manager") c.proxyManager, initError = newProxyManager( c.ctx, Params.EtcdEndpoints, - c.chanTimeTick.GetProxy, + c.chanTimeTick.getProxy, c.proxyClientManager.GetProxyClients, ) if initError != nil { return } - c.proxyManager.AddSession(c.chanTimeTick.AddProxy, c.proxyClientManager.AddProxyClient) - c.proxyManager.DelSession(c.chanTimeTick.DelProxy, c.proxyClientManager.DelProxyClient) + c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient) + c.proxyManager.DelSession(c.chanTimeTick.delProxy, c.proxyClientManager.DelProxyClient) c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() @@ -1197,7 +1167,7 @@ func (c *Core) Start() error { c.wg.Add(4) go c.startTimeTickLoop() go c.tsLoop() - go c.chanTimeTick.StartWatch(&c.wg) + go c.chanTimeTick.startWatch(&c.wg) go c.checkFlushedSegmentsLoop() go c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) @@ -1839,7 +1809,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())] return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil } - err := c.chanTimeTick.UpdateTimeTick(in, "gRPC") + err := c.chanTimeTick.updateTimeTick(in, "gRPC") if err != nil { log.Error("UpdateTimeTick failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index b3def9f28e..2c62a30948 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -327,7 +327,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 vchanNames := make([]string, t.ShardsNum) chanNames := make([]string, t.ShardsNum) for i := int32(0); i < t.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.dmlChannels.GetDmlMsgStreamName(), collID, i) + vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.chanTimeTick.getDmlChannelName(), collID, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -389,9 +389,9 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 core.ddlLock.Lock() defer core.ddlLock.Unlock() - core.chanTimeTick.AddDdlTimeTick(ts, reason) + core.chanTimeTick.addDdlTimeTick(ts, reason) // clear ddl timetick in all conditions - defer core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + defer core.chanTimeTick.removeDdlTimeTick(ts, reason) err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr) if err != nil { @@ -680,7 +680,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - assert.Equal(t, shardsNum, int32(core.dmlChannels.GetPhysicalChannelNum())) + assert.Equal(t, shardsNum, int32(core.chanTimeTick.getDmlChannelNum())) createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) @@ -1713,7 +1713,7 @@ func TestRootCoord(t *testing.T) { ts1 = uint64(120) ts2 = uint64(150) ) - numChan := core.chanTimeTick.GetChanNum() + numChan := core.chanTimeTick.getDmlChannelNum() p1 := sessionutil.Session{ ServerID: 100, } @@ -1735,15 +1735,15 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - cn0 := core.dmlChannels.GetDmlMsgStreamName() - cn1 := core.dmlChannels.GetDmlMsgStreamName() - cn2 := core.dmlChannels.GetDmlMsgStreamName() - core.dmlChannels.AddProducerChannels(cn0, cn1, cn2) + cn0 := core.chanTimeTick.getDmlChannelName() + cn1 := core.chanTimeTick.getDmlChannelName() + cn2 := core.chanTimeTick.getDmlChannelName() + core.chanTimeTick.addDmlChannels(cn0, cn1, cn2) - dn0 := core.deltaChannels.GetDmlMsgStreamName() - dn1 := core.deltaChannels.GetDmlMsgStreamName() - dn2 := core.deltaChannels.GetDmlMsgStreamName() - core.deltaChannels.AddProducerChannels(dn0, dn1, dn2) + dn0 := core.chanTimeTick.getDeltaChannelName() + dn1 := core.chanTimeTick.getDeltaChannelName() + dn2 := core.chanTimeTick.getDeltaChannelName() + core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2) msg0 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ @@ -1783,10 +1783,10 @@ func TestRootCoord(t *testing.T) { time.Sleep(100 * time.Millisecond) // 2 proxy, 1 rootcoord - assert.Equal(t, 3, core.chanTimeTick.GetProxyNum()) + assert.Equal(t, 3, core.chanTimeTick.getProxyNum()) // add 3 proxy channels - assert.Equal(t, 3, core.chanTimeTick.GetChanNum()-numChan) + assert.Equal(t, 3, core.chanTimeTick.getDmlChannelNum()-numChan) _, err = core.etcdCli.Delete(ctx2, proxy1) assert.Nil(t, err) diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index e00f4cc3b0..fa2a59c25c 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -136,10 +136,10 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { chanNames := make([]string, t.Req.ShardsNum) deltaChanNames := make([]string, t.Req.ShardsNum) for i := int32(0); i < t.Req.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.dmlChannels.GetDmlMsgStreamName(), collID, i) + vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.chanTimeTick.getDmlChannelName(), collID, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) - deltaChanNames[i] = t.core.deltaChannels.GetDmlMsgStreamName() + deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName() deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.DmlChannelName, Params.DeltaChannelName) if err1 != nil || deltaChanName != deltaChanNames[i] { return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i]) @@ -200,15 +200,15 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { t.core.ddlLock.Lock() defer t.core.ddlLock.Unlock() - t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + t.core.chanTimeTick.addDdlTimeTick(ts, reason) // clear ddl timetick in all conditions - defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) // add dml channel before send dd msg - t.core.dmlChannels.AddProducerChannels(chanNames...) + t.core.chanTimeTick.addDmlChannels(chanNames...) // also add delta channels - t.core.deltaChannels.AddProducerChannels(deltaChanNames...) + t.core.chanTimeTick.addDeltaChannels(deltaChanNames...) ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames) if err != nil { @@ -222,13 +222,13 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { } err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr) if err != nil { - t.core.dmlChannels.RemoveProducerChannels(chanNames...) - t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...) + t.core.chanTimeTick.removeDmlChannels(chanNames...) + t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...) // it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic return fmt.Errorf("meta table add collection failed,error = %w", err) } - t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) return nil @@ -306,9 +306,9 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.ddlLock.Lock() defer t.core.ddlLock.Unlock() - t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + t.core.chanTimeTick.addDdlTimeTick(ts, reason) // clear ddl timetick in all conditions - defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr) if err != nil { @@ -320,24 +320,23 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return err } - t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) // send tt into deleted channels to tell data_node to clear flowgragh - t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts) + t.core.chanTimeTick.sendTimeTickToChannel(collMeta.PhysicalChannelNames, ts) // remove dml channel after send dd msg - t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) + t.core.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...) // remove delta channels deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames)) for i, chanName := range collMeta.PhysicalChannelNames { - deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName) - if err != nil { + if deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName); err != nil { return err } } - t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...) + t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...) return nil } @@ -522,9 +521,9 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { t.core.ddlLock.Lock() defer t.core.ddlLock.Unlock() - t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + t.core.chanTimeTick.addDdlTimeTick(ts, reason) // clear ddl timetick in all conditions - defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr) if err != nil { @@ -536,7 +535,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return err } - t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) return nil } @@ -607,9 +606,9 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { t.core.ddlLock.Lock() defer t.core.ddlLock.Unlock() - t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + t.core.chanTimeTick.addDdlTimeTick(ts, reason) // clear ddl timetick in all conditions - defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr) if err != nil { @@ -621,7 +620,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return err } - t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) return nil } diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 2fb3e61000..2edaf27f1e 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -12,6 +12,7 @@ package rootcoord import ( + "context" "fmt" "math" "sync" @@ -29,7 +30,12 @@ import ( ) type timetickSync struct { - core *Core + ctx context.Context + session *sessionutil.Session + + dmlChannels *dmlChannels // used for insert + deltaChannels *dmlChannels // used for delete + lock sync.Mutex proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg @@ -64,10 +70,38 @@ func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp return c.in.DefaultTimestamp } -func newTimeTickSync(core *Core) *timetickSync { +func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { + // initialize dml channels used for insert + dmlChannels := newDmlChannels(ctx, factory, Params.DmlChannelName, Params.DmlChannelNum) + // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels + deltaChannels := newDmlChannels(ctx, factory, Params.DeltaChannelName, Params.DmlChannelNum) + + // recover physical channels for all collections + for collID, chanNames := range chanMap { + dmlChannels.addChannels(chanNames...) + log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames)) + + var err error + deltaChanNames := make([]string, len(chanNames)) + for i, chanName := range chanNames { + deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName) + if err != nil { + log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName)) + panic("invalid dml channel name " + chanName) + } + } + deltaChannels.addChannels(deltaChanNames...) + log.Debug("recover delta channels", zap.Int64("collID", collID), zap.Any("delta channels", deltaChanNames)) + } + return &timetickSync{ + ctx: ctx, + session: session, + + dmlChannels: dmlChannels, + deltaChannels: deltaChannels, + lock: sync.Mutex{}, - core: core, proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg), sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16), @@ -99,7 +133,7 @@ func (t *timetickSync) sendToChannel() { // AddDmlTimeTick add ts into ddlTimetickInfos[sourceID], // can be used to tell if DDL operation is in process. -func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) { +func (t *timetickSync) addDdlTimeTick(ts typeutil.Timestamp, reason string) { t.ddlLock.Lock() defer t.ddlLock.Unlock() @@ -114,7 +148,7 @@ func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) { // RemoveDdlTimeTick is invoked in UpdateTimeTick. // It clears the ts generated by AddDdlTimeTick, indicates DDL operation finished. -func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) { +func (t *timetickSync) removeDdlTimeTick(ts typeutil.Timestamp, reason string) { t.ddlLock.Lock() defer t.ddlLock.Unlock() @@ -136,7 +170,7 @@ func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) { } } -func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp { +func (t *timetickSync) getDdlMinTimeTick() typeutil.Timestamp { t.ddlLock.Lock() defer t.ddlLock.Unlock() @@ -144,7 +178,7 @@ func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp { } // UpdateTimeTick check msg validation and send it to local channel -func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error { +func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error { t.lock.Lock() defer t.lock.Unlock() if len(in.ChannelNames) == 0 && in.DefaultTimestamp == 0 { @@ -160,7 +194,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason } // if ddl operation not finished, skip current ts update - ddlMinTs := t.GetDdlMinTimeTick() + ddlMinTs := t.getDdlMinTimeTick() if in.DefaultTimestamp > ddlMinTs { log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), @@ -169,7 +203,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return nil } - if in.Base.SourceID == t.core.session.ServerID { + if in.Base.SourceID == t.session.ServerID { if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp { log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), @@ -193,14 +227,14 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return nil } -func (t *timetickSync) AddProxy(sess *sessionutil.Session) { +func (t *timetickSync) addProxy(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() t.proxyTimeTick[sess.ServerID] = nil log.Debug("Add proxy for timeticksync", zap.Int64("serverID", sess.ServerID)) } -func (t *timetickSync) DelProxy(sess *sessionutil.Session) { +func (t *timetickSync) delProxy(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() if _, ok := t.proxyTimeTick[sess.ServerID]; ok { @@ -210,7 +244,7 @@ func (t *timetickSync) DelProxy(sess *sessionutil.Session) { } } -func (t *timetickSync) GetProxy(sess []*sessionutil.Session) { +func (t *timetickSync) getProxy(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() for _, s := range sess { @@ -219,12 +253,12 @@ func (t *timetickSync) GetProxy(sess []*sessionutil.Session) { } // StartWatch watch proxy node change and process all channels' timetick msg -func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { +func (t *timetickSync) startWatch(wg *sync.WaitGroup) { defer wg.Done() for { select { - case <-t.core.ctx.Done(): - log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err())) + case <-t.ctx.Done(): + log.Debug("rootcoord context done", zap.Error(t.ctx.Err())) return case proxyTimetick, ok := <-t.sendChan: if !ok { @@ -233,7 +267,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { } // reduce each channel to get min timestamp - local := proxyTimetick[t.core.session.ServerID] + local := proxyTimetick[t.session.ServerID] if len(local.in.ChannelNames) == 0 { continue } @@ -251,7 +285,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { mints = ts } } - if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil { + if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil { log.Debug("SendTimeTickToChannel fail", zap.Error(err)) } wg.Done() @@ -270,7 +304,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { } // SendTimeTickToChannel send each channel's min timetick to msg stream -func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { +func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ BeginTimestamp: ts, @@ -282,7 +316,7 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim MsgType: commonpb.MsgType_TimeTick, MsgID: 0, Timestamp: ts, - SourceID: t.core.session.ServerID, + SourceID: t.session.ServerID, }, } timeTickMsg := &msgstream.TimeTickMsg{ @@ -291,7 +325,7 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil { + if err := t.dmlChannels.broadcast(chanNames, &msgPack); err != nil { return err } @@ -302,15 +336,62 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim } // GetProxyNum return the num of detected proxy node -func (t *timetickSync) GetProxyNum() int { +func (t *timetickSync) getProxyNum() int { t.lock.Lock() defer t.lock.Unlock() return len(t.proxyTimeTick) } -// GetChanNum return the num of channel -func (t *timetickSync) GetChanNum() int { - return t.core.dmlChannels.GetPhysicalChannelNum() +/////////////////////////////////////////////////////////////////////////////// +// GetDmlChannelName return a valid dml channel name +func (t *timetickSync) getDmlChannelName() string { + return t.dmlChannels.getChannelName() +} + +// GetDmlChannelNum return the num of dml channels +func (t *timetickSync) getDmlChannelNum() int { + return t.dmlChannels.getChannelNum() +} + +// ListDmlChannels return all in-use dml channel names +func (t *timetickSync) listDmlChannels() []string { + return t.dmlChannels.listChannels() +} + +// AddDmlChannels add dml channels +func (t *timetickSync) addDmlChannels(names ...string) { + t.dmlChannels.addChannels(names...) +} + +// RemoveDmlChannels remove dml channels +func (t *timetickSync) removeDmlChannels(names ...string) { + t.dmlChannels.removeChannels(names...) +} + +// BroadcastDmlChannels broadcasts msg pack into dml channels +func (t *timetickSync) broadcastDmlChannels(chanNames []string, pack *msgstream.MsgPack) error { + return t.dmlChannels.broadcast(chanNames, pack) +} + +// BroadcastMarkDmlChannels broadcasts msg pack into dml channels +func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) { + return t.dmlChannels.broadcastMark(chanNames, pack) +} + +/////////////////////////////////////////////////////////////////////////////// +// GetDeltaChannelName return a valid delta channel name +func (t *timetickSync) getDeltaChannelName() string { + return t.deltaChannels.getChannelName() +} + +// AddDeltaChannels add delta channels +func (t *timetickSync) addDeltaChannels(names ...string) { + t.deltaChannels.addChannels(names...) +} + +// RemoveDeltaChannels remove delta channels +func (t *timetickSync) removeDeltaChannels(names ...string) { + t.deltaChannels.removeChannels(names...) } func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp { diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index 2f895b6069..0609cfc51d 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -13,90 +13,99 @@ package rootcoord import ( "context" - "sync" "testing" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/stretchr/testify/assert" ) -func TestTimetickSync_sendToChannel(t *testing.T) { - tt := newTimeTickSync(nil) - tt.sendToChannel() +func TestTimetickSync(t *testing.T) { + ctx := context.Background() - ctt := &internalpb.ChannelTimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - }, + session := &sessionutil.Session{ + ServerID: 100, } - cttm := newChannelTimeTickMsg(ctt) - tt.proxyTimeTick[1] = cttm - tt.sendToChannel() - - tt.proxyTimeTick[2] = nil - tt.sendToChannel() -} - -func TestTimetickSync_RemoveDdlTimeTick(t *testing.T) { - tt := newTimeTickSync(nil) - tt.AddDdlTimeTick(uint64(1), "1") - tt.AddDdlTimeTick(uint64(2), "2") - tt.RemoveDdlTimeTick(uint64(1), "1") - assert.Equal(t, tt.ddlMinTs, uint64(2)) -} - -func TestTimetickSync_UpdateTimeTick(t *testing.T) { - tt := newTimeTickSync(nil) - - ctt := &internalpb.ChannelTimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - }, - DefaultTimestamp: 0, - } - - err := tt.UpdateTimeTick(ctt, "1") + factory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "pulsarAddress": Params.PulsarAddress, + "receiveBufSize": 1024, + "pulsarBufSize": 1024} + err := factory.SetParams(m) assert.Nil(t, err) - ctt.ChannelNames = append(ctt.ChannelNames, "a") - err = tt.UpdateTimeTick(ctt, "1") - assert.Error(t, err) + //chanMap := map[typeutil.UniqueID][]string{ + // int64(1): {"rootcoord-dml_0"}, + //} - core := &Core{ - ctx: context.TODO(), - cancel: nil, - ddlLock: sync.Mutex{}, - msFactory: nil, - session: &sessionutil.Session{ - ServerID: 100, - }, - } - tt.core = core + Params.DmlChannelNum = 2 + Params.DmlChannelName = "rootcoord-dml" + Params.DeltaChannelName = "rootcoord-delta" + ttSync := newTimeTickSync(ctx, session, factory, nil) - ctt.Timestamps = append(ctt.Timestamps, uint64(2)) - ctt.Base.SourceID = int64(1) - cttm := newChannelTimeTickMsg(ctt) - tt.proxyTimeTick[ctt.Base.SourceID] = cttm - ctt.DefaultTimestamp = uint64(200) - tt.ddlMinTs = uint64(100) - err = tt.UpdateTimeTick(ctt, "1") - assert.Nil(t, err) + t.Run("sendToChannel", func(t *testing.T) { + ttSync.sendToChannel() - tt.ddlMinTs = uint64(300) - tt.proxyTimeTick[ctt.Base.SourceID].in.DefaultTimestamp = uint64(1) - tt.core.session.ServerID = int64(1) - err = tt.UpdateTimeTick(ctt, "1") - assert.Nil(t, err) -} - -func Test_minTimeTick(t *testing.T) { - tts := make([]uint64, 2) - tts[0] = uint64(5) - tts[1] = uint64(3) - - ret := minTimeTick(tts...) - assert.Equal(t, ret, tts[1]) + ttSync.proxyTimeTick[1] = nil + ttSync.sendToChannel() + + msg := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + }, + } + ttSync.proxyTimeTick[1] = newChannelTimeTickMsg(msg) + ttSync.sendToChannel() + }) + + t.Run("RemoveDdlTimeTick", func(t *testing.T) { + ttSync.addDdlTimeTick(uint64(1), "1") + ttSync.addDdlTimeTick(uint64(2), "2") + ttSync.removeDdlTimeTick(uint64(1), "1") + assert.Equal(t, ttSync.ddlMinTs, uint64(2)) + }) + + t.Run("UpdateTimeTick", func(t *testing.T) { + msg := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + SourceID: int64(1), + }, + DefaultTimestamp: 0, + } + + err := ttSync.updateTimeTick(msg, "1") + assert.Nil(t, err) + + msg.ChannelNames = append(msg.ChannelNames, "a") + err = ttSync.updateTimeTick(msg, "1") + assert.Error(t, err) + + msg.Timestamps = append(msg.Timestamps, uint64(2)) + msg.DefaultTimestamp = uint64(200) + cttMsg := newChannelTimeTickMsg(msg) + ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg + + ttSync.ddlMinTs = uint64(100) + err = ttSync.updateTimeTick(msg, "1") + assert.Nil(t, err) + + ttSync.ddlMinTs = uint64(300) + ttSync.session.ServerID = int64(1) + err = ttSync.updateTimeTick(msg, "1") + assert.Nil(t, err) + }) + + t.Run("minTimeTick", func(t *testing.T) { + tts := make([]uint64, 2) + tts[0] = uint64(5) + tts[1] = uint64(3) + + ret := minTimeTick(tts...) + assert.Equal(t, ret, tts[1]) + }) }