diff --git a/cmd/storage/benchmark.go b/storage/benchmark/benchmark.go similarity index 84% rename from cmd/storage/benchmark.go rename to storage/benchmark/benchmark.go index f55d966ce9..9760fd628b 100644 --- a/cmd/storage/benchmark.go +++ b/storage/benchmark/benchmark.go @@ -5,13 +5,13 @@ import ( "crypto/md5" "flag" "fmt" + minio "github.com/czs007/suvlim/storage/internal/minio" + tikv "github.com/czs007/suvlim/storage/internal/tikv" + "github.com/czs007/suvlim/storage/pkg/types" "github.com/pivotal-golang/bytefmt" "log" "math/rand" "os" - minio "storage/internal/minio" - tikv "storage/internal/tikv" - "storage/pkg/types" "sync" "sync/atomic" "time" @@ -32,13 +32,14 @@ var logFile *os.File var store types.Store var wg sync.WaitGroup + func runSet() { for time.Now().Before(endTime) { num := atomic.AddInt32(&keyNum, 1) key := []byte(fmt.Sprint("key", num)) for ver := 1; ver <= numVersion; ver++ { atomic.AddInt32(&counter, 1) - err := store.Set(context.Background(), key, valueData, uint64(ver)) + err := store.PutRow(context.Background(), key, valueData,"empty" ,uint64(ver)) if err != nil { log.Fatalf("Error setting key %s, %s", key, err.Error()) //atomic.AddInt32(&setCount, -1) @@ -54,12 +55,14 @@ func runBatchSet() { for time.Now().Before(endTime) { num := atomic.AddInt32(&keyNum, int32(batchOpSize)) keys := make([][]byte, batchOpSize) + versions := make([]uint64, batchOpSize) + batchSuffix := make([]string, batchOpSize) for n := batchOpSize; n > 0; n-- { keys[n-1] = []byte(fmt.Sprint("key", num-int32(n))) } for ver := 1; ver <= numVersion; ver++ { atomic.AddInt32(&counter, 1) - err := store.BatchSet(context.Background(), keys, batchValueData, uint64(numVersion)) + err := store.PutRows(context.Background(), keys, batchValueData, batchSuffix,versions) if err != nil { log.Fatalf("Error setting batch keys %s %s", keys, err.Error()) //atomic.AddInt32(&batchSetCount, -1) @@ -70,6 +73,7 @@ func runBatchSet() { wg.Done() } + func runGet() { for time.Now().Before(endTime) { num := atomic.AddInt32(&counter, 1) @@ -77,7 +81,7 @@ func runGet() { //key := []byte(fmt.Sprint("key", num)) num = num % totalKeyCount key := totalKeys[num] - _, err := store.Get(context.Background(), key, uint64(numVersion)) + _, err := store.GetRow(context.Background(), key, uint64(numVersion)) if err != nil { log.Fatalf("Error getting key %s, %s", key, err.Error()) //atomic.AddInt32(&getCount, -1) @@ -101,8 +105,12 @@ func runBatchGet() { } start := end - int32(batchOpSize) keys := totalKeys[start:end] + versions := make([]uint64, batchOpSize) + for i, _ := range versions{ + versions[i]= uint64(numVersion) + } atomic.AddInt32(&counter, 1) - _, err := store.BatchGet(context.Background(), keys, uint64(numVersion)) + _, err := store.GetRows(context.Background(), keys, versions) if err != nil { log.Fatalf("Error getting key %s, %s", keys, err.Error()) //atomic.AddInt32(&batchGetCount, -1) @@ -120,7 +128,7 @@ func runDelete() { //key := []byte(fmt.Sprint("key", num)) num = num % totalKeyCount key := totalKeys[num] - err := store.Delete(context.Background(), key, uint64(numVersion)) + err := store.DeleteRow(context.Background(), key, uint64(numVersion)) if err != nil { log.Fatalf("Error getting key %s, %s", key, err.Error()) //atomic.AddInt32(&deleteCount, -1) @@ -145,7 +153,11 @@ func runBatchDelete() { start := end - int32(batchOpSize) keys := totalKeys[start:end] atomic.AddInt32(&counter, 1) - err := store.BatchDelete(context.Background(), keys, uint64(numVersion)) + versions := make([]uint64, batchOpSize) + for i, _ := range versions{ + versions[i]= uint64(numVersion) + } + err := store.DeleteRows(context.Background(), keys, versions) if err != nil { log.Fatalf("Error getting key %s, %s", keys, err.Error()) //atomic.AddInt32(&batchDeleteCount, -1) @@ -159,15 +171,15 @@ func runBatchDelete() { func main() { // Parse command line myflag := flag.NewFlagSet("myflag", flag.ExitOnError) - myflag.IntVar(&durationSecs, "d", 5, "Duration of each test in seconds") + myflag.IntVar(&durationSecs, "d", 30, "Duration of each test in seconds") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") var sizeArg string var storeType string - myflag.StringVar(&sizeArg, "z", "1K", "Size of objects in bytes with postfix K, M, and G") + myflag.StringVar(&sizeArg, "z", "2k", "Size of objects in bytes with postfix K, M, and G") myflag.StringVar(&storeType, "s", "tikv", "Storage type, tikv or minio") myflag.IntVar(&numVersion, "v", 1, "Max versions for each key") - myflag.IntVar(&batchOpSize, "b", 100, "Batch operation kv pair number") + myflag.IntVar(&batchOpSize, "b", 1000, "Batch operation kv pair number") if err := myflag.Parse(os.Args[1:]); err != nil { os.Exit(1) @@ -189,7 +201,7 @@ func main() { log.Fatalf("Error when creating storage " + err.Error()) } case "minio": - store, err = minio.NewMinioStore(context.Background()) + store, err = minio.NewMinioDriver(context.Background()) if err != nil { log.Fatalf("Error when creating storage " + err.Error()) } @@ -228,25 +240,10 @@ func main() { totalKeyCount = 0 totalKeys = nil - // Run the set case - startTime := time.Now() - endTime = startTime.Add(time.Second * time.Duration(durationSecs)) - for n := 1; n <= threads; n++ { - wg.Add(1) - go runSet() - } - wg.Wait() - - setTime := setFinish.Sub(startTime).Seconds() - - bps := float64(uint64(counter)*valueSize) / setTime - fmt.Fprint(logFile, fmt.Sprintf("Loop %d: PUT time %.1f secs, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n", - loop, setTime, counter, bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter)/setTime)) - // Run the batchSet case // key seq start from setCount counter = 0 - startTime = time.Now() + startTime := time.Now() endTime = startTime.Add(time.Second * time.Duration(durationSecs)) for n := 1; n <= threads; n++ { wg.Add(1) @@ -254,8 +251,8 @@ func main() { } wg.Wait() - setTime = setFinish.Sub(startTime).Seconds() - bps = float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime + setTime := setFinish.Sub(startTime).Seconds() + bps := float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime fmt.Fprint(logFile, fmt.Sprintf("Loop %d: BATCH PUT time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n", loop, setTime, counter, counter*int32(batchOpSize), bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter * int32(batchOpSize))/setTime)) @@ -329,12 +326,6 @@ func main() { // Print line mark lineMark := "\n" fmt.Fprint(logFile, lineMark) - - // Clear test data - err = store.BatchDelete(context.Background(), totalKeys, uint64(numVersion)) - if err != nil { - log.Print("Clean test data error " + err.Error()) - } } log.Print("Benchmark test done.") }