From ab5a7cbf44458cf008df7d0be8f01676eac5b344 Mon Sep 17 00:00:00 2001 From: sunby Date: Mon, 19 Jul 2021 18:53:09 +0800 Subject: [PATCH] Remove timesync package (#6619) issue: #6618 Signed-off-by: sunby --- internal/timesync/OWNERS | 10 - internal/timesync/time_sync_producer.go | 98 --------- internal/timesync/timesync.go | 281 ------------------------ internal/timesync/timesync_test.go | 167 -------------- internal/timesync/timetick_watcher.go | 64 ------ 5 files changed, 620 deletions(-) delete mode 100644 internal/timesync/OWNERS delete mode 100644 internal/timesync/time_sync_producer.go delete mode 100644 internal/timesync/timesync.go delete mode 100644 internal/timesync/timesync_test.go delete mode 100644 internal/timesync/timetick_watcher.go diff --git a/internal/timesync/OWNERS b/internal/timesync/OWNERS deleted file mode 100644 index a557a38520..0000000000 --- a/internal/timesync/OWNERS +++ /dev/null @@ -1,10 +0,0 @@ -# order by contributions -reviewers: - - sunby - - DragonDriver - - xiaocai2333 - -approvers: - - czs007 - - neza2017 - - scsven diff --git a/internal/timesync/time_sync_producer.go b/internal/timesync/time_sync_producer.go deleted file mode 100644 index ff1c47cc7f..0000000000 --- a/internal/timesync/time_sync_producer.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package timesync - -import ( - "context" - "sync" - - "github.com/milvus-io/milvus/internal/logutil" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - ms "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" -) - -type MsgProducer struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - ttBarrier TimeTickBarrier - watchers []TimeTickWatcher -} - -func NewTimeSyncMsgProducer(ttBarrier TimeTickBarrier, watchers ...TimeTickWatcher) (*MsgProducer, error) { - return &MsgProducer{ - ttBarrier: ttBarrier, - watchers: watchers, - }, nil -} - -func (producer *MsgProducer) broadcastMsg() { - defer logutil.LogPanic() - defer producer.wg.Done() - for { - select { - case <-producer.ctx.Done(): - log.Debug("broadcast context done, exit") - return - default: - } - tt, err := producer.ttBarrier.GetTimeTick() - if err != nil { - log.Debug("broadcast get time tick error", zap.Error(err)) - return - } - baseMsg := ms.BaseMsg{ - BeginTimestamp: tt, - EndTimestamp: tt, - HashValues: []uint32{0}, - } - timeTickResult := internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: tt, - SourceID: 0, - }, - } - timeTickMsg := &ms.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - for _, watcher := range producer.watchers { - watcher.Watch(timeTickMsg) - } - } -} - -func (producer *MsgProducer) Start(ctx context.Context) { - producer.ctx, producer.cancel = context.WithCancel(ctx) - producer.wg.Add(1 + len(producer.watchers)) - for _, watcher := range producer.watchers { - go producer.startWatcher(watcher) - } - go producer.broadcastMsg() -} - -func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) { - defer logutil.LogPanic() - defer producer.wg.Done() - watcher.StartBackgroundLoop(producer.ctx) -} - -func (producer *MsgProducer) Close() { - producer.cancel() - producer.wg.Wait() -} diff --git a/internal/timesync/timesync.go b/internal/timesync/timesync.go deleted file mode 100644 index 23ec33ce47..0000000000 --- a/internal/timesync/timesync.go +++ /dev/null @@ -1,281 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package timesync - -import ( - "context" - "errors" - "fmt" - "math" - "sync" - "sync/atomic" - - "github.com/milvus-io/milvus/internal/logutil" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/typeutil" - - "github.com/milvus-io/milvus/internal/log" - ms "github.com/milvus-io/milvus/internal/msgstream" -) - -type ( - Timestamp = typeutil.Timestamp - UniqueID = typeutil.UniqueID - - TimeTickBarrier interface { - GetTimeTick() (Timestamp, error) - Start() - Close() - AddPeer(peerID UniqueID) error - } - - softTimeTickBarrier struct { - peer2LastTt map[UniqueID]Timestamp - peerMtx sync.RWMutex - minTtInterval Timestamp - lastTt int64 - outTt chan Timestamp - ttStream ms.MsgStream - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - } - - hardTimeTickBarrier struct { - peer2Tt map[UniqueID]Timestamp - peer2TtMu sync.Mutex - outTt chan Timestamp - ttStream ms.MsgStream - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - } -) - -func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier { - if len(peerIds) <= 0 { - log.Warn("[newSoftTimeTickBarrier] Warning: peerIds is empty!") - //return nil - } - - sttbarrier := softTimeTickBarrier{} - sttbarrier.minTtInterval = minTtInterval - sttbarrier.ttStream = ttStream - sttbarrier.outTt = make(chan Timestamp, 1024) - sttbarrier.ctx, sttbarrier.cancel = context.WithCancel(ctx) - sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp) - for _, id := range peerIds { - sttbarrier.peer2LastTt[id] = Timestamp(0) - } - if len(peerIds) != len(sttbarrier.peer2LastTt) { - log.Warn("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!") - } - - return &sttbarrier -} - -func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { - select { - case <-ttBarrier.ctx.Done(): - return 0, errors.New("getTimeTick closed") - case ts, ok := <-ttBarrier.outTt: - if !ok { - return 0, errors.New("getTimeTick closed") - } - num := len(ttBarrier.outTt) - for i := 0; i < num; i++ { - ts, ok = <-ttBarrier.outTt - if !ok { - return 0, errors.New("getTimeTick closed") - } - } - atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts)) - return ts, ttBarrier.ctx.Err() - } -} - -func (ttBarrier *softTimeTickBarrier) Start() { - ttBarrier.wg.Add(1) - go func() { - defer ttBarrier.wg.Done() - for { - select { - case <-ttBarrier.ctx.Done(): - log.Warn("TtBarrierStart", zap.Error(ttBarrier.ctx.Err())) - return - - case ttmsgs := <-ttBarrier.ttStream.Chan(): - ttBarrier.peerMtx.RLock() - if len(ttmsgs.Msgs) > 0 { - for _, timetickmsg := range ttmsgs.Msgs { - ttmsg := timetickmsg.(*ms.TimeTickMsg) - oldT, ok := ttBarrier.peer2LastTt[ttmsg.Base.SourceID] - - if !ok { - log.Warn("softTimeTickBarrier", zap.Int64("peerID %d not exist", ttmsg.Base.SourceID)) - continue - } - if ttmsg.Base.Timestamp > oldT { - ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp - - // get a legal Timestamp - ts := ttBarrier.minTimestamp() - lastTt := atomic.LoadInt64(&(ttBarrier.lastTt)) - if lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) { - continue - } - ttBarrier.outTt <- ts - } - } - } - ttBarrier.peerMtx.RUnlock() - } - } - }() - - ttBarrier.ttStream.Start() -} - -func (ttBarrier *softTimeTickBarrier) Close() { - ttBarrier.cancel() - ttBarrier.wg.Wait() - ttBarrier.ttStream.Close() -} - -func (ttBarrier *softTimeTickBarrier) AddPeer(peerID UniqueID) error { - ttBarrier.peerMtx.Lock() - defer ttBarrier.peerMtx.Unlock() - - _, ok := ttBarrier.peer2LastTt[peerID] - if ok { - log.Debug("softTimeTickBarrier.AddPeer", zap.Int64("no need to add duplicated peer", peerID)) - return nil - } - - ttBarrier.peer2LastTt[peerID] = 0 - - return nil -} - -func (ttBarrier *softTimeTickBarrier) minTimestamp() Timestamp { - tempMin := Timestamp(math.MaxUint64) - for _, tt := range ttBarrier.peer2LastTt { - if tt < tempMin { - tempMin = tt - } - } - return tempMin -} - -func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) { - select { - case <-ttBarrier.ctx.Done(): - return 0, errors.New("getTimeTick closed") - case ts, ok := <-ttBarrier.outTt: - if !ok { - return 0, errors.New("getTimeTick closed") - } - return ts, ttBarrier.ctx.Err() - } -} - -func (ttBarrier *hardTimeTickBarrier) Start() { - // Last timestamp synchronized - ttBarrier.wg.Add(1) - state := Timestamp(0) - go func(ctx context.Context) { - defer logutil.LogPanic() - defer ttBarrier.wg.Done() - for { - select { - case <-ctx.Done(): - log.Debug("[TtBarrierStart] shut down", zap.Error(ttBarrier.ctx.Err())) - return - default: - } - ttmsgs := ttBarrier.ttStream.Consume() - - if ttmsgs != nil && len(ttmsgs.Msgs) > 0 { - log.Debug("receive tt msg") - ttBarrier.peer2TtMu.Lock() - for _, timetickmsg := range ttmsgs.Msgs { - // Suppose ttmsg.Timestamp from stream is always larger than the previous one, - // that `ttmsg.Timestamp > oldT` - ttmsg := timetickmsg.(*ms.TimeTickMsg) - - oldT, ok := ttBarrier.peer2Tt[ttmsg.Base.SourceID] - if !ok { - log.Warn("[hardTimeTickBarrier] peerID not exist", zap.Int64("peerID", ttmsg.Base.SourceID)) - continue - } - - if oldT > state { - log.Warn("[hardTimeTickBarrier] peer's timestamp ahead", - zap.Int64("peerID", ttmsg.Base.SourceID), zap.Uint64("timestamp", ttmsg.Base.Timestamp)) - } - - ttBarrier.peer2Tt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp - newState := ttBarrier.minTimestamp() - if newState > state { - ttBarrier.outTt <- newState - state = newState - } - } - ttBarrier.peer2TtMu.Unlock() - } - } - }(ttBarrier.ctx) -} - -func (ttBarrier *hardTimeTickBarrier) Close() { - ttBarrier.cancel() - ttBarrier.wg.Wait() -} - -func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp { - tempMin := Timestamp(math.MaxUint64) - for _, tt := range ttBarrier.peer2Tt { - if tt < tempMin { - tempMin = tt - } - } - return tempMin -} - -func (ttBarrier *hardTimeTickBarrier) AddPeer(peerID UniqueID) error { - ttBarrier.peer2TtMu.Lock() - defer ttBarrier.peer2TtMu.Unlock() - if _, ok := ttBarrier.peer2Tt[peerID]; ok { - return fmt.Errorf("peer %d already exist", peerID) - } - ttBarrier.peer2Tt[peerID] = Timestamp(0) - return nil -} - -func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier { - ttbarrier := hardTimeTickBarrier{} - ttbarrier.ttStream = ttStream - ttbarrier.outTt = make(chan Timestamp, 1024) - - ttbarrier.peer2Tt = make(map[UniqueID]Timestamp) - ttbarrier.ctx, ttbarrier.cancel = context.WithCancel(ctx) - for _, id := range peerIds { - ttbarrier.peer2Tt[id] = Timestamp(0) - } - if len(peerIds) != len(ttbarrier.peer2Tt) { - log.Warn("[newHardTimeTickBarrier] there are duplicate peerIds!", zap.Int64s("peerIDs", peerIds)) - } - - return &ttbarrier -} diff --git a/internal/timesync/timesync_test.go b/internal/timesync/timesync_test.go deleted file mode 100644 index 592407bbfe..0000000000 --- a/internal/timesync/timesync_test.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package timesync - -import ( - "context" - "math" - "testing" - "time" - - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/internal/proto/commonpb" - - "github.com/milvus-io/milvus/internal/proto/internalpb" - - "github.com/milvus-io/milvus/internal/msgstream" -) - -func ttStreamProduceLoop(ctx context.Context, ttStream msgstream.MsgStream, durationInterval time.Duration, sourceID int64) { - log.Debug("ttStreamProduceLoop", zap.Any("durationInterval", durationInterval)) - timer := time.NewTicker(durationInterval) - - go func() { - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - ttMsgs := &msgstream.MsgPack{ - BeginTs: 0, - EndTs: 0, - Msgs: nil, - StartPositions: nil, - EndPositions: nil, - } - - currentT := uint64(time.Now().Nanosecond()) - msg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - Ctx: ctx, - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: nil, - MsgPosition: nil, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: 0, - MsgID: 0, - Timestamp: currentT, - SourceID: sourceID, - }, - }, - } - - ttMsgs.Msgs = append(ttMsgs.Msgs, msg) - - _ = ttStream.Produce(ttMsgs) - //log.Debug("ttStreamProduceLoop", zap.Any("Send", currentT)) - } - } - }() -} - -func TestSoftTimeTickBarrier_Start(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - ttBarrier.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - defer ttBarrier.Close() -} - -func TestSoftTimeTickBarrier_Close(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - ttBarrier.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - defer ttBarrier.Close() -} - -func TestSoftTimeTickBarrier_GetTimeTick(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - ttBarrier.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - defer ttBarrier.Close() - - num := 10 - for i := 0; i < num; i++ { - tick, err := ttBarrier.GetTimeTick() - assert.Equal(t, nil, err) - log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick)) - } -} - -func TestSoftTimeTickBarrier_AddPeer(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - ttBarrier.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - defer ttBarrier.Close() - - newSourceID := UniqueID(2) - err := ttBarrier.AddPeer(newSourceID) - assert.Equal(t, nil, err) - ttStreamProduceLoop(ctx, ttStream, durationInterval, newSourceID) - - num := 10 - for i := 0; i < num; i++ { - tick, err := ttBarrier.GetTimeTick() - assert.Equal(t, nil, err) - log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick)) - } -} diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go deleted file mode 100644 index 3ab16bc4f6..0000000000 --- a/internal/timesync/timetick_watcher.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package timesync - -import ( - "context" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - ms "github.com/milvus-io/milvus/internal/msgstream" -) - -type TimeTickWatcher interface { - Watch(msg *ms.TimeTickMsg) - StartBackgroundLoop(ctx context.Context) -} - -type MsgTimeTickWatcher struct { - streams []ms.MsgStream - msgQueue chan *ms.TimeTickMsg -} - -func NewMsgTimeTickWatcher(streams ...ms.MsgStream) *MsgTimeTickWatcher { - watcher := &MsgTimeTickWatcher{ - streams: streams, - msgQueue: make(chan *ms.TimeTickMsg), - } - return watcher -} - -func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) { - watcher.msgQueue <- msg -} - -func (watcher *MsgTimeTickWatcher) StartBackgroundLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Debug("msg time tick watcher closed") - return - case msg := <-watcher.msgQueue: - msgPack := &ms.MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, msg) - for _, stream := range watcher.streams { - if err := stream.Broadcast(msgPack); err != nil { - log.Warn("stream broadcast failed", zap.Error(err)) - } - } - } - } -} - -func (watcher *MsgTimeTickWatcher) Close() { -}