diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 88ab26e16f..e682938ddd 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1036,18 +1036,18 @@ func (c *Core) Init() error { chanMap := c.MetaTable.ListCollectionPhysicalChannels() c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.msFactory, chanMap) - c.chanTimeTick.addProxy(c.session) + c.chanTimeTick.addSession(c.session) c.proxyClientManager = newProxyClientManager(c) log.Debug("RootCoord, set proxy manager") c.proxyManager = newProxyManager( c.ctx, c.etcdCli, - c.chanTimeTick.clearProxy, + c.chanTimeTick.clearSessions, c.proxyClientManager.GetProxyClients, ) - c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient) - c.proxyManager.DelSession(c.chanTimeTick.delProxy, c.proxyClientManager.DelProxyClient) + c.proxyManager.AddSession(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient) + c.proxyManager.DelSession(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient) c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index ae778768be..208ca2a4f1 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -756,11 +756,11 @@ func TestRootCoord(t *testing.T) { //assert.True(t, ok) //assert.Greater(t, ddm.Base.Timestamp, uint64(0)) core.chanTimeTick.lock.Lock() - assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2) - pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID] + assert.Equal(t, len(core.chanTimeTick.sess2ChanTsMap), 2) + pt, ok := core.chanTimeTick.sess2ChanTsMap[core.session.ServerID] assert.True(t, ok) - assert.Equal(t, shardsNum, int32(len(pt.chanTs))) - for chanName, ts := range pt.chanTs { + assert.Equal(t, shardsNum, int32(len(pt.chanTsMap))) + for chanName, ts := range pt.chanTsMap { assert.Contains(t, createMeta.PhysicalChannelNames, chanName) assert.Equal(t, pt.defaultTs, ts) } @@ -1838,7 +1838,7 @@ func TestRootCoord(t *testing.T) { s, _ := core.UpdateChannelTimeTick(ctx, msg0) assert.Equal(t, commonpb.ErrorCode_Success, s.ErrorCode) time.Sleep(100 * time.Millisecond) - //t.Log(core.chanTimeTick.proxyTimeTick) + //t.Log(core.chanTimeTick.sess2ChanTsMap) msg1 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ @@ -1865,7 +1865,7 @@ func TestRootCoord(t *testing.T) { time.Sleep(100 * time.Millisecond) // 2 proxy, 1 rootcoord - assert.Equal(t, 3, core.chanTimeTick.getProxyNum()) + assert.Equal(t, 3, core.chanTimeTick.getSessionNum()) // add 3 proxy channels assert.Equal(t, 3, core.chanTimeTick.getDmlChannelNum()-numChan) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index af5b8dbc87..8d37574e1d 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -45,14 +45,14 @@ var ( type timetickSync struct { ctx context.Context - sourceID int64 + sourceID typeutil.UniqueID dmlChannels *dmlChannels // used for insert deltaChannels *dmlChannels // used for delete - lock sync.Mutex - proxyTimeTick map[typeutil.UniqueID]*chanTsMsg - sendChan chan map[typeutil.UniqueID]*chanTsMsg + lock sync.Mutex + sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg + sendChan chan map[typeutil.UniqueID]*chanTsMsg // record ddl timetick info ddlLock sync.RWMutex @@ -61,25 +61,25 @@ type timetickSync struct { } type chanTsMsg struct { - chanTs map[string]typeutil.Timestamp + chanTsMap map[string]typeutil.Timestamp defaultTs typeutil.Timestamp cnt int64 } func newChanTsMsg(in *internalpb.ChannelTimeTickMsg, cnt int64) *chanTsMsg { msg := &chanTsMsg{ - chanTs: make(map[string]typeutil.Timestamp), + chanTsMap: make(map[string]typeutil.Timestamp), defaultTs: in.DefaultTimestamp, cnt: cnt, } for idx := range in.ChannelNames { - msg.chanTs[in.ChannelNames[idx]] = in.Timestamps[idx] + msg.chanTsMap[in.ChannelNames[idx]] = in.Timestamps[idx] } return msg } func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { - if ts, ok := c.chanTs[channelName]; ok { + if ts, ok := c.chanTsMap[channelName]; ok { return ts } return c.defaultTs @@ -116,9 +116,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact dmlChannels: dmlChannels, deltaChannels: deltaChannels, - lock: sync.Mutex{}, - proxyTimeTick: make(map[typeutil.UniqueID]*chanTsMsg), - sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16), + lock: sync.Mutex{}, + sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg), + sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16), ddlLock: sync.RWMutex{}, ddlMinTs: typeutil.Timestamp(math.MaxUint64), @@ -129,16 +129,16 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact // sendToChannel send all channels' timetick to sendChan // lock is needed by the invoker func (t *timetickSync) sendToChannel() { - if len(t.proxyTimeTick) == 0 { + if len(t.sess2ChanTsMap) == 0 { return } - // detect whether rootcoord receives ttMsg from all proxy nodes + // detect whether rootcoord receives ttMsg from all source sessions maxCnt := int64(0) - idleProxyList := make([]typeutil.UniqueID, 0, len(t.proxyTimeTick)) - for id, v := range t.proxyTimeTick { + idleSessionList := make([]typeutil.UniqueID, 0, len(t.sess2ChanTsMap)) + for id, v := range t.sess2ChanTsMap { if v == nil { - idleProxyList = append(idleProxyList, id) + idleSessionList = append(idleSessionList, id) } else { if maxCnt < v.cnt { maxCnt = v.cnt @@ -146,20 +146,20 @@ func (t *timetickSync) sendToChannel() { } } - if len(idleProxyList) > 0 { - // give warning every 2 second if not get ttMsg from proxy nodes + if len(idleSessionList) > 0 { + // give warning every 2 second if not get ttMsg from source sessions if maxCnt%10 == 0 { - log.Warn("proxy idle for long time", zap.Any("proxy list", idleProxyList), + log.Warn("session idle for long time", zap.Any("idle session list", idleSessionList), zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.Milliseconds()*maxCnt)) } return } - // clear proxyTimeTick and send a clone + // clear sess2ChanTsMap and send a clone ptt := make(map[typeutil.UniqueID]*chanTsMsg) - for k, v := range t.proxyTimeTick { + for k, v := range t.sess2ChanTsMap { ptt[k] = v - t.proxyTimeTick[k] = nil + t.sess2ChanTsMap[k] = nil } t.sendChan <- ptt } @@ -221,9 +221,9 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return fmt.Errorf("invalid TimeTickMsg") } - prev, ok := t.proxyTimeTick[in.Base.SourceID] + prev, ok := t.sess2ChanTsMap[in.Base.SourceID] if !ok { - return fmt.Errorf("skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID) + return fmt.Errorf("skip ChannelTimeTickMsg from un-recognized session %d", in.Base.SourceID) } // if ddl operation not finished, skip current ts update @@ -247,41 +247,41 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason } if prev == nil { - t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, 1) + t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, 1) } else { - t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1) + t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1) } t.sendToChannel() return nil } -func (t *timetickSync) addProxy(sess *sessionutil.Session) { +func (t *timetickSync) addSession(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() - t.proxyTimeTick[sess.ServerID] = nil - log.Debug("Add proxy for timeticksync", zap.Int64("serverID", sess.ServerID)) + t.sess2ChanTsMap[sess.ServerID] = nil + log.Debug("Add session for timeticksync", zap.Int64("serverID", sess.ServerID)) } -func (t *timetickSync) delProxy(sess *sessionutil.Session) { +func (t *timetickSync) delSession(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() - if _, ok := t.proxyTimeTick[sess.ServerID]; ok { - delete(t.proxyTimeTick, sess.ServerID) - log.Debug("Remove proxy from timeticksync", zap.Int64("serverID", sess.ServerID)) + if _, ok := t.sess2ChanTsMap[sess.ServerID]; ok { + delete(t.sess2ChanTsMap, sess.ServerID) + log.Debug("Remove session from timeticksync", zap.Int64("serverID", sess.ServerID)) t.sendToChannel() } } -func (t *timetickSync) clearProxy(sess []*sessionutil.Session) { +func (t *timetickSync) clearSessions(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() for _, s := range sess { - t.proxyTimeTick[s.ServerID] = nil + t.sess2ChanTsMap[s.ServerID] = nil } } -// StartWatch watch proxy node change and process all channels' timetick msg +// StartWatch watch session change and process all channels' timetick msg func (t *timetickSync) startWatch(wg *sync.WaitGroup) { defer wg.Done() @@ -297,15 +297,15 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { case <-t.ctx.Done(): log.Debug("rootcoord context done", zap.Error(t.ctx.Err())) return - case proxyTimetick, ok := <-t.sendChan: + case sessTimetick, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") return } // reduce each channel to get min timestamp - local := proxyTimetick[t.sourceID] - if len(local.chanTs) == 0 { + local := sessTimetick[t.sourceID] + if len(local.chanTsMap) == 0 { continue } @@ -313,14 +313,14 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { checker.Check() } - hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTs)) + hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTsMap)) tr := timerecord.NewTimeRecorder(hdr) wg := sync.WaitGroup{} - for chanName, ts := range local.chanTs { + for chanName, ts := range local.chanTsMap { wg.Add(1) go func(chanName string, ts typeutil.Timestamp) { mints := ts - for _, tt := range proxyTimetick { + for _, tt := range sessTimetick { currTs := tt.getTimetick(chanName) if currTs < mints { mints = currTs @@ -337,7 +337,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { // rootcoord send tt msg to all channels every 200ms by default if span > Params.ProxyCfg.TimeTickInterval { log.Warn("rootcoord send tt to all channels too slowly", - zap.Int("chanNum", len(local.chanTs)), zap.Int64("span", span.Milliseconds())) + zap.Int("chanNum", len(local.chanTsMap)), zap.Int64("span", span.Milliseconds())) } } } @@ -375,11 +375,11 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim return nil } -// GetProxyNum return the num of detected proxy node -func (t *timetickSync) getProxyNum() int { +// GetSessionNum return the num of detected sessions +func (t *timetickSync) getSessionNum() int { t.lock.Lock() defer t.lock.Unlock() - return len(t.proxyTimeTick) + return len(t.sess2ChanTsMap) } /////////////////////////////////////////////////////////////////////////////// diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index a588601fba..3a697d9711 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -55,7 +55,7 @@ func TestTimetickSync(t *testing.T) { defer wg.Done() ttSync.sendToChannel() - ttSync.proxyTimeTick[1] = nil + ttSync.sess2ChanTsMap[1] = nil ttSync.sendToChannel() msg := &internalpb.ChannelTimeTickMsg{ @@ -63,7 +63,7 @@ func TestTimetickSync(t *testing.T) { MsgType: commonpb.MsgType_TimeTick, }, } - ttSync.proxyTimeTick[1] = newChanTsMsg(msg, 1) + ttSync.sess2ChanTsMap[1] = newChanTsMsg(msg, 1) ttSync.sendToChannel() }) @@ -97,7 +97,7 @@ func TestTimetickSync(t *testing.T) { msg.Timestamps = append(msg.Timestamps, uint64(2)) msg.DefaultTimestamp = uint64(200) cttMsg := newChanTsMsg(msg, 1) - ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg + ttSync.sess2ChanTsMap[msg.Base.SourceID] = cttMsg ttSync.ddlMinTs = uint64(100) err = ttSync.updateTimeTick(msg, "1")