diff --git a/internal/allocator/global_id.go b/internal/allocator/global_id.go index e4a569fd95..3fb1f66aa5 100644 --- a/internal/allocator/global_id.go +++ b/internal/allocator/global_id.go @@ -18,8 +18,10 @@ type GlobalIDAllocator struct { } func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { + allocator := tso.NewGlobalTSOAllocator(key, base) + allocator.EnableMaxLogic(false) return &GlobalIDAllocator{ - allocator: tso.NewGlobalTSOAllocator(key, base), + allocator: allocator, } } diff --git a/internal/allocator/global_id_test.go b/internal/allocator/global_id_test.go new file mode 100644 index 0000000000..ed9df4246c --- /dev/null +++ b/internal/allocator/global_id_test.go @@ -0,0 +1,43 @@ +package allocator + +import ( + "os" + "testing" + + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" +) + +var gTestIDAllocator *GlobalIDAllocator + +func TestGlobalTSOAllocator_All(t *testing.T) { + etcdAddress := os.Getenv("ETCD_ADDRESS") + if etcdAddress == "" { + ip := funcutil.GetLocalIP() + etcdAddress = ip + ":2379" + } + gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "gidTest")) + + t.Run("Initialize", func(t *testing.T) { + err := gTestIDAllocator.Initialize() + assert.Nil(t, err) + }) + + t.Run("AllocOne", func(t *testing.T) { + one, err := gTestIDAllocator.AllocOne() + assert.Nil(t, err) + ano, err := gTestIDAllocator.AllocOne() + assert.Nil(t, err) + assert.NotEqual(t, one, ano) + }) + + t.Run("Alloc", func(t *testing.T) { + count := uint32(2 << 10) + idStart, idEnd, err := gTestIDAllocator.Alloc(count) + assert.Nil(t, err) + assert.Equal(t, count, uint32(idEnd-idStart)) + }) + +} diff --git a/internal/tso/global_allocator.go b/internal/tso/global_allocator.go index 4a0bae73b9..032f3000ac 100644 --- a/internal/tso/global_allocator.go +++ b/internal/tso/global_allocator.go @@ -45,7 +45,8 @@ type Allocator interface { // GlobalTSOAllocator is the global single point TSO allocator. type GlobalTSOAllocator struct { - tso *timestampOracle + tso *timestampOracle + LimitMaxLogic bool } // NewGlobalTSOAllocator creates a new global TSO allocator. @@ -58,6 +59,7 @@ func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { maxResetTSGap: func() time.Duration { return 3 * time.Second }, key: key, }, + LimitMaxLogic: true, } } @@ -66,6 +68,10 @@ func (gta *GlobalTSOAllocator) Initialize() error { return gta.tso.InitTimestamp() } +func (gta *GlobalTSOAllocator) EnableMaxLogic(enable bool) { + gta.LimitMaxLogic = enable +} + // UpdateTSO is used to update the TSO in memory and the time window in etcd. func (gta *GlobalTSOAllocator) UpdateTSO() error { return gta.tso.UpdateTimestamp() @@ -97,7 +103,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { physical = current.physical.UnixNano() / int64(time.Millisecond) logical = atomic.AddInt64(¤t.logical, int64(count)) - if logical >= maxLogical { + if logical >= maxLogical && gta.LimitMaxLogic { log.Println("logical part outside of max logical interval, please check ntp time", zap.Int("retry-count", i)) time.Sleep(UpdateTimestampStep) diff --git a/internal/tso/global_allocator_test.go b/internal/tso/global_allocator_test.go new file mode 100644 index 0000000000..c7c453184b --- /dev/null +++ b/internal/tso/global_allocator_test.go @@ -0,0 +1,81 @@ +package tso + +import ( + "os" + "testing" + "time" + + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" +) + +var gTestTsoAllocator *GlobalTSOAllocator + +func TestGlobalTSOAllocator_All(t *testing.T) { + etcdAddress := os.Getenv("ETCD_ADDRESS") + if etcdAddress == "" { + ip := funcutil.GetLocalIP() + etcdAddress = ip + ":2379" + } + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "tsoTest")) + t.Run("Initialize", func(t *testing.T) { + err := gTestTsoAllocator.Initialize() + assert.Nil(t, err) + }) + + t.Run("GenerateTSO", func(t *testing.T) { + count := 1000 + perCount := uint32(100) + startTs, err := gTestTsoAllocator.GenerateTSO(perCount) + assert.Nil(t, err) + lastPhysical, lastLogical := tsoutil.ParseTS(startTs) + for i := 0; i < count; i++ { + ts, _ := gTestTsoAllocator.GenerateTSO(perCount) + physical, logical := tsoutil.ParseTS(ts) + if lastPhysical.Equal(physical) { + diff := logical - lastLogical + assert.Equal(t, uint64(perCount), diff) + } + lastPhysical, lastLogical = physical, logical + } + }) + + gTestTsoAllocator.EnableMaxLogic(false) + t.Run("GenerateTSO2", func(t *testing.T) { + count := 1000 + maxL := 1 << 18 + startTs, err := gTestTsoAllocator.GenerateTSO(uint32(maxL)) + step := 10 + perCount := uint32(step) << 18 // 10 ms + assert.Nil(t, err) + lastPhysical, lastLogical := tsoutil.ParseTS(startTs) + for i := 0; i < count; i++ { + ts, _ := gTestTsoAllocator.GenerateTSO(perCount) + physical, logical := tsoutil.ParseTS(ts) + assert.Equal(t, logical, lastLogical) + assert.Equal(t, physical, lastPhysical.Add(time.Millisecond*time.Duration(step))) + lastPhysical = physical + } + }) + gTestTsoAllocator.EnableMaxLogic(true) + t.Run("SetTSO", func(t *testing.T) { + curTime := time.Now() + nextTime := curTime.Add(2 * time.Second) + physical := nextTime.UnixNano() / int64(time.Millisecond) + logical := int64(0) + err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) + assert.Nil(t, err) + }) + + t.Run("UpdateTSO", func(t *testing.T) { + err := gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) + }) + + t.Run("Reset", func(t *testing.T) { + gTestTsoAllocator.Reset() + }) + +} diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 4b84bc2b31..b8f53c00f8 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -14,7 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" MILVUS_DIR="${ROOT_DIR}/internal/" echo $MILVUS_DIR -go test -race -cover "${MILVUS_DIR}/kv/..." -failfast +go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast