diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 54bf296a72..32ce0ab74a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -583,8 +583,9 @@ func (c *Core) setMsgStreams() error { Timestamp: t, SourceID: c.session.ServerID, }, - ChannelNames: pc, - Timestamps: pt, + ChannelNames: pc, + Timestamps: pt, + DefaultTimestamp: t, } return c.chanTimeTick.UpdateTimeTick(&ttMsg) } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 4177482366..46e35bb8d2 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -479,11 +479,15 @@ func TestMasterService(t *testing.T) { assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2) pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID] assert.True(t, ok) - assert.Equal(t, 2, len(pt.ChannelNames)) - assert.Equal(t, 2, len(pt.Timestamps)) - assert.Equal(t, pt.ChannelNames, createMeta.PhysicalChannelNames) - assert.Equal(t, pt.Timestamps[0], pt.Timestamps[1]) - assert.LessOrEqual(t, createPart.BeginTimestamp, pt.Timestamps[0]) + assert.Equal(t, 2, len(pt.in.ChannelNames)) + assert.Equal(t, 2, len(pt.in.Timestamps)) + assert.Equal(t, 2, len(pt.timeTick)) + assert.Equal(t, pt.in.ChannelNames, createMeta.PhysicalChannelNames) + assert.Equal(t, pt.in.Timestamps[0], pt.in.Timestamps[1]) + assert.Equal(t, pt.in.Timestamps[0], pt.in.DefaultTimestamp) + assert.Equal(t, pt.timeTick[pt.in.ChannelNames[0]], pt.in.DefaultTimestamp) + assert.Equal(t, pt.timeTick[pt.in.ChannelNames[1]], pt.in.DefaultTimestamp) + assert.LessOrEqual(t, createPart.BeginTimestamp, pt.in.Timestamps[0]) core.chanTimeTick.lock.Unlock() // check DD operation info diff --git a/internal/masterservice/timeticksync.go b/internal/masterservice/timeticksync.go index a5ebacc00e..18cb2ccd98 100644 --- a/internal/masterservice/timeticksync.go +++ b/internal/masterservice/timeticksync.go @@ -29,16 +29,40 @@ import ( type timetickSync struct { core *Core lock sync.Mutex - proxyTimeTick map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg - sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg + proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg + sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg +} + +type channelTimeTickMsg struct { + in *internalpb.ChannelTimeTickMsg + timeTick map[string]typeutil.Timestamp +} + +func newChannelTimeTickMsg(in *internalpb.ChannelTimeTickMsg) *channelTimeTickMsg { + msg := &channelTimeTickMsg{ + in: in, + timeTick: make(map[string]typeutil.Timestamp), + } + for idx := range in.ChannelNames { + msg.timeTick[in.ChannelNames[idx]] = in.Timestamps[idx] + } + return msg +} + +func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp { + tt, ok := c.timeTick[channelName] + if ok { + return tt + } + return c.in.DefaultTimestamp } func newTimeTickSync(core *Core) *timetickSync { return &timetickSync{ lock: sync.Mutex{}, core: core, - proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg), - sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16), + proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg), + sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16), } } @@ -54,7 +78,7 @@ func (t *timetickSync) sendToChannel() { } } // clear proxyTimeTick and send a clone - ptt := make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg) + ptt := make(map[typeutil.UniqueID]*channelTimeTickMsg) for k, v := range t.proxyTimeTick { ptt[k] = v t.proxyTimeTick[k] = nil @@ -78,12 +102,18 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error { return fmt.Errorf("Skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID) } if in.Base.SourceID == t.core.session.ServerID { - if prev != nil && prev.Timestamps[0] >= in.Timestamps[0] { - log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.Timestamps[0]), zap.Uint64("curr ts", in.Timestamps[0])) + if prev != nil && prev.in.DefaultTimestamp >= in.DefaultTimestamp { + log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.in.DefaultTimestamp), zap.Uint64("curr ts", in.DefaultTimestamp)) return nil } } - t.proxyTimeTick[in.Base.SourceID] = in + if in.DefaultTimestamp == 0 { + mints := minTimeTick(in.Timestamps...) + log.Debug("default time stamp is zero, set it to the min value of inputs", zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints)) + in.DefaultTimestamp = mints + } + + t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) t.sendToChannel() return nil } @@ -123,27 +153,22 @@ func (t *timetickSync) StartWatch() { log.Debug("timetickSync sendChan closed") return } + // reduce each channel to get min timestamp - chanName2TimeTickMap := make(map[string]typeutil.Timestamp) - for _, cttMsg := range ptt { - chanNum := len(cttMsg.ChannelNames) - for i := 0; i < chanNum; i++ { - name := cttMsg.ChannelNames[i] - ts := cttMsg.Timestamps[i] - cts, ok := chanName2TimeTickMap[name] - if !ok || ts < cts { - chanName2TimeTickMap[name] = ts + mtt := ptt[t.core.session.ServerID] + for _, chanName := range mtt.in.ChannelNames { + mints := mtt.getTimetick(chanName) + for _, tt := range ptt { + ts := tt.getTimetick(chanName) + if ts < mints { + mints = ts } } - } - log.Debug("MasterService timeticksync", - zap.Any("chanName2TimeTickMap", chanName2TimeTickMap)) - // send timetick msg to msg stream - for chanName, chanTs := range chanName2TimeTickMap { - if err := t.SendChannelTimeTick(chanName, chanTs); err != nil { + if err := t.SendChannelTimeTick(chanName, mints); err != nil { log.Debug("SendChannelTimeTick fail", zap.Error(err)) } } + } } } @@ -188,3 +213,17 @@ func (t *timetickSync) GetProxyNodeNum() int { func (t *timetickSync) GetChanNum() int { return t.core.dmlChannels.GetNumChannles() } + +func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp { + var ret typeutil.Timestamp + for _, t := range tt { + if ret == 0 { + ret = t + } else { + if t < ret { + ret = t + } + } + } + return ret +}