mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add storage benchmark
Signed-off-by: shengjh <1572099106@qq.com>
This commit is contained in:
parent
08a62e3081
commit
5aee3352ce
@ -5,13 +5,13 @@ import (
|
||||
"crypto/md5"
|
||||
"flag"
|
||||
"fmt"
|
||||
minio "github.com/czs007/suvlim/storage/internal/minio"
|
||||
tikv "github.com/czs007/suvlim/storage/internal/tikv"
|
||||
"github.com/czs007/suvlim/storage/pkg/types"
|
||||
"github.com/pivotal-golang/bytefmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
minio "storage/internal/minio"
|
||||
tikv "storage/internal/tikv"
|
||||
"storage/pkg/types"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -32,13 +32,14 @@ var logFile *os.File
|
||||
var store types.Store
|
||||
var wg sync.WaitGroup
|
||||
|
||||
|
||||
func runSet() {
|
||||
for time.Now().Before(endTime) {
|
||||
num := atomic.AddInt32(&keyNum, 1)
|
||||
key := []byte(fmt.Sprint("key", num))
|
||||
for ver := 1; ver <= numVersion; ver++ {
|
||||
atomic.AddInt32(&counter, 1)
|
||||
err := store.Set(context.Background(), key, valueData, uint64(ver))
|
||||
err := store.PutRow(context.Background(), key, valueData,"empty" ,uint64(ver))
|
||||
if err != nil {
|
||||
log.Fatalf("Error setting key %s, %s", key, err.Error())
|
||||
//atomic.AddInt32(&setCount, -1)
|
||||
@ -54,12 +55,14 @@ func runBatchSet() {
|
||||
for time.Now().Before(endTime) {
|
||||
num := atomic.AddInt32(&keyNum, int32(batchOpSize))
|
||||
keys := make([][]byte, batchOpSize)
|
||||
versions := make([]uint64, batchOpSize)
|
||||
batchSuffix := make([]string, batchOpSize)
|
||||
for n := batchOpSize; n > 0; n-- {
|
||||
keys[n-1] = []byte(fmt.Sprint("key", num-int32(n)))
|
||||
}
|
||||
for ver := 1; ver <= numVersion; ver++ {
|
||||
atomic.AddInt32(&counter, 1)
|
||||
err := store.BatchSet(context.Background(), keys, batchValueData, uint64(numVersion))
|
||||
err := store.PutRows(context.Background(), keys, batchValueData, batchSuffix,versions)
|
||||
if err != nil {
|
||||
log.Fatalf("Error setting batch keys %s %s", keys, err.Error())
|
||||
//atomic.AddInt32(&batchSetCount, -1)
|
||||
@ -70,6 +73,7 @@ func runBatchSet() {
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
|
||||
func runGet() {
|
||||
for time.Now().Before(endTime) {
|
||||
num := atomic.AddInt32(&counter, 1)
|
||||
@ -77,7 +81,7 @@ func runGet() {
|
||||
//key := []byte(fmt.Sprint("key", num))
|
||||
num = num % totalKeyCount
|
||||
key := totalKeys[num]
|
||||
_, err := store.Get(context.Background(), key, uint64(numVersion))
|
||||
_, err := store.GetRow(context.Background(), key, uint64(numVersion))
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting key %s, %s", key, err.Error())
|
||||
//atomic.AddInt32(&getCount, -1)
|
||||
@ -101,8 +105,12 @@ func runBatchGet() {
|
||||
}
|
||||
start := end - int32(batchOpSize)
|
||||
keys := totalKeys[start:end]
|
||||
versions := make([]uint64, batchOpSize)
|
||||
for i, _ := range versions{
|
||||
versions[i]= uint64(numVersion)
|
||||
}
|
||||
atomic.AddInt32(&counter, 1)
|
||||
_, err := store.BatchGet(context.Background(), keys, uint64(numVersion))
|
||||
_, err := store.GetRows(context.Background(), keys, versions)
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting key %s, %s", keys, err.Error())
|
||||
//atomic.AddInt32(&batchGetCount, -1)
|
||||
@ -120,7 +128,7 @@ func runDelete() {
|
||||
//key := []byte(fmt.Sprint("key", num))
|
||||
num = num % totalKeyCount
|
||||
key := totalKeys[num]
|
||||
err := store.Delete(context.Background(), key, uint64(numVersion))
|
||||
err := store.DeleteRow(context.Background(), key, uint64(numVersion))
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting key %s, %s", key, err.Error())
|
||||
//atomic.AddInt32(&deleteCount, -1)
|
||||
@ -145,7 +153,11 @@ func runBatchDelete() {
|
||||
start := end - int32(batchOpSize)
|
||||
keys := totalKeys[start:end]
|
||||
atomic.AddInt32(&counter, 1)
|
||||
err := store.BatchDelete(context.Background(), keys, uint64(numVersion))
|
||||
versions := make([]uint64, batchOpSize)
|
||||
for i, _ := range versions{
|
||||
versions[i]= uint64(numVersion)
|
||||
}
|
||||
err := store.DeleteRows(context.Background(), keys, versions)
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting key %s, %s", keys, err.Error())
|
||||
//atomic.AddInt32(&batchDeleteCount, -1)
|
||||
@ -159,15 +171,15 @@ func runBatchDelete() {
|
||||
func main() {
|
||||
// Parse command line
|
||||
myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
|
||||
myflag.IntVar(&durationSecs, "d", 5, "Duration of each test in seconds")
|
||||
myflag.IntVar(&durationSecs, "d", 30, "Duration of each test in seconds")
|
||||
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
||||
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
||||
var sizeArg string
|
||||
var storeType string
|
||||
myflag.StringVar(&sizeArg, "z", "1K", "Size of objects in bytes with postfix K, M, and G")
|
||||
myflag.StringVar(&sizeArg, "z", "2k", "Size of objects in bytes with postfix K, M, and G")
|
||||
myflag.StringVar(&storeType, "s", "tikv", "Storage type, tikv or minio")
|
||||
myflag.IntVar(&numVersion, "v", 1, "Max versions for each key")
|
||||
myflag.IntVar(&batchOpSize, "b", 100, "Batch operation kv pair number")
|
||||
myflag.IntVar(&batchOpSize, "b", 1000, "Batch operation kv pair number")
|
||||
|
||||
if err := myflag.Parse(os.Args[1:]); err != nil {
|
||||
os.Exit(1)
|
||||
@ -189,7 +201,7 @@ func main() {
|
||||
log.Fatalf("Error when creating storage " + err.Error())
|
||||
}
|
||||
case "minio":
|
||||
store, err = minio.NewMinioStore(context.Background())
|
||||
store, err = minio.NewMinioDriver(context.Background())
|
||||
if err != nil {
|
||||
log.Fatalf("Error when creating storage " + err.Error())
|
||||
}
|
||||
@ -228,25 +240,10 @@ func main() {
|
||||
totalKeyCount = 0
|
||||
totalKeys = nil
|
||||
|
||||
// Run the set case
|
||||
startTime := time.Now()
|
||||
endTime = startTime.Add(time.Second * time.Duration(durationSecs))
|
||||
for n := 1; n <= threads; n++ {
|
||||
wg.Add(1)
|
||||
go runSet()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
setTime := setFinish.Sub(startTime).Seconds()
|
||||
|
||||
bps := float64(uint64(counter)*valueSize) / setTime
|
||||
fmt.Fprint(logFile, fmt.Sprintf("Loop %d: PUT time %.1f secs, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
|
||||
loop, setTime, counter, bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter)/setTime))
|
||||
|
||||
// Run the batchSet case
|
||||
// key seq start from setCount
|
||||
counter = 0
|
||||
startTime = time.Now()
|
||||
startTime := time.Now()
|
||||
endTime = startTime.Add(time.Second * time.Duration(durationSecs))
|
||||
for n := 1; n <= threads; n++ {
|
||||
wg.Add(1)
|
||||
@ -254,8 +251,8 @@ func main() {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
setTime = setFinish.Sub(startTime).Seconds()
|
||||
bps = float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime
|
||||
setTime := setFinish.Sub(startTime).Seconds()
|
||||
bps := float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime
|
||||
fmt.Fprint(logFile, fmt.Sprintf("Loop %d: BATCH PUT time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
|
||||
loop, setTime, counter, counter*int32(batchOpSize), bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter * int32(batchOpSize))/setTime))
|
||||
|
||||
@ -329,12 +326,6 @@ func main() {
|
||||
// Print line mark
|
||||
lineMark := "\n"
|
||||
fmt.Fprint(logFile, lineMark)
|
||||
|
||||
// Clear test data
|
||||
err = store.BatchDelete(context.Background(), totalKeys, uint64(numVersion))
|
||||
if err != nil {
|
||||
log.Print("Clean test data error " + err.Error())
|
||||
}
|
||||
}
|
||||
log.Print("Benchmark test done.")
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user