mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Rename timetickSync APIs and variables for better readability (#15264)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
parent
f564ea7fc7
commit
356efee492
@ -1036,18 +1036,18 @@ func (c *Core) Init() error {
|
||||
|
||||
chanMap := c.MetaTable.ListCollectionPhysicalChannels()
|
||||
c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.msFactory, chanMap)
|
||||
c.chanTimeTick.addProxy(c.session)
|
||||
c.chanTimeTick.addSession(c.session)
|
||||
c.proxyClientManager = newProxyClientManager(c)
|
||||
|
||||
log.Debug("RootCoord, set proxy manager")
|
||||
c.proxyManager = newProxyManager(
|
||||
c.ctx,
|
||||
c.etcdCli,
|
||||
c.chanTimeTick.clearProxy,
|
||||
c.chanTimeTick.clearSessions,
|
||||
c.proxyClientManager.GetProxyClients,
|
||||
)
|
||||
c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyManager.DelSession(c.chanTimeTick.delProxy, c.proxyClientManager.DelProxyClient)
|
||||
c.proxyManager.AddSession(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyManager.DelSession(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
|
||||
|
||||
c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
|
||||
|
||||
|
||||
@ -756,11 +756,11 @@ func TestRootCoord(t *testing.T) {
|
||||
//assert.True(t, ok)
|
||||
//assert.Greater(t, ddm.Base.Timestamp, uint64(0))
|
||||
core.chanTimeTick.lock.Lock()
|
||||
assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2)
|
||||
pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID]
|
||||
assert.Equal(t, len(core.chanTimeTick.sess2ChanTsMap), 2)
|
||||
pt, ok := core.chanTimeTick.sess2ChanTsMap[core.session.ServerID]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, shardsNum, int32(len(pt.chanTs)))
|
||||
for chanName, ts := range pt.chanTs {
|
||||
assert.Equal(t, shardsNum, int32(len(pt.chanTsMap)))
|
||||
for chanName, ts := range pt.chanTsMap {
|
||||
assert.Contains(t, createMeta.PhysicalChannelNames, chanName)
|
||||
assert.Equal(t, pt.defaultTs, ts)
|
||||
}
|
||||
@ -1838,7 +1838,7 @@ func TestRootCoord(t *testing.T) {
|
||||
s, _ := core.UpdateChannelTimeTick(ctx, msg0)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, s.ErrorCode)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
//t.Log(core.chanTimeTick.proxyTimeTick)
|
||||
//t.Log(core.chanTimeTick.sess2ChanTsMap)
|
||||
|
||||
msg1 := &internalpb.ChannelTimeTickMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
@ -1865,7 +1865,7 @@ func TestRootCoord(t *testing.T) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// 2 proxy, 1 rootcoord
|
||||
assert.Equal(t, 3, core.chanTimeTick.getProxyNum())
|
||||
assert.Equal(t, 3, core.chanTimeTick.getSessionNum())
|
||||
|
||||
// add 3 proxy channels
|
||||
assert.Equal(t, 3, core.chanTimeTick.getDmlChannelNum()-numChan)
|
||||
|
||||
@ -45,14 +45,14 @@ var (
|
||||
|
||||
type timetickSync struct {
|
||||
ctx context.Context
|
||||
sourceID int64
|
||||
sourceID typeutil.UniqueID
|
||||
|
||||
dmlChannels *dmlChannels // used for insert
|
||||
deltaChannels *dmlChannels // used for delete
|
||||
|
||||
lock sync.Mutex
|
||||
proxyTimeTick map[typeutil.UniqueID]*chanTsMsg
|
||||
sendChan chan map[typeutil.UniqueID]*chanTsMsg
|
||||
lock sync.Mutex
|
||||
sess2ChanTsMap map[typeutil.UniqueID]*chanTsMsg
|
||||
sendChan chan map[typeutil.UniqueID]*chanTsMsg
|
||||
|
||||
// record ddl timetick info
|
||||
ddlLock sync.RWMutex
|
||||
@ -61,25 +61,25 @@ type timetickSync struct {
|
||||
}
|
||||
|
||||
type chanTsMsg struct {
|
||||
chanTs map[string]typeutil.Timestamp
|
||||
chanTsMap map[string]typeutil.Timestamp
|
||||
defaultTs typeutil.Timestamp
|
||||
cnt int64
|
||||
}
|
||||
|
||||
func newChanTsMsg(in *internalpb.ChannelTimeTickMsg, cnt int64) *chanTsMsg {
|
||||
msg := &chanTsMsg{
|
||||
chanTs: make(map[string]typeutil.Timestamp),
|
||||
chanTsMap: make(map[string]typeutil.Timestamp),
|
||||
defaultTs: in.DefaultTimestamp,
|
||||
cnt: cnt,
|
||||
}
|
||||
for idx := range in.ChannelNames {
|
||||
msg.chanTs[in.ChannelNames[idx]] = in.Timestamps[idx]
|
||||
msg.chanTsMap[in.ChannelNames[idx]] = in.Timestamps[idx]
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
|
||||
if ts, ok := c.chanTs[channelName]; ok {
|
||||
if ts, ok := c.chanTsMap[channelName]; ok {
|
||||
return ts
|
||||
}
|
||||
return c.defaultTs
|
||||
@ -116,9 +116,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
|
||||
dmlChannels: dmlChannels,
|
||||
deltaChannels: deltaChannels,
|
||||
|
||||
lock: sync.Mutex{},
|
||||
proxyTimeTick: make(map[typeutil.UniqueID]*chanTsMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
|
||||
lock: sync.Mutex{},
|
||||
sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
|
||||
|
||||
ddlLock: sync.RWMutex{},
|
||||
ddlMinTs: typeutil.Timestamp(math.MaxUint64),
|
||||
@ -129,16 +129,16 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
|
||||
// sendToChannel send all channels' timetick to sendChan
|
||||
// lock is needed by the invoker
|
||||
func (t *timetickSync) sendToChannel() {
|
||||
if len(t.proxyTimeTick) == 0 {
|
||||
if len(t.sess2ChanTsMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// detect whether rootcoord receives ttMsg from all proxy nodes
|
||||
// detect whether rootcoord receives ttMsg from all source sessions
|
||||
maxCnt := int64(0)
|
||||
idleProxyList := make([]typeutil.UniqueID, 0, len(t.proxyTimeTick))
|
||||
for id, v := range t.proxyTimeTick {
|
||||
idleSessionList := make([]typeutil.UniqueID, 0, len(t.sess2ChanTsMap))
|
||||
for id, v := range t.sess2ChanTsMap {
|
||||
if v == nil {
|
||||
idleProxyList = append(idleProxyList, id)
|
||||
idleSessionList = append(idleSessionList, id)
|
||||
} else {
|
||||
if maxCnt < v.cnt {
|
||||
maxCnt = v.cnt
|
||||
@ -146,20 +146,20 @@ func (t *timetickSync) sendToChannel() {
|
||||
}
|
||||
}
|
||||
|
||||
if len(idleProxyList) > 0 {
|
||||
// give warning every 2 second if not get ttMsg from proxy nodes
|
||||
if len(idleSessionList) > 0 {
|
||||
// give warning every 2 second if not get ttMsg from source sessions
|
||||
if maxCnt%10 == 0 {
|
||||
log.Warn("proxy idle for long time", zap.Any("proxy list", idleProxyList),
|
||||
log.Warn("session idle for long time", zap.Any("idle session list", idleSessionList),
|
||||
zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.Milliseconds()*maxCnt))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// clear proxyTimeTick and send a clone
|
||||
// clear sess2ChanTsMap and send a clone
|
||||
ptt := make(map[typeutil.UniqueID]*chanTsMsg)
|
||||
for k, v := range t.proxyTimeTick {
|
||||
for k, v := range t.sess2ChanTsMap {
|
||||
ptt[k] = v
|
||||
t.proxyTimeTick[k] = nil
|
||||
t.sess2ChanTsMap[k] = nil
|
||||
}
|
||||
t.sendChan <- ptt
|
||||
}
|
||||
@ -221,9 +221,9 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
|
||||
return fmt.Errorf("invalid TimeTickMsg")
|
||||
}
|
||||
|
||||
prev, ok := t.proxyTimeTick[in.Base.SourceID]
|
||||
prev, ok := t.sess2ChanTsMap[in.Base.SourceID]
|
||||
if !ok {
|
||||
return fmt.Errorf("skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID)
|
||||
return fmt.Errorf("skip ChannelTimeTickMsg from un-recognized session %d", in.Base.SourceID)
|
||||
}
|
||||
|
||||
// if ddl operation not finished, skip current ts update
|
||||
@ -247,41 +247,41 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
|
||||
}
|
||||
|
||||
if prev == nil {
|
||||
t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, 1)
|
||||
t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, 1)
|
||||
} else {
|
||||
t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1)
|
||||
t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1)
|
||||
}
|
||||
|
||||
t.sendToChannel()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *timetickSync) addProxy(sess *sessionutil.Session) {
|
||||
func (t *timetickSync) addSession(sess *sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.proxyTimeTick[sess.ServerID] = nil
|
||||
log.Debug("Add proxy for timeticksync", zap.Int64("serverID", sess.ServerID))
|
||||
t.sess2ChanTsMap[sess.ServerID] = nil
|
||||
log.Debug("Add session for timeticksync", zap.Int64("serverID", sess.ServerID))
|
||||
}
|
||||
|
||||
func (t *timetickSync) delProxy(sess *sessionutil.Session) {
|
||||
func (t *timetickSync) delSession(sess *sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if _, ok := t.proxyTimeTick[sess.ServerID]; ok {
|
||||
delete(t.proxyTimeTick, sess.ServerID)
|
||||
log.Debug("Remove proxy from timeticksync", zap.Int64("serverID", sess.ServerID))
|
||||
if _, ok := t.sess2ChanTsMap[sess.ServerID]; ok {
|
||||
delete(t.sess2ChanTsMap, sess.ServerID)
|
||||
log.Debug("Remove session from timeticksync", zap.Int64("serverID", sess.ServerID))
|
||||
t.sendToChannel()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *timetickSync) clearProxy(sess []*sessionutil.Session) {
|
||||
func (t *timetickSync) clearSessions(sess []*sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
for _, s := range sess {
|
||||
t.proxyTimeTick[s.ServerID] = nil
|
||||
t.sess2ChanTsMap[s.ServerID] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// StartWatch watch proxy node change and process all channels' timetick msg
|
||||
// StartWatch watch session change and process all channels' timetick msg
|
||||
func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
@ -297,15 +297,15 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
case <-t.ctx.Done():
|
||||
log.Debug("rootcoord context done", zap.Error(t.ctx.Err()))
|
||||
return
|
||||
case proxyTimetick, ok := <-t.sendChan:
|
||||
case sessTimetick, ok := <-t.sendChan:
|
||||
if !ok {
|
||||
log.Debug("timetickSync sendChan closed")
|
||||
return
|
||||
}
|
||||
|
||||
// reduce each channel to get min timestamp
|
||||
local := proxyTimetick[t.sourceID]
|
||||
if len(local.chanTs) == 0 {
|
||||
local := sessTimetick[t.sourceID]
|
||||
if len(local.chanTsMap) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -313,14 +313,14 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
checker.Check()
|
||||
}
|
||||
|
||||
hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTs))
|
||||
hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTsMap))
|
||||
tr := timerecord.NewTimeRecorder(hdr)
|
||||
wg := sync.WaitGroup{}
|
||||
for chanName, ts := range local.chanTs {
|
||||
for chanName, ts := range local.chanTsMap {
|
||||
wg.Add(1)
|
||||
go func(chanName string, ts typeutil.Timestamp) {
|
||||
mints := ts
|
||||
for _, tt := range proxyTimetick {
|
||||
for _, tt := range sessTimetick {
|
||||
currTs := tt.getTimetick(chanName)
|
||||
if currTs < mints {
|
||||
mints = currTs
|
||||
@ -337,7 +337,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
// rootcoord send tt msg to all channels every 200ms by default
|
||||
if span > Params.ProxyCfg.TimeTickInterval {
|
||||
log.Warn("rootcoord send tt to all channels too slowly",
|
||||
zap.Int("chanNum", len(local.chanTs)), zap.Int64("span", span.Milliseconds()))
|
||||
zap.Int("chanNum", len(local.chanTsMap)), zap.Int64("span", span.Milliseconds()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -375,11 +375,11 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetProxyNum return the num of detected proxy node
|
||||
func (t *timetickSync) getProxyNum() int {
|
||||
// GetSessionNum return the num of detected sessions
|
||||
func (t *timetickSync) getSessionNum() int {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
return len(t.proxyTimeTick)
|
||||
return len(t.sess2ChanTsMap)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -55,7 +55,7 @@ func TestTimetickSync(t *testing.T) {
|
||||
defer wg.Done()
|
||||
ttSync.sendToChannel()
|
||||
|
||||
ttSync.proxyTimeTick[1] = nil
|
||||
ttSync.sess2ChanTsMap[1] = nil
|
||||
ttSync.sendToChannel()
|
||||
|
||||
msg := &internalpb.ChannelTimeTickMsg{
|
||||
@ -63,7 +63,7 @@ func TestTimetickSync(t *testing.T) {
|
||||
MsgType: commonpb.MsgType_TimeTick,
|
||||
},
|
||||
}
|
||||
ttSync.proxyTimeTick[1] = newChanTsMsg(msg, 1)
|
||||
ttSync.sess2ChanTsMap[1] = newChanTsMsg(msg, 1)
|
||||
ttSync.sendToChannel()
|
||||
})
|
||||
|
||||
@ -97,7 +97,7 @@ func TestTimetickSync(t *testing.T) {
|
||||
msg.Timestamps = append(msg.Timestamps, uint64(2))
|
||||
msg.DefaultTimestamp = uint64(200)
|
||||
cttMsg := newChanTsMsg(msg, 1)
|
||||
ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg
|
||||
ttSync.sess2ChanTsMap[msg.Base.SourceID] = cttMsg
|
||||
|
||||
ttSync.ddlMinTs = uint64(100)
|
||||
err = ttSync.updateTimeTick(msg, "1")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user