From 5600d06583e1a1c81a131eeba587d59bb644121e Mon Sep 17 00:00:00 2001 From: yukun Date: Mon, 25 Jan 2021 18:53:08 +0800 Subject: [PATCH] Add global_rmq Signed-off-by: yukun --- .../msgstream/{rmq => rmqms}/rmq_msgstream.go | 2 +- internal/util/rocksmq/global_allocator.go | 167 +++++++++++++++ internal/util/rocksmq/global_rmq.go | 13 ++ internal/util/rocksmq/rocksmq.go | 5 +- internal/util/rocksmq/rocksmq_test.go | 28 +-- internal/util/rocksmq/tso.go | 202 ++++++++++++++++++ 6 files changed, 400 insertions(+), 17 deletions(-) rename internal/msgstream/{rmq => rmqms}/rmq_msgstream.go (99%) create mode 100644 internal/util/rocksmq/global_allocator.go create mode 100644 internal/util/rocksmq/global_rmq.go create mode 100644 internal/util/rocksmq/tso.go diff --git a/internal/msgstream/rmq/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go similarity index 99% rename from internal/msgstream/rmq/rmq_msgstream.go rename to internal/msgstream/rmqms/rmq_msgstream.go index 8656bb2ec9..5c20b24c27 100644 --- a/internal/msgstream/rmq/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -1,4 +1,4 @@ -package rmqmsgstream +package rmqms import ( "context" diff --git a/internal/util/rocksmq/global_allocator.go b/internal/util/rocksmq/global_allocator.go new file mode 100644 index 0000000000..d105003df0 --- /dev/null +++ b/internal/util/rocksmq/global_allocator.go @@ -0,0 +1,167 @@ +package rocksmq + +import ( + "errors" + "log" + "sync/atomic" + "time" + + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.uber.org/zap" +) + +// Allocator is a Timestamp Oracle allocator. +type Allocator interface { + // Initialize is used to initialize a TSO allocator. + // It will synchronize TSO with etcd and initialize the + // memory for later allocation work. 
+ Initialize() error + // UpdateTSO is used to update the TSO in memory and the time window in etcd. + UpdateTSO() error + // SetTSO sets the physical part with given tso. It's mainly used for BR restore + // and can not forcibly set the TSO smaller than now. + SetTSO(tso uint64) error + // GenerateTSO is used to generate a given number of TSOs. + // Make sure you have initialized the TSO allocator before calling. + GenerateTSO(count uint32) (uint64, error) + // Reset is used to reset the TSO allocator. + Reset() +} + +// GlobalTSOAllocator is the global single point TSO allocator. +type GlobalTSOAllocator struct { + tso *timestampOracle +} + +// NewGlobalTSOAllocator creates a new global TSO allocator. +func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { + var saveInterval = 3 * time.Second + return &GlobalTSOAllocator{ + tso: &timestampOracle{ + kvBase: kvBase, + saveInterval: saveInterval, + maxResetTSGap: func() time.Duration { return 3 * time.Second }, + key: key, + }, + } +} + +// Initialize will initialize the created global TSO allocator. +func (gta *GlobalTSOAllocator) Initialize() error { + return gta.tso.InitTimestamp() +} + +// UpdateTSO is used to update the TSO in memory and the time window in etcd. +func (gta *GlobalTSOAllocator) UpdateTSO() error { + return gta.tso.UpdateTimestamp() +} + +// SetTSO sets the physical part with given tso. +func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { + return gta.tso.ResetUserTimestamp(tso) +} + +// GenerateTSO is used to generate a given number of TSOs. +// Make sure you have initialized the TSO allocator before calling. 
+func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { + var physical, logical int64 + if count == 0 { + return 0, errors.New("tso count should be positive") + } + + maxRetryCount := 10 + + for i := 0; i < maxRetryCount; i++ { + current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO)) + if current == nil || current.physical.Equal(typeutil.ZeroTime) { + // If it's leader, maybe SyncTimestamp hasn't completed yet + log.Println("sync hasn't completed yet, wait for a while") + time.Sleep(200 * time.Millisecond) + continue + } + + physical = current.physical.UnixNano() / int64(time.Millisecond) + logical = atomic.AddInt64(&current.logical, int64(count)) + if logical >= maxLogical { + log.Println("logical part outside of max logical interval, please check ntp time", + zap.Int("retry-count", i)) + time.Sleep(UpdateTimestampStep) + continue + } + return tsoutil.ComposeTS(physical, logical), nil + } + return 0, errors.New("can not get timestamp") +} + +func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) { + //return gta.tso.SyncTimestamp() + start, err := gta.GenerateTSO(count) + if err != nil { + return typeutil.ZeroTimestamp, err + } + //ret := make([]typeutil.Timestamp, count) + //for i:=uint32(0); i < count; i++{ + // ret[i] = start + uint64(i) + //} + return start, err +} + +func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { + return gta.GenerateTSO(1) +} + +// Reset is used to reset the TSO allocator. +func (gta *GlobalTSOAllocator) Reset() { + gta.tso.ResetTimestamp() +} + +/////////////////////////////////////////////////////////////////////// + +type IDAllocator interface { + Alloc(count uint32) (UniqueID, UniqueID, error) + AllocOne() (UniqueID, error) + UpdateID() error +} + +// GlobalIDAllocator is the global ID allocator; it derives IDs from a TSO allocator. 
+type GlobalIDAllocator struct { + allocator Allocator +} + +func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { + return &GlobalIDAllocator{ + allocator: NewGlobalTSOAllocator(key, base), + } +} + +// Initialize will initialize the created global ID allocator. +func (gia *GlobalIDAllocator) Initialize() error { + return gia.allocator.Initialize() +} + +// Alloc is used to allocate a given number of IDs; it returns idStart and idStart+count. +// Make sure you have initialized the ID allocator before calling. +func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { + timestamp, err := gia.allocator.GenerateTSO(count) + if err != nil { + return 0, 0, err + } + idStart := UniqueID(timestamp) + idEnd := idStart + int64(count) + return idStart, idEnd, nil +} + +func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { + timestamp, err := gia.allocator.GenerateTSO(1) + if err != nil { + return 0, err + } + idStart := UniqueID(timestamp) + return idStart, nil +} + +func (gia *GlobalIDAllocator) UpdateID() error { + return gia.allocator.UpdateTSO() +} diff --git a/internal/util/rocksmq/global_rmq.go b/internal/util/rocksmq/global_rmq.go new file mode 100644 index 0000000000..a32e49dd44 --- /dev/null +++ b/internal/util/rocksmq/global_rmq.go @@ -0,0 +1,13 @@ +package rocksmq + +var rmq *RocksMQ + +func InitRmq(rocksdbName string, idAllocator IDAllocator) error { + var err error + rmq, err = NewRocksMQ(rocksdbName, idAllocator) + return err +} + +func GetRmq() *RocksMQ { + return rmq +} diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 42a92ee99f..e5e5309131 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -7,7 +7,6 @@ import ( "github.com/tecbot/gorocksdb" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master" 
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" @@ -73,7 +72,7 @@ type RocksMQ struct { kv kv.Base channels map[string]*Channel cgCtxs map[string]ConsumerGroupContext - idAllocator master.IDAllocator + idAllocator IDAllocator produceMu sync.Mutex consumeMu sync.Mutex //ctx context.Context @@ -85,7 +84,7 @@ type RocksMQ struct { //tsoTicker *time.Ticker } -func NewRocksMQ(name string, idAllocator master.IDAllocator) (*RocksMQ, error) { +func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) { bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity)) opts := gorocksdb.NewDefaultOptions() diff --git a/internal/util/rocksmq/rocksmq_test.go b/internal/util/rocksmq/rocksmq_test.go index 1ded31043c..74bd9d2a8c 100644 --- a/internal/util/rocksmq/rocksmq_test.go +++ b/internal/util/rocksmq/rocksmq_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - master "github.com/zilliztech/milvus-distributed/internal/master" "go.etcd.io/etcd/clientv3" ) @@ -20,14 +19,15 @@ func TestFixChannelName(t *testing.T) { } func TestRocksMQ(t *testing.T) { - master.Init() - - etcdAddr := master.Params.EtcdAddress + etcdAddr := os.Getenv("ETCD_ADDRESS") + if etcdAddr == "" { + etcdAddr = "localhost:2379" + } cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq" @@ -76,14 +76,15 @@ func TestRocksMQ(t *testing.T) { } func TestRocksMQ_Loop(t *testing.T) { - master.Init() - - etcdAddr := master.Params.EtcdAddress + etcdAddr := os.Getenv("ETCD_ADDRESS") + if etcdAddr == "" { + 
etcdAddr = "localhost:2379" + } cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_1" @@ -143,14 +144,15 @@ func TestRocksMQ_Loop(t *testing.T) { } func TestRocksMQ_Goroutines(t *testing.T) { - master.Init() - - etcdAddr := master.Params.EtcdAddress + etcdAddr := os.Getenv("ETCD_ADDRESS") + if etcdAddr == "" { + etcdAddr = "localhost:2379" + } cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_2" diff --git a/internal/util/rocksmq/tso.go b/internal/util/rocksmq/tso.go new file mode 100644 index 0000000000..0db2901b83 --- /dev/null +++ b/internal/util/rocksmq/tso.go @@ -0,0 +1,202 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rocksmq + +import ( + "log" + "sync/atomic" + "time" + "unsafe" + + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +const ( + // UpdateTimestampStep is used to update timestamp. + UpdateTimestampStep = 50 * time.Millisecond + // updateTimestampGuard is the min timestamp interval. + updateTimestampGuard = time.Millisecond + // maxLogical is the max upper limit for logical time. + // When a TSO's logical time reaches this limit, + // the physical time will be forced to increase. + maxLogical = int64(1 << 18) +) + +// atomicObject is used to store the current TSO in memory. +type atomicObject struct { + physical time.Time + logical int64 +} + +// timestampOracle is used to maintain the logic of tso. +type timestampOracle struct { + key string + kvBase kv.TxnBase + + // TODO: remove saveInterval + saveInterval time.Duration + maxResetTSGap func() time.Duration + // For tso, set after the PD becomes a leader. + TSO unsafe.Pointer + lastSavedTime atomic.Value +} + +func (t *timestampOracle) loadTimestamp() (time.Time, error) { + strData, err := t.kvBase.Load(t.key) + + var binData []byte = []byte(strData) + + if err != nil { + return typeutil.ZeroTime, err + } + if len(binData) == 0 { + return typeutil.ZeroTime, nil + } + return typeutil.ParseTimestamp(binData) +} + +// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, +// otherwise, update it. 
+func (t *timestampOracle) saveTimestamp(ts time.Time) error { + data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) + err := t.kvBase.Save(t.key, string(data)) + if err != nil { + return errors.WithStack(err) + } + t.lastSavedTime.Store(ts) + return nil +} + +func (t *timestampOracle) InitTimestamp() error { + + //last, err := t.loadTimestamp() + //if err != nil { + // return err + //} + + next := time.Now() + + // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, + // the timestamp allocation will start from the saved etcd timestamp temporarily. + //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { + // next = last.Add(updateTimestampGuard) + //} + + save := next.Add(t.saveInterval) + if err := t.saveTimestamp(save); err != nil { + return err + } + + //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) + + current := &atomicObject{ + physical: next, + } + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) + + return nil +} + +// ResetUserTimestamp update the physical part with specified tso. +func (t *timestampOracle) ResetUserTimestamp(tso uint64) error { + physical, _ := tsoutil.ParseTS(tso) + next := physical.Add(time.Millisecond) + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) + + // do not update + if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { + return errors.New("the specified ts too small than now") + } + + if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() { + return errors.New("the specified ts too large than now") + } + + save := next.Add(t.saveInterval) + if err := t.saveTimestamp(save); err != nil { + return err + } + update := &atomicObject{ + physical: next, + } + atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) + return nil +} + +// UpdateTimestamp is used to update the timestamp. +// This function will do two things: +// 1. 
When the logical time is going to be used up, increase the current physical time. +// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time +// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and +// we also need to save the next physical time plus `saveInterval` into etcd. +// +// Here are some constraints that this function must satisfy: +// 1. The saved time is monotonically increasing. +// 2. The physical time is monotonically increasing. +// 3. The physical time is always less than the saved timestamp. +func (t *timestampOracle) UpdateTimestamp() error { + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) + now := time.Now() + + jetLag := typeutil.SubTimeByWallClock(now, prev.physical) + if jetLag > 3*UpdateTimestampStep { + log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) + } + + var next time.Time + prevLogical := atomic.LoadInt64(&prev.logical) + // If the system time is greater, it will be synchronized with the system time. + if jetLag > updateTimestampGuard { + next = now + } else if prevLogical > maxLogical/2 { + // The reason for choosing maxLogical/2 here is that it's big enough for common cases, + // because enough timestamps can still be allocated before the next update. + log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) + next = prev.physical.Add(time.Millisecond) + } else { + // It will still use the previous physical time to alloc the timestamp. + return nil + } + + // It is not safe to increase the physical time to `next`. + // The time window needs to be updated and saved to etcd. 
+ if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { + save := next.Add(t.saveInterval) + if err := t.saveTimestamp(save); err != nil { + return err + } + } + + current := &atomicObject{ + physical: next, + logical: 0, + } + + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) + + return nil +} + +// ResetTimestamp is used to reset the timestamp. +func (t *timestampOracle) ResetTimestamp() { + zero := &atomicObject{ + physical: time.Now(), + } + atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) +}