From 7c4aff0ea27f3f106c39cc49bb82d571af5c9bc8 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 25 Sep 2021 11:43:56 +0800 Subject: [PATCH] Fixbug: load last saved timestamp when rootcoord restart (#8137) Signed-off-by: zhenshan.cao --- internal/tso/global_allocator_test.go | 86 +++++++++++++++++++++++++-- internal/tso/tso.go | 46 +++++++------- 2 files changed, 105 insertions(+), 27 deletions(-) diff --git a/internal/tso/global_allocator_test.go b/internal/tso/global_allocator_test.go index cc39af6c9b..d24120bc74 100644 --- a/internal/tso/global_allocator_test.go +++ b/internal/tso/global_allocator_test.go @@ -24,6 +24,43 @@ import ( var gTestTsoAllocator *GlobalTSOAllocator +func TestGlobalTSOAllocator_Initialize(t *testing.T) { + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" + } + etcdEndpoints := strings.Split(endpoints, ",") + etcdKV, err := tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "tsoTest") + assert.NoError(t, err) + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV) + err = gTestTsoAllocator.Initialize() + assert.Nil(t, err) + + err = gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) + + time.Sleep(3 * time.Second) + err = gTestTsoAllocator.Initialize() + assert.Nil(t, err) + + t.Run("GenerateTSO", func(t *testing.T) { + count := 1000 + perCount := uint32(100) + startTs, err := gTestTsoAllocator.GenerateTSO(perCount) + assert.Nil(t, err) + lastPhysical, lastLogical := tsoutil.ParseTS(startTs) + for i := 0; i < count; i++ { + ts, _ := gTestTsoAllocator.GenerateTSO(perCount) + physical, logical := tsoutil.ParseTS(ts) + if lastPhysical.Equal(physical) { + diff := logical - lastLogical + assert.Equal(t, uint64(perCount), diff) + } + lastPhysical, lastLogical = physical, logical + } + }) +} + func TestGlobalTSOAllocator_All(t *testing.T) { endpoints := os.Getenv("ETCD_ENDPOINTS") if endpoints == "" { @@ -36,6 +73,9 @@ func TestGlobalTSOAllocator_All(t *testing.T) { t.Run("Initialize", func(t *testing.T) { err := gTestTsoAllocator.Initialize() assert.Nil(t, err) + + err = gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) }) t.Run("GenerateTSO", func(t *testing.T) { @@ -45,7 +85,8 @@ func TestGlobalTSOAllocator_All(t *testing.T) { assert.Nil(t, err) lastPhysical, lastLogical := tsoutil.ParseTS(startTs) for i := 0; i < count; i++ { - ts, _ := gTestTsoAllocator.GenerateTSO(perCount) + ts, err2 := gTestTsoAllocator.GenerateTSO(perCount) + assert.Nil(t, err2) physical, logical := tsoutil.ParseTS(ts) if lastPhysical.Equal(physical) { diff := logical - lastLogical @@ -71,15 +112,18 @@ func TestGlobalTSOAllocator_All(t *testing.T) { assert.Equal(t, physical, lastPhysical.Add(time.Millisecond*time.Duration(step))) lastPhysical = physical } + err = gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) }) gTestTsoAllocator.SetLimitMaxLogic(true) t.Run("SetTSO", func(t *testing.T) { - curTime := time.Now() + ts, err2 := gTestTsoAllocator.GenerateTSO(1) + assert.Nil(t, err2) + curTime, logical := tsoutil.ParseTS(ts) nextTime := curTime.Add(2 * time.Second) physical := nextTime.UnixNano() / int64(time.Millisecond) - logical := int64(0) - err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) + err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, int64(logical))) assert.Nil(t, err) }) @@ -165,3 +209,37 @@ func TestGlobalTSOAllocator_Update(t *testing.T) { err = gTestTsoAllocator.UpdateTSO() assert.Nil(t, err) } + +func TestGlobalTSOAllocator_load(t *testing.T) { + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" + } + etcdEndpoints := strings.Split(endpoints, ",") + etcdKV, err := tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "tsoTest") + assert.NoError(t, err) + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV) + err = gTestTsoAllocator.Initialize() + assert.Nil(t, err) + + err = gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) + + ts, _ := gTestTsoAllocator.GenerateTSO(1) + curTime, logical := tsoutil.ParseTS(ts) + nextTime := curTime.Add(2 * time.Second) + physical := nextTime.UnixNano() / int64(time.Millisecond) + target := tsoutil.ComposeTS(physical, int64(logical)) + err = gTestTsoAllocator.SetTSO(target) + assert.Nil(t, err) + + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV) + err = gTestTsoAllocator.Initialize() + assert.Nil(t, err) + + ts2, err2 := gTestTsoAllocator.GenerateTSO(1) + assert.Nil(t, err2) + curTime2, _ := tsoutil.ParseTS(ts2) + assert.True(t, ts2 >= target) + assert.True(t, curTime2.UnixNano() >= nextTime.UnixNano()) +} diff --git a/internal/tso/tso.go b/internal/tso/tso.go index a5aff07544..fbc9ec1822 100644 --- a/internal/tso/tso.go +++ b/internal/tso/tso.go @@ -68,19 +68,19 @@ type timestampOracle struct { lastSavedTime atomic.Value } -//func (t *timestampOracle) loadTimestamp() (time.Time, error) { -// strData, err := t.txnKV.Load(t.key) -// -// var binData []byte = []byte(strData) -// -// if err != nil { -// return typeutil.ZeroTime, err -// } -// if len(binData) == 0 { -// return typeutil.ZeroTime, nil -// } -// return typeutil.ParseTimestamp(binData) -//} +func (t *timestampOracle) loadTimestamp() (time.Time, error) { + strData, err := t.txnKV.Load(t.key) + if err != nil { + // intend to return nil + return typeutil.ZeroTime, nil + } + + var binData = []byte(strData) + if len(binData) == 0 { + return typeutil.ZeroTime, nil + } + return typeutil.ParseTimestamp(binData) +} // save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, // otherwise, update it. @@ -95,24 +95,24 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error { } func (t *timestampOracle) InitTimestamp() error { - //last, err := t.loadTimestamp() - //if err != nil { - // return err - //} + last, err := t.loadTimestamp() + if err != nil { + return err + } next := time.Now() - // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, - // the timestamp allocation will start from the saved etcd timestamp temporarily. - //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - // next = last.Add(updateTimestampGuard) - //} + //If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, + //the timestamp allocation will start from the saved etcd timestamp temporarily. + if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { + next = last.Add(updateTimestampGuard) + } save := next.Add(t.saveInterval) if err := t.saveTimestamp(save); err != nil { return err } - //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) + log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) current := &atomicObject{ physical: next,