diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go index 44c729bfa8..411cc6e6a4 100644 --- a/timesync/readertimesync.go +++ b/timesync/readertimesync.go @@ -45,7 +45,12 @@ type readerTimeSyncCfg struct { ctx context.Context cancel context.CancelFunc } - +/* +layout of timestamp + time ms logic number +/-------46 bit-----------\/------18bit-----\ ++-------------------------+================+ +*/ func toMillisecond(ts *pb.TimeSyncMsg) int { // get Millisecond in second return int(ts.GetTimestamp() >> 18) diff --git a/timesync/readertimesync_test.go b/timesync/readertimesync_test.go index cf0d5da135..ac03907923 100644 --- a/timesync/readertimesync_test.go +++ b/timesync/readertimesync_test.go @@ -30,15 +30,15 @@ func TestAlignTimeSync(t *testing.T) { ts := []*pb.TimeSyncMsg{ { Peer_Id: 1, - Timestamp: 5 << 18, + Timestamp: toTimestamp(5), }, { Peer_Id: 3, - Timestamp: 15 << 18, + Timestamp: toTimestamp(15), }, { Peer_Id: 2, - Timestamp: 20 << 18, + Timestamp: toTimestamp(20), }, } r.alignTimeSync(ts) @@ -61,15 +61,15 @@ func TestAlignTimeSync2(t *testing.T) { ts := []*pb.TimeSyncMsg{ { Peer_Id: 1, - Timestamp: 5 << 18, + Timestamp: toTimestamp(5), }, { Peer_Id: 3, - Timestamp: 150 << 18, + Timestamp: toTimestamp(150), }, { Peer_Id: 2, - Timestamp: 20 << 18, + Timestamp: toTimestamp(20), }, } ts = r.alignTimeSync(ts) @@ -90,23 +90,23 @@ func TestAlignTimeSync3(t *testing.T) { ts := []*pb.TimeSyncMsg{ { Peer_Id: 1, - Timestamp: 5 << 18, + Timestamp: toTimestamp(5), }, { Peer_Id: 1, - Timestamp: 5 << 18, + Timestamp: toTimestamp(5), }, { Peer_Id: 1, - Timestamp: 5 << 18, + Timestamp: toTimestamp(5), }, { Peer_Id: 3, - Timestamp: 15 << 18, + Timestamp: toTimestamp(15), }, { Peer_Id: 2, - Timestamp: 20 << 18, + Timestamp: toTimestamp(20), }, } ts = r.alignTimeSync(ts) @@ -120,6 +120,70 @@ func TestAlignTimeSync3(t *testing.T) { } } +func TestAlignTimeSync4(t *testing.T) { + r := &readerTimeSyncCfg{ + proxyIdList: []int64{1}, + interval: 200, + } + ts := []*pb.TimeSyncMsg{ + { + Peer_Id: 1, + Timestamp: toTimestamp(15), + }, + { + Peer_Id: 1, + Timestamp: toTimestamp(25), + }, + { + Peer_Id: 1, + Timestamp: toTimestamp(35), + }, + } + ts = r.alignTimeSync(ts) + if len(r.proxyIdList) != 1 { + t.Fatalf("proxyIdList should be : 1") + } + if len(ts) != 1 { + t.Fatalf("aligned failed") + } + if getMillisecond(ts[0].Timestamp) != 35 { + t.Fatalf("aligned failed") + } +} + +func TestAlignTimeSync5(t *testing.T) { + r := &readerTimeSyncCfg{ + proxyIdList: []int64{1, 2, 3}, + interval: 200, + } + ts := []*pb.TimeSyncMsg{ + { + Peer_Id: 1, + Timestamp: toTimestamp(5), + }, + { + Peer_Id: 1, + Timestamp: toTimestamp(5), + }, + { + Peer_Id: 1, + Timestamp: toTimestamp(5), + }, + { + Peer_Id: 3, + Timestamp: toTimestamp(15), + }, + { + Peer_Id: 3, + Timestamp: toTimestamp(20), + }, + } + ts = r.alignTimeSync(ts) + if len(ts) != 0 { + t.Fatalf("aligned failed") + } +} + func TestNewReaderTimeSync(t *testing.T) { r, err := NewReaderTimeSync(pulsarAddr, timeSyncTopic, @@ -251,6 +315,7 @@ func TestReaderTimesync(t *testing.T) { r.Start() var tsm1, tsm2 TimeSyncMsg + var totalRecordes int64 = 0 for { if ctx.Err() != nil { break @@ -263,9 +328,11 @@ func TestReaderTimesync(t *testing.T) { } if tsm1.NumRecorders > 0 { + log.Printf("timestamp %d, num records = %d", getMillisecond(tsm1.Timestamp), tsm1.NumRecorders) + totalRecordes += tsm1.NumRecorders for i := int64(0); i < tsm1.NumRecorders; i++ { im := <-r.InsertOrDelete() - log.Printf("%d - %d", im.Timestamp, tsm2.Timestamp) + //log.Printf("%d - %d", getMillisecond(im.Timestamp), getMillisecond(tsm2.Timestamp)) if im.Timestamp < tsm2.Timestamp { t.Fatalf("time sync error , im.Timestamp = %d, tsm2.Timestamp = %d", im.Timestamp, tsm2.Timestamp) } @@ -274,9 +341,21 @@ func TestReaderTimesync(t *testing.T) { } } + log.Printf("total recordes = %d", totalRecordes) + if totalRecordes != 800 { + t.Fatalf("total records should be 800") + } r.Close() } +func getMillisecond(ts uint64) uint64 { + return ts >> 18 +} + +func toTimestamp(ts uint64) uint64 { + return ts << 18 +} + func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration time.Duration, t *testing.T) { p, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: topic}) ticker := time.Tick(interval * time.Millisecond) @@ -285,7 +364,7 @@ func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration t for i := 0; i < numSteps; i++ { <-ticker tm += interval - tsm := pb.TimeSyncMsg{Timestamp: tm << 18, Peer_Id: id} + tsm := pb.TimeSyncMsg{Timestamp: toTimestamp(tm), Peer_Id: id} tb, _ := proto.Marshal(&tsm) if _, err := p.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil { t.Fatalf("send failed tsm id=%d, timestamp=%d, err=%v", tsm.Peer_Id, tsm.Timestamp, err) @@ -302,7 +381,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64 for i := 1; i <= total; i++ { <-ticker timestamp += 10 - msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: timestamp << 18} + msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: toTimestamp(timestamp)} mb, err := proto.Marshal(&msg) if err != nil { t.Fatalf("marshal error %v", err) @@ -323,7 +402,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64 //log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp) if i%20 == 0 { - tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18} + tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: toTimestamp(timestamp)} tb, err := proto.Marshal(&tm) if err != nil { t.Fatalf("marshal error %v", err)