mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Add debug log when proxy idle for a long time (#12319)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
parent
48bdeb56d8
commit
cbbeb2a383
@ -720,14 +720,11 @@ func TestRootCoord(t *testing.T) {
|
||||
assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2)
|
||||
pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, shardsNum, int32(len(pt.in.ChannelNames)))
|
||||
assert.Equal(t, shardsNum, int32(len(pt.in.Timestamps)))
|
||||
assert.Equal(t, shardsNum, int32(len(pt.timeTick)))
|
||||
assert.ElementsMatch(t, pt.in.ChannelNames, createMeta.PhysicalChannelNames)
|
||||
assert.Equal(t, pt.in.Timestamps[0], pt.in.Timestamps[1])
|
||||
assert.Equal(t, pt.in.Timestamps[0], pt.in.DefaultTimestamp)
|
||||
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[0]], pt.in.DefaultTimestamp)
|
||||
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[1]], pt.in.DefaultTimestamp)
|
||||
assert.Equal(t, shardsNum, int32(len(pt.chanTs)))
|
||||
for chanName, ts := range pt.chanTs {
|
||||
assert.Contains(t, createMeta.PhysicalChannelNames, chanName)
|
||||
assert.Equal(t, pt.defaultTs, ts)
|
||||
}
|
||||
core.chanTimeTick.lock.Unlock()
|
||||
|
||||
// check DD operation info
|
||||
|
||||
@ -37,8 +37,8 @@ type timetickSync struct {
|
||||
deltaChannels *dmlChannels // used for delete
|
||||
|
||||
lock sync.Mutex
|
||||
proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
|
||||
sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg
|
||||
proxyTimeTick map[typeutil.UniqueID]*chanTsMsg
|
||||
sendChan chan map[typeutil.UniqueID]*chanTsMsg
|
||||
|
||||
// record ddl timetick info
|
||||
ddlLock sync.RWMutex
|
||||
@ -46,28 +46,29 @@ type timetickSync struct {
|
||||
ddlTsSet map[typeutil.Timestamp]struct{}
|
||||
}
|
||||
|
||||
type channelTimeTickMsg struct {
|
||||
in *internalpb.ChannelTimeTickMsg
|
||||
timeTick map[string]typeutil.Timestamp
|
||||
type chanTsMsg struct {
|
||||
chanTs map[string]typeutil.Timestamp
|
||||
defaultTs typeutil.Timestamp
|
||||
cnt int64
|
||||
}
|
||||
|
||||
func newChannelTimeTickMsg(in *internalpb.ChannelTimeTickMsg) *channelTimeTickMsg {
|
||||
msg := &channelTimeTickMsg{
|
||||
in: in,
|
||||
timeTick: make(map[string]typeutil.Timestamp),
|
||||
func newChanTsMsg(in *internalpb.ChannelTimeTickMsg, cnt int64) *chanTsMsg {
|
||||
msg := &chanTsMsg{
|
||||
chanTs: make(map[string]typeutil.Timestamp),
|
||||
defaultTs: in.DefaultTimestamp,
|
||||
cnt: cnt,
|
||||
}
|
||||
for idx := range in.ChannelNames {
|
||||
msg.timeTick[in.ChannelNames[idx]] = in.Timestamps[idx]
|
||||
msg.chanTs[in.ChannelNames[idx]] = in.Timestamps[idx]
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp {
|
||||
tt, ok := c.timeTick[channelName]
|
||||
if ok {
|
||||
return tt
|
||||
func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
|
||||
if ts, ok := c.chanTs[channelName]; ok {
|
||||
return ts
|
||||
}
|
||||
return c.in.DefaultTimestamp
|
||||
return c.defaultTs
|
||||
}
|
||||
|
||||
func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
|
||||
@ -102,8 +103,8 @@ func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory
|
||||
deltaChannels: deltaChannels,
|
||||
|
||||
lock: sync.Mutex{},
|
||||
proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16),
|
||||
proxyTimeTick: make(map[typeutil.UniqueID]*chanTsMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
|
||||
|
||||
ddlLock: sync.RWMutex{},
|
||||
ddlMinTs: typeutil.Timestamp(math.MaxUint64),
|
||||
@ -117,13 +118,31 @@ func (t *timetickSync) sendToChannel() {
|
||||
if len(t.proxyTimeTick) == 0 {
|
||||
return
|
||||
}
|
||||
for _, v := range t.proxyTimeTick {
|
||||
|
||||
// detect whether rootcoord receives ttMsg from all proxy nodes
|
||||
maxCnt := int64(0)
|
||||
idleProxyList := make([]typeutil.UniqueID, 0)
|
||||
for id, v := range t.proxyTimeTick {
|
||||
if v == nil {
|
||||
idleProxyList = append(idleProxyList, id)
|
||||
} else {
|
||||
if maxCnt < v.cnt {
|
||||
maxCnt = v.cnt
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(idleProxyList) > 0 {
|
||||
// give warning every 2 second if not get ttMsg from proxy nodes
|
||||
if maxCnt%10 == 0 {
|
||||
log.Warn("proxy idle for long time", zap.Any("proxy list", idleProxyList),
|
||||
zap.Int64("idle time", int64(Params.TimeTickInterval)*maxCnt))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// clear proxyTimeTick and send a clone
|
||||
ptt := make(map[typeutil.UniqueID]*channelTimeTickMsg)
|
||||
ptt := make(map[typeutil.UniqueID]*chanTsMsg)
|
||||
for k, v := range t.proxyTimeTick {
|
||||
ptt[k] = v
|
||||
t.proxyTimeTick[k] = nil
|
||||
@ -204,24 +223,20 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
|
||||
}
|
||||
|
||||
if in.Base.SourceID == t.session.ServerID {
|
||||
if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp {
|
||||
if prev != nil && in.DefaultTimestamp <= prev.defaultTs {
|
||||
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID),
|
||||
zap.Uint64("curr ts", in.DefaultTimestamp),
|
||||
zap.Uint64("prev ts", prev.in.DefaultTimestamp),
|
||||
zap.Uint64("prev ts", prev.defaultTs),
|
||||
zap.String("reason", reason))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if in.DefaultTimestamp == 0 {
|
||||
mints := minTimeTick(in.Timestamps...)
|
||||
log.Debug("default time stamp is zero, set it to the min value of inputs",
|
||||
zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints))
|
||||
in.DefaultTimestamp = mints
|
||||
}
|
||||
|
||||
t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in)
|
||||
//log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID),
|
||||
// zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
|
||||
if prev == nil {
|
||||
t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, 1)
|
||||
} else {
|
||||
t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1)
|
||||
}
|
||||
|
||||
t.sendToChannel()
|
||||
return nil
|
||||
@ -268,36 +283,35 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
|
||||
// reduce each channel to get min timestamp
|
||||
local := proxyTimetick[t.session.ServerID]
|
||||
if len(local.in.ChannelNames) == 0 {
|
||||
if len(local.chanTs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames))
|
||||
hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTs))
|
||||
tr := timerecord.NewTimeRecorder(hdr)
|
||||
wg := sync.WaitGroup{}
|
||||
for _, chanName := range local.in.ChannelNames {
|
||||
for chanName, ts := range local.chanTs {
|
||||
wg.Add(1)
|
||||
go func(chanName string) {
|
||||
mints := local.getTimetick(chanName)
|
||||
go func(chanName string, ts typeutil.Timestamp) {
|
||||
mints := ts
|
||||
for _, tt := range proxyTimetick {
|
||||
ts := tt.getTimetick(chanName)
|
||||
if ts < mints {
|
||||
mints = ts
|
||||
currTs := tt.getTimetick(chanName)
|
||||
if currTs < mints {
|
||||
mints = currTs
|
||||
}
|
||||
}
|
||||
if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil {
|
||||
log.Debug("SendTimeTickToChannel fail", zap.Error(err))
|
||||
}
|
||||
wg.Done()
|
||||
}(chanName)
|
||||
}(chanName, ts)
|
||||
}
|
||||
wg.Wait()
|
||||
span := tr.ElapseSpan()
|
||||
// rootcoord send tt msg to all channels every 200ms by default
|
||||
if span.Milliseconds() > 200 {
|
||||
if span.Milliseconds() > int64(Params.TimeTickInterval) {
|
||||
log.Warn("rootcoord send tt to all channels too slowly",
|
||||
zap.Int("chanNum", len(local.in.ChannelNames)),
|
||||
zap.Int64("span", span.Milliseconds()))
|
||||
zap.Int("chanNum", len(local.chanTs)), zap.Int64("span", span.Milliseconds()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ func TestTimetickSync(t *testing.T) {
|
||||
MsgType: commonpb.MsgType_TimeTick,
|
||||
},
|
||||
}
|
||||
ttSync.proxyTimeTick[1] = newChannelTimeTickMsg(msg)
|
||||
ttSync.proxyTimeTick[1] = newChanTsMsg(msg, 1)
|
||||
ttSync.sendToChannel()
|
||||
})
|
||||
|
||||
@ -87,7 +87,7 @@ func TestTimetickSync(t *testing.T) {
|
||||
|
||||
msg.Timestamps = append(msg.Timestamps, uint64(2))
|
||||
msg.DefaultTimestamp = uint64(200)
|
||||
cttMsg := newChannelTimeTickMsg(msg)
|
||||
cttMsg := newChanTsMsg(msg, 1)
|
||||
ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg
|
||||
|
||||
ttSync.ddlMinTs = uint64(100)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user