mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fixbug: load last saved timestamp when rootcoord restart (#8137)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
dbcacec0a7
commit
7c4aff0ea2
@ -24,6 +24,43 @@ import (
|
||||
|
||||
var gTestTsoAllocator *GlobalTSOAllocator
|
||||
|
||||
func TestGlobalTSOAllocator_Initialize(t *testing.T) {
|
||||
endpoints := os.Getenv("ETCD_ENDPOINTS")
|
||||
if endpoints == "" {
|
||||
endpoints = "localhost:2379"
|
||||
}
|
||||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
etcdKV, err := tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "tsoTest")
|
||||
assert.NoError(t, err)
|
||||
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV)
|
||||
err = gTestTsoAllocator.Initialize()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = gTestTsoAllocator.UpdateTSO()
|
||||
assert.Nil(t, err)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
err = gTestTsoAllocator.Initialize()
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("GenerateTSO", func(t *testing.T) {
|
||||
count := 1000
|
||||
perCount := uint32(100)
|
||||
startTs, err := gTestTsoAllocator.GenerateTSO(perCount)
|
||||
assert.Nil(t, err)
|
||||
lastPhysical, lastLogical := tsoutil.ParseTS(startTs)
|
||||
for i := 0; i < count; i++ {
|
||||
ts, _ := gTestTsoAllocator.GenerateTSO(perCount)
|
||||
physical, logical := tsoutil.ParseTS(ts)
|
||||
if lastPhysical.Equal(physical) {
|
||||
diff := logical - lastLogical
|
||||
assert.Equal(t, uint64(perCount), diff)
|
||||
}
|
||||
lastPhysical, lastLogical = physical, logical
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGlobalTSOAllocator_All(t *testing.T) {
|
||||
endpoints := os.Getenv("ETCD_ENDPOINTS")
|
||||
if endpoints == "" {
|
||||
@ -36,6 +73,9 @@ func TestGlobalTSOAllocator_All(t *testing.T) {
|
||||
t.Run("Initialize", func(t *testing.T) {
|
||||
err := gTestTsoAllocator.Initialize()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = gTestTsoAllocator.UpdateTSO()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("GenerateTSO", func(t *testing.T) {
|
||||
@ -45,7 +85,8 @@ func TestGlobalTSOAllocator_All(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
lastPhysical, lastLogical := tsoutil.ParseTS(startTs)
|
||||
for i := 0; i < count; i++ {
|
||||
ts, _ := gTestTsoAllocator.GenerateTSO(perCount)
|
||||
ts, err2 := gTestTsoAllocator.GenerateTSO(perCount)
|
||||
assert.Nil(t, err2)
|
||||
physical, logical := tsoutil.ParseTS(ts)
|
||||
if lastPhysical.Equal(physical) {
|
||||
diff := logical - lastLogical
|
||||
@ -71,15 +112,18 @@ func TestGlobalTSOAllocator_All(t *testing.T) {
|
||||
assert.Equal(t, physical, lastPhysical.Add(time.Millisecond*time.Duration(step)))
|
||||
lastPhysical = physical
|
||||
}
|
||||
err = gTestTsoAllocator.UpdateTSO()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
gTestTsoAllocator.SetLimitMaxLogic(true)
|
||||
t.Run("SetTSO", func(t *testing.T) {
|
||||
curTime := time.Now()
|
||||
ts, err2 := gTestTsoAllocator.GenerateTSO(1)
|
||||
assert.Nil(t, err2)
|
||||
curTime, logical := tsoutil.ParseTS(ts)
|
||||
nextTime := curTime.Add(2 * time.Second)
|
||||
physical := nextTime.UnixNano() / int64(time.Millisecond)
|
||||
logical := int64(0)
|
||||
err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical))
|
||||
err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, int64(logical)))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
@ -165,3 +209,37 @@ func TestGlobalTSOAllocator_Update(t *testing.T) {
|
||||
err = gTestTsoAllocator.UpdateTSO()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGlobalTSOAllocator_load(t *testing.T) {
|
||||
endpoints := os.Getenv("ETCD_ENDPOINTS")
|
||||
if endpoints == "" {
|
||||
endpoints = "localhost:2379"
|
||||
}
|
||||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
etcdKV, err := tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "tsoTest")
|
||||
assert.NoError(t, err)
|
||||
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV)
|
||||
err = gTestTsoAllocator.Initialize()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = gTestTsoAllocator.UpdateTSO()
|
||||
assert.Nil(t, err)
|
||||
|
||||
ts, _ := gTestTsoAllocator.GenerateTSO(1)
|
||||
curTime, logical := tsoutil.ParseTS(ts)
|
||||
nextTime := curTime.Add(2 * time.Second)
|
||||
physical := nextTime.UnixNano() / int64(time.Millisecond)
|
||||
target := tsoutil.ComposeTS(physical, int64(logical))
|
||||
err = gTestTsoAllocator.SetTSO(target)
|
||||
assert.Nil(t, err)
|
||||
|
||||
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", etcdKV)
|
||||
err = gTestTsoAllocator.Initialize()
|
||||
assert.Nil(t, err)
|
||||
|
||||
ts2, err2 := gTestTsoAllocator.GenerateTSO(1)
|
||||
assert.Nil(t, err2)
|
||||
curTime2, _ := tsoutil.ParseTS(ts2)
|
||||
assert.True(t, ts2 >= target)
|
||||
assert.True(t, curTime2.UnixNano() >= nextTime.UnixNano())
|
||||
}
|
||||
|
||||
@ -68,19 +68,19 @@ type timestampOracle struct {
|
||||
lastSavedTime atomic.Value
|
||||
}
|
||||
|
||||
//func (t *timestampOracle) loadTimestamp() (time.Time, error) {
|
||||
// strData, err := t.txnKV.Load(t.key)
|
||||
//
|
||||
// var binData []byte = []byte(strData)
|
||||
//
|
||||
// if err != nil {
|
||||
// return typeutil.ZeroTime, err
|
||||
// }
|
||||
// if len(binData) == 0 {
|
||||
// return typeutil.ZeroTime, nil
|
||||
// }
|
||||
// return typeutil.ParseTimestamp(binData)
|
||||
//}
|
||||
func (t *timestampOracle) loadTimestamp() (time.Time, error) {
|
||||
strData, err := t.txnKV.Load(t.key)
|
||||
if err != nil {
|
||||
// intend to return nil
|
||||
return typeutil.ZeroTime, nil
|
||||
}
|
||||
|
||||
var binData = []byte(strData)
|
||||
if len(binData) == 0 {
|
||||
return typeutil.ZeroTime, nil
|
||||
}
|
||||
return typeutil.ParseTimestamp(binData)
|
||||
}
|
||||
|
||||
// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
|
||||
// otherwise, update it.
|
||||
@ -95,24 +95,24 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error {
|
||||
}
|
||||
|
||||
func (t *timestampOracle) InitTimestamp() error {
|
||||
//last, err := t.loadTimestamp()
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
last, err := t.loadTimestamp()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
next := time.Now()
|
||||
|
||||
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
|
||||
// the timestamp allocation will start from the saved etcd timestamp temporarily.
|
||||
//if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
|
||||
// next = last.Add(updateTimestampGuard)
|
||||
//}
|
||||
//If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
|
||||
//the timestamp allocation will start from the saved etcd timestamp temporarily.
|
||||
if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
|
||||
next = last.Add(updateTimestampGuard)
|
||||
}
|
||||
|
||||
save := next.Add(t.saveInterval)
|
||||
if err := t.saveTimestamp(save); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
|
||||
log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
|
||||
|
||||
current := &atomicObject{
|
||||
physical: next,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user