Add unittest for time sync

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2020-09-09 11:27:39 +08:00 committed by yefu.chen
parent 1573209846
commit 0431d0c682
2 changed files with 100 additions and 16 deletions

View File

@ -45,7 +45,12 @@ type readerTimeSyncCfg struct {
ctx context.Context
cancel context.CancelFunc
}
/*
layout of timestamp
time ms logic number
/-------46 bit-----------\/------18bit-----\
+-------------------------+================+
*/
func toMillisecond(ts *pb.TimeSyncMsg) int {
// get Millisecond in second
return int(ts.GetTimestamp() >> 18)

View File

@ -30,15 +30,15 @@ func TestAlignTimeSync(t *testing.T) {
ts := []*pb.TimeSyncMsg{
{
Peer_Id: 1,
Timestamp: 5 << 18,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 3,
Timestamp: 15 << 18,
Timestamp: toTimestamp(15),
},
{
Peer_Id: 2,
Timestamp: 20 << 18,
Timestamp: toTimestamp(20),
},
}
r.alignTimeSync(ts)
@ -61,15 +61,15 @@ func TestAlignTimeSync2(t *testing.T) {
ts := []*pb.TimeSyncMsg{
{
Peer_Id: 1,
Timestamp: 5 << 18,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 3,
Timestamp: 150 << 18,
Timestamp: toTimestamp(150),
},
{
Peer_Id: 2,
Timestamp: 20 << 18,
Timestamp: toTimestamp(20),
},
}
ts = r.alignTimeSync(ts)
@ -90,23 +90,23 @@ func TestAlignTimeSync3(t *testing.T) {
ts := []*pb.TimeSyncMsg{
{
Peer_Id: 1,
Timestamp: 5 << 18,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 1,
Timestamp: 5 << 18,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 1,
Timestamp: 5 << 18,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 3,
Timestamp: 15 << 18,
Timestamp: toTimestamp(15),
},
{
Peer_Id: 2,
Timestamp: 20 << 18,
Timestamp: toTimestamp(20),
},
}
ts = r.alignTimeSync(ts)
@ -120,6 +120,70 @@ func TestAlignTimeSync3(t *testing.T) {
}
}
func TestAlignTimeSync4(t *testing.T) {
r := &readerTimeSyncCfg{
proxyIdList: []int64{1},
interval: 200,
}
ts := []*pb.TimeSyncMsg{
{
Peer_Id: 1,
Timestamp: toTimestamp(15),
},
{
Peer_Id: 1,
Timestamp: toTimestamp(25),
},
{
Peer_Id: 1,
Timestamp: toTimestamp(35),
},
}
ts = r.alignTimeSync(ts)
if len(r.proxyIdList) != 1 {
t.Fatalf("proxyIdList should be : 1")
}
if len(ts) != 1 {
t.Fatalf("aligned failed")
}
if getMillisecond(ts[0].Timestamp) != 35 {
t.Fatalf("aligned failed")
}
}
func TestAlignTimeSync5(t *testing.T) {
r := &readerTimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
interval: 200,
}
ts := []*pb.TimeSyncMsg{
{
Peer_Id: 1,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 1,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 1,
Timestamp: toTimestamp(5),
},
{
Peer_Id: 3,
Timestamp: toTimestamp(15),
},
{
Peer_Id: 3,
Timestamp: toTimestamp(20),
},
}
ts = r.alignTimeSync(ts)
if len(ts) != 0 {
t.Fatalf("aligned failed")
}
}
func TestNewReaderTimeSync(t *testing.T) {
r, err := NewReaderTimeSync(pulsarAddr,
timeSyncTopic,
@ -251,6 +315,7 @@ func TestReaderTimesync(t *testing.T) {
r.Start()
var tsm1, tsm2 TimeSyncMsg
var totalRecordes int64 = 0
for {
if ctx.Err() != nil {
break
@ -263,9 +328,11 @@ func TestReaderTimesync(t *testing.T) {
}
if tsm1.NumRecorders > 0 {
log.Printf("timestamp %d, num records = %d", getMillisecond(tsm1.Timestamp), tsm1.NumRecorders)
totalRecordes += tsm1.NumRecorders
for i := int64(0); i < tsm1.NumRecorders; i++ {
im := <-r.InsertOrDelete()
log.Printf("%d - %d", im.Timestamp, tsm2.Timestamp)
//log.Printf("%d - %d", getMillisecond(im.Timestamp), getMillisecond(tsm2.Timestamp))
if im.Timestamp < tsm2.Timestamp {
t.Fatalf("time sync error , im.Timestamp = %d, tsm2.Timestamp = %d", im.Timestamp, tsm2.Timestamp)
}
@ -274,9 +341,21 @@ func TestReaderTimesync(t *testing.T) {
}
}
log.Printf("total recordes = %d", totalRecordes)
if totalRecordes != 800 {
t.Fatalf("total records should be 800")
}
r.Close()
}
func getMillisecond(ts uint64) uint64 {
return ts >> 18
}
func toTimestamp(ts uint64) uint64 {
return ts << 18
}
func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration time.Duration, t *testing.T) {
p, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: topic})
ticker := time.Tick(interval * time.Millisecond)
@ -285,7 +364,7 @@ func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration t
for i := 0; i < numSteps; i++ {
<-ticker
tm += interval
tsm := pb.TimeSyncMsg{Timestamp: tm << 18, Peer_Id: id}
tsm := pb.TimeSyncMsg{Timestamp: toTimestamp(tm), Peer_Id: id}
tb, _ := proto.Marshal(&tsm)
if _, err := p.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
t.Fatalf("send failed tsm id=%d, timestamp=%d, err=%v", tsm.Peer_Id, tsm.Timestamp, err)
@ -302,7 +381,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
for i := 1; i <= total; i++ {
<-ticker
timestamp += 10
msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: timestamp << 18}
msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: toTimestamp(timestamp)}
mb, err := proto.Marshal(&msg)
if err != nil {
t.Fatalf("marshal error %v", err)
@ -323,7 +402,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
//log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)
if i%20 == 0 {
tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18}
tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: toTimestamp(timestamp)}
tb, err := proto.Marshal(&tm)
if err != nil {
t.Fatalf("marshal error %v", err)