diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5fb1311e86..2fec1645fc 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -471,39 +471,7 @@ func (c *Core) setMsgStreams() error { return fmt.Errorf("RootCoordSubName is empty") } - // rootcoord time tick channel - if Params.CommonCfg.RootCoordTimeTick == "" { - return fmt.Errorf("timeTickChannel is empty") - } - timeTickStream, _ := c.factory.NewMsgStream(c.ctx) - metrics.RootCoordNumOfMsgStream.Inc() - timeTickStream.AsProducer([]string{Params.CommonCfg.RootCoordTimeTick}) - log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.CommonCfg.RootCoordTimeTick)) - c.SendTimeTick = func(t typeutil.Timestamp, reason string) error { - msgPack := ms.MsgPack{} - baseMsg := ms.BaseMsg{ - BeginTimestamp: t, - EndTimestamp: t, - HashValues: []uint32{0}, - } - timeTickResult := internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: t, - SourceID: c.session.ServerID, - }, - } - timeTickMsg := &ms.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := timeTickStream.Broadcast(&msgPack); err != nil { - return err - } - pc := c.chanTimeTick.listDmlChannels() pt := make([]uint64, len(pc)) for i := 0; i < len(pt); i++ { @@ -520,10 +488,6 @@ func (c *Core) setMsgStreams() error { Timestamps: pt, DefaultTimestamp: t, } - //log.Debug("update timetick", - // zap.Any("DefaultTs", t), - // zap.Any("sourceID", c.session.ServerID), - // zap.Any("reason", reason)) return c.chanTimeTick.updateTimeTick(&ttMsg, reason) } @@ -1291,7 +1255,6 @@ func (c *Core) Start() error { } log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID)) - log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.CommonCfg.RootCoordTimeTick)) c.startOnce.Do(func() { if err := c.proxyManager.WatchProxy(); err != nil { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index e7c3744cbf..84b578df2c 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -706,11 +706,6 @@ func TestRootCoord_Base(t *testing.T) { tmpFactory := dependency.NewDefaultFactory(true) - timeTickStream, _ := tmpFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) - timeTickStream.Start() - defer timeTickStream.Close() - dmlStream, _ := tmpFactory.NewMsgStream(ctx) defer dmlStream.Close() @@ -738,27 +733,7 @@ func TestRootCoord_Base(t *testing.T) { time.Sleep(100 * time.Millisecond) shardsNum := int32(8) - fmt.Printf("hello world2") var wg sync.WaitGroup - wg.Add(1) - t.Run("time tick", func(t *testing.T) { - defer wg.Done() - ttmsg, ok := <-timeTickStream.Chan() - assert.True(t, ok) - assert.Equal(t, 1, len(ttmsg.Msgs)) - ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg) - assert.True(t, ok) - assert.Greater(t, ttm.Base.Timestamp, uint64(0)) - t.Log(ttm.Base.Timestamp) - - ttmsg2, ok := <-timeTickStream.Chan() - assert.True(t, ok) - assert.Equal(t, 1, len(ttmsg2.Msgs)) - ttm2, ok := (ttmsg2.Msgs[0]).(*msgstream.TimeTickMsg) - assert.True(t, ok) - assert.Greater(t, ttm2.Base.Timestamp, uint64(0)) - assert.Equal(t, ttm2.Base.Timestamp, ttm.Base.Timestamp+1) - }) wg.Add(1) t.Run("create collection", func(t *testing.T) { @@ -2077,14 +2052,9 @@ func TestRootCoord_Base(t *testing.T) { defer wg.Done() const ( proxyIDInvalid = 102 - proxyName0 = "proxy_0" - proxyName1 = "proxy_1" - chanName0 = "c0" - chanName1 = "c1" - chanName2 = "c2" - ts0 = uint64(100) - ts1 = uint64(120) - ts2 = uint64(150) + ts0 = uint64(20) + ts1 = uint64(40) + ts2 = uint64(60) ) numChan := core.chanTimeTick.getDmlChannelNum() p1 := sessionutil.Session{ @@ -2118,12 +2088,41 @@ func TestRootCoord_Base(t *testing.T) { dn2 := core.chanTimeTick.getDeltaChannelName() core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2) + // wait for local channel reported + for { + core.chanTimeTick.lock.Lock() + _, ok := core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn0] + + if !ok { + core.chanTimeTick.lock.Unlock() + time.Sleep(100 * time.Millisecond) + continue + } + + _, ok = core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn1] + + if !ok { + core.chanTimeTick.lock.Unlock() + time.Sleep(100 * time.Millisecond) + continue + } + + _, ok = core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn2] + + if !ok { + core.chanTimeTick.lock.Unlock() + time.Sleep(100 * time.Millisecond) + continue + } + core.chanTimeTick.lock.Unlock() + break + } msg0 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_TimeTick, SourceID: 100, }, - ChannelNames: []string{chanName0, chanName1}, + ChannelNames: []string{cn0, cn1}, Timestamps: []uint64{ts0, ts2}, } s, _ := core.UpdateChannelTimeTick(ctx, msg0) @@ -2136,7 +2135,7 @@ func TestRootCoord_Base(t *testing.T) { MsgType: commonpb.MsgType_TimeTick, SourceID: 101, }, - ChannelNames: []string{chanName1, chanName2}, + ChannelNames: []string{cn1, cn2}, Timestamps: []uint64{ts1, ts2}, } s, _ = core.UpdateChannelTimeTick(ctx, msg1) @@ -2666,24 +2665,9 @@ func TestRootCoord2(t *testing.T) { err = core.Register() assert.NoError(t, err) - timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) - timeTickStream.Start() - time.Sleep(100 * time.Millisecond) var wg sync.WaitGroup - wg.Add(1) - t.Run("time tick", func(t *testing.T) { - defer wg.Done() - ttmsg, ok := <-timeTickStream.Chan() - assert.True(t, ok) - assert.Equal(t, 1, len(ttmsg.Msgs)) - ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg) - assert.True(t, ok) - assert.Greater(t, ttm.Base.Timestamp, typeutil.Timestamp(0)) - }) - wg.Add(1) t.Run("create collection", func(t *testing.T) { defer wg.Done() @@ -2960,10 +2944,6 @@ func TestCheckFlushedSegments(t *testing.T) { err = core.Register() assert.NoError(t, err) - timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) - timeTickStream.Start() - time.Sleep(100 * time.Millisecond) var wg sync.WaitGroup @@ -3120,10 +3100,6 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { err = core.Register() assert.NoError(t, err) - timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) - timeTickStream.Start() - time.Sleep(100 * time.Millisecond) modifyFunc := func(collInfo *etcdpb.CollectionInfo) { diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 5aa4aab138..c0c3695b1d 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -219,7 +219,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return nil } if len(in.Timestamps) != len(in.ChannelNames) { - return fmt.Errorf("invalid TimeTickMsg") + return fmt.Errorf("invalid TimeTickMsg, timestamp and channelname size mismatch") } prev, ok := t.sess2ChanTsMap[in.Base.SourceID] @@ -230,7 +230,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason // if ddl operation not finished, skip current ts update ddlMinTs := t.getDdlMinTimeTick() if in.DefaultTimestamp > ddlMinTs { - log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID), + log.Info("ddl not finished", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), zap.Uint64("ddlMinTs", ddlMinTs), zap.String("reason", reason)) @@ -239,7 +239,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason if in.Base.SourceID == t.sourceID { if prev != nil && in.DefaultTimestamp <= prev.defaultTs { - log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), + log.Warn("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), zap.Uint64("prev ts", prev.defaultTs), zap.String("reason", reason)) @@ -252,7 +252,6 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason } else { t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1) } - t.sendToChannel() return nil } @@ -326,7 +325,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { } } if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil { - log.Debug("SendTimeTickToChannel fail", zap.Error(err)) + log.Warn("SendTimeTickToChannel fail", zap.Error(err)) } wg.Done() }(chanName, ts) @@ -364,7 +363,6 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim TimeTickMsg: timeTickResult, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := t.dmlChannels.broadcast(chanNames, &msgPack); err != nil { return err } diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index f923b16c24..8854b2c138 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -193,6 +193,7 @@ func (p *commonConfig) initProxySubName() { } // --- rootcoord --- +// Deprecate func (p *commonConfig) initRootCoordTimeTick() { keys := []string{ "common.chanNamePrefix.rootCoordTimeTick", @@ -242,6 +243,7 @@ func (p *commonConfig) initQueryCoordSearch() { p.QueryCoordSearch = p.initChanNamePrefix(keys) } +// Deprecated, search result use grpc instead of a result channel func (p *commonConfig) initQueryCoordSearchResult() { keys := []string{ "common.chanNamePrefix.searchResult", @@ -250,6 +252,7 @@ func (p *commonConfig) initQueryCoordSearchResult() { p.QueryCoordSearchResult = p.initChanNamePrefix(keys) } +// Deprecate func (p *commonConfig) initQueryCoordTimeTick() { keys := []string{ "common.chanNamePrefix.queryTimeTick", @@ -284,6 +287,7 @@ func (p *commonConfig) initDataCoordStatistic() { p.DataCoordStatistic = p.initChanNamePrefix(keys) } +// Deprecate func (p *commonConfig) initDataCoordTimeTick() { keys := []string{ "common.chanNamePrefix.dataCoordTimeTick",