add default time tick (#5828)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2021-06-17 15:54:07 +08:00 committed by GitHub
parent deb937a156
commit 4e1932e0b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 30 deletions

View File

@ -583,8 +583,9 @@ func (c *Core) setMsgStreams() error {
Timestamp: t,
SourceID: c.session.ServerID,
},
ChannelNames: pc,
Timestamps: pt,
ChannelNames: pc,
Timestamps: pt,
DefaultTimestamp: t,
}
return c.chanTimeTick.UpdateTimeTick(&ttMsg)
}

View File

@ -479,11 +479,15 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2)
pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID]
assert.True(t, ok)
assert.Equal(t, 2, len(pt.ChannelNames))
assert.Equal(t, 2, len(pt.Timestamps))
assert.Equal(t, pt.ChannelNames, createMeta.PhysicalChannelNames)
assert.Equal(t, pt.Timestamps[0], pt.Timestamps[1])
assert.LessOrEqual(t, createPart.BeginTimestamp, pt.Timestamps[0])
assert.Equal(t, 2, len(pt.in.ChannelNames))
assert.Equal(t, 2, len(pt.in.Timestamps))
assert.Equal(t, 2, len(pt.timeTick))
assert.Equal(t, pt.in.ChannelNames, createMeta.PhysicalChannelNames)
assert.Equal(t, pt.in.Timestamps[0], pt.in.Timestamps[1])
assert.Equal(t, pt.in.Timestamps[0], pt.in.DefaultTimestamp)
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[0]], pt.in.DefaultTimestamp)
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[1]], pt.in.DefaultTimestamp)
assert.LessOrEqual(t, createPart.BeginTimestamp, pt.in.Timestamps[0])
core.chanTimeTick.lock.Unlock()
// check DD operation info

View File

@ -29,16 +29,40 @@ import (
type timetickSync struct {
core *Core
lock sync.Mutex
proxyTimeTick map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg
sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg
proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg
}
type channelTimeTickMsg struct {
in *internalpb.ChannelTimeTickMsg
timeTick map[string]typeutil.Timestamp
}
func newChannelTimeTickMsg(in *internalpb.ChannelTimeTickMsg) *channelTimeTickMsg {
msg := &channelTimeTickMsg{
in: in,
timeTick: make(map[string]typeutil.Timestamp),
}
for idx := range in.ChannelNames {
msg.timeTick[in.ChannelNames[idx]] = in.Timestamps[idx]
}
return msg
}
func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp {
tt, ok := c.timeTick[channelName]
if ok {
return tt
}
return c.in.DefaultTimestamp
}
func newTimeTickSync(core *Core) *timetickSync {
return &timetickSync{
lock: sync.Mutex{},
core: core,
proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg),
sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16),
proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg),
sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16),
}
}
@ -54,7 +78,7 @@ func (t *timetickSync) sendToChannel() {
}
}
// clear proxyTimeTick and send a clone
ptt := make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg)
ptt := make(map[typeutil.UniqueID]*channelTimeTickMsg)
for k, v := range t.proxyTimeTick {
ptt[k] = v
t.proxyTimeTick[k] = nil
@ -78,12 +102,18 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error {
return fmt.Errorf("Skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID)
}
if in.Base.SourceID == t.core.session.ServerID {
if prev != nil && prev.Timestamps[0] >= in.Timestamps[0] {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.Timestamps[0]), zap.Uint64("curr ts", in.Timestamps[0]))
if prev != nil && prev.in.DefaultTimestamp >= in.DefaultTimestamp {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.in.DefaultTimestamp), zap.Uint64("curr ts", in.DefaultTimestamp))
return nil
}
}
t.proxyTimeTick[in.Base.SourceID] = in
if in.DefaultTimestamp == 0 {
mints := minTimeTick(in.Timestamps...)
log.Debug("default time stamp is zero, set it to the min value of inputs", zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints))
in.DefaultTimestamp = mints
}
t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in)
t.sendToChannel()
return nil
}
@ -123,27 +153,22 @@ func (t *timetickSync) StartWatch() {
log.Debug("timetickSync sendChan closed")
return
}
// reduce each channel to get min timestamp
chanName2TimeTickMap := make(map[string]typeutil.Timestamp)
for _, cttMsg := range ptt {
chanNum := len(cttMsg.ChannelNames)
for i := 0; i < chanNum; i++ {
name := cttMsg.ChannelNames[i]
ts := cttMsg.Timestamps[i]
cts, ok := chanName2TimeTickMap[name]
if !ok || ts < cts {
chanName2TimeTickMap[name] = ts
mtt := ptt[t.core.session.ServerID]
for _, chanName := range mtt.in.ChannelNames {
mints := mtt.getTimetick(chanName)
for _, tt := range ptt {
ts := tt.getTimetick(chanName)
if ts < mints {
mints = ts
}
}
}
log.Debug("MasterService timeticksync",
zap.Any("chanName2TimeTickMap", chanName2TimeTickMap))
// send timetick msg to msg stream
for chanName, chanTs := range chanName2TimeTickMap {
if err := t.SendChannelTimeTick(chanName, chanTs); err != nil {
if err := t.SendChannelTimeTick(chanName, mints); err != nil {
log.Debug("SendChannelTimeTick fail", zap.Error(err))
}
}
}
}
}
@ -188,3 +213,17 @@ func (t *timetickSync) GetProxyNodeNum() int {
func (t *timetickSync) GetChanNum() int {
return t.core.dmlChannels.GetNumChannles()
}
func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {
var ret typeutil.Timestamp
for _, t := range tt {
if ret == 0 {
ret = t
} else {
if t < ret {
ret = t
}
}
}
return ret
}