// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. package rootcoord import ( "fmt" "math" "sync" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) type ddlTimetickInfo struct { ddlMinTs typeutil.Timestamp ddlTsSet map[typeutil.Timestamp]struct{} } type timetickSync struct { core *Core lock sync.Mutex proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg // record ddl timetick info ddlLock sync.RWMutex ddlMinTs typeutil.Timestamp ddlTsSet map[typeutil.Timestamp]struct{} } type channelTimeTickMsg struct { in *internalpb.ChannelTimeTickMsg timeTick map[string]typeutil.Timestamp } func newChannelTimeTickMsg(in *internalpb.ChannelTimeTickMsg) *channelTimeTickMsg { msg := &channelTimeTickMsg{ in: in, timeTick: make(map[string]typeutil.Timestamp), } for idx := range in.ChannelNames { msg.timeTick[in.ChannelNames[idx]] = in.Timestamps[idx] } return msg } func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp { tt, ok := c.timeTick[channelName] if ok { return tt } return c.in.DefaultTimestamp } func newTimeTickSync(core *Core) *timetickSync { return &timetickSync{ lock: sync.Mutex{}, core: core, proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg), sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16), ddlLock: sync.RWMutex{}, ddlMinTs: typeutil.Timestamp(math.MaxUint64), ddlTsSet: make(map[typeutil.Timestamp]struct{}), } } // sendToChannel send all channels' timetick to sendChan // lock is needed by the invoker func (t *timetickSync) sendToChannel() { if len(t.proxyTimeTick) == 0 { return } for _, v := range t.proxyTimeTick { if v == nil { return } } // clear proxyTimeTick and send a clone ptt := make(map[typeutil.UniqueID]*channelTimeTickMsg) for k, v := range t.proxyTimeTick { ptt[k] = v t.proxyTimeTick[k] = nil } t.sendChan <- ptt } // AddDmlTimeTick add ts into ddlTimetickInfos[sourceID], // can be used to tell if DDL operation is in process. func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) { t.ddlLock.Lock() defer t.ddlLock.Unlock() if ts < t.ddlMinTs { t.ddlMinTs = ts } t.ddlTsSet[ts] = struct{}{} log.Debug("add ddl timetick", zap.Uint64("minTs", t.ddlMinTs), zap.Uint64("ts", ts), zap.Int("len(ddlTsSet)", len(t.ddlTsSet)), zap.String("reason", reason)) } // RemoveDdlTimeTick is invoked in UpdateTimeTick. // It clears the ts generated by AddDdlTimeTick, indicates DDL operation finished. func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) { t.ddlLock.Lock() defer t.ddlLock.Unlock() delete(t.ddlTsSet, ts) log.Debug("remove ddl timetick", zap.Uint64("ts", ts), zap.Int("len(ddlTsSet)", len(t.ddlTsSet)), zap.String("reason", reason)) if len(t.ddlTsSet) == 0 { t.ddlMinTs = typeutil.Timestamp(math.MaxUint64) } else if t.ddlMinTs == ts { // re-calculate minTs minTs := typeutil.Timestamp(math.MaxUint64) for tt := range t.ddlTsSet { if tt < minTs { minTs = tt } } t.ddlMinTs = minTs log.Debug("update ddl minTs", zap.Any("minTs", minTs)) } } func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp { t.ddlLock.Lock() defer t.ddlLock.Unlock() return t.ddlMinTs } // UpdateTimeTick check msg validation and send it to local channel func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error { t.lock.Lock() defer t.lock.Unlock() if len(in.ChannelNames) == 0 && in.DefaultTimestamp == 0 { return nil } if len(in.Timestamps) != len(in.ChannelNames) { return fmt.Errorf("Invalid TimeTickMsg") } prev, ok := t.proxyTimeTick[in.Base.SourceID] if !ok { return fmt.Errorf("Skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID) } // if ddl operation not finished, skip current ts update ddlMinTs := t.GetDdlMinTimeTick() if in.DefaultTimestamp > ddlMinTs { log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), zap.Uint64("ddlMinTs", ddlMinTs), zap.String("reason", reason)) return nil } if in.Base.SourceID == t.core.session.ServerID { if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp { log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), zap.Uint64("prev ts", prev.in.DefaultTimestamp), zap.String("reason", reason)) return nil } } if in.DefaultTimestamp == 0 { mints := minTimeTick(in.Timestamps...) log.Debug("default time stamp is zero, set it to the min value of inputs", zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints)) in.DefaultTimestamp = mints } t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), // zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) t.sendToChannel() return nil } func (t *timetickSync) AddProxy(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() t.proxyTimeTick[sess.ServerID] = nil } func (t *timetickSync) DelProxy(sess *sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() if _, ok := t.proxyTimeTick[sess.ServerID]; ok { delete(t.proxyTimeTick, sess.ServerID) t.sendToChannel() } } func (t *timetickSync) GetProxy(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() for _, s := range sess { t.proxyTimeTick[s.ServerID] = nil } } // StartWatch watch proxy node change and process all channels' timetick msg func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { defer wg.Done() for { select { case <-t.core.ctx.Done(): log.Debug("root coord context done", zap.Error(t.core.ctx.Err())) return case ptt, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") return } // reduce each channel to get min timestamp mtt := ptt[t.core.session.ServerID] for _, chanName := range mtt.in.ChannelNames { mints := mtt.getTimetick(chanName) for _, tt := range ptt { ts := tt.getTimetick(chanName) if ts < mints { mints = ts } } if err := t.SendChannelTimeTick(chanName, mints); err != nil { log.Debug("SendChannelTimeTick fail", zap.Error(err)) } } } } } // SendChannelTimeTick send each channel's min timetick to msg stream func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ BeginTimestamp: ts, EndTimestamp: ts, HashValues: []uint32{0}, } timeTickResult := internalpb.TimeTickMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_TimeTick, MsgID: 0, Timestamp: ts, SourceID: t.core.session.ServerID, }, } timeTickMsg := &msgstream.TimeTickMsg{ BaseMsg: baseMsg, TimeTickMsg: timeTickResult, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) err := t.core.dmlChannels.Broadcast(chanName, &msgPack) if err == nil { metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } return err } // GetProxyNum return the num of detected proxy node func (t *timetickSync) GetProxyNum() int { t.lock.Lock() defer t.lock.Unlock() return len(t.proxyTimeTick) } // GetChanNum return the num of channel func (t *timetickSync) GetChanNum() int { return t.core.dmlChannels.GetNumChannels() } func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp { var ret typeutil.Timestamp for _, t := range tt { if ret == 0 { ret = t } else { if t < ret { ret = t } } } return ret }