diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go index be235bd1d1..0aabeff6f0 100644 --- a/reader/read_node/query_node.go +++ b/reader/read_node/query_node.go @@ -164,8 +164,11 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes msgCounter := MsgCounter{ InsertCounter: 0, + InsertTime: time.Now(), DeleteCounter: 0, + DeleteTime: time.Now(), SearchCounter: 0, + SearchTime: time.Now(), } return &QueryNode{ @@ -263,7 +266,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { const Debug = true const CountInsertMsgBaseline = 1000 * 1000 var BaselineCounter int64 = 0 - node.msgCounter.InsertTime = time.Now() if Debug { for { diff --git a/reader/read_node/util_functions.go b/reader/read_node/util_functions.go index 303d824d90..40c2c456c6 100644 --- a/reader/read_node/util_functions.go +++ b/reader/read_node/util_functions.go @@ -93,7 +93,7 @@ func (node *QueryNode) QueryLog(length int) { } func (node *QueryNode) WriteQueryLog() { - f, err := os.OpenFile("/tmp/query_node.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile("/tmp/query_node_insert.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Fatal(err) } diff --git a/sdk/examples/common/TestParameter.h b/sdk/examples/common/TestParameter.h index aceafee0a3..85d19a6f80 100644 --- a/sdk/examples/common/TestParameter.h +++ b/sdk/examples/common/TestParameter.h @@ -25,16 +25,12 @@ struct TestParameters { std::string port_; std::string collection_name_; - // collection parameters, only works when collection_name_ is empty - int64_t index_type_ = (int64_t)milvus::IndexType::IVFSQ8; // sq8 - int64_t nlist_ = 16384; - int64_t metric_type_ = (int64_t)milvus::MetricType::L2; // L2 + int64_t id_start_ = -1; + int64_t id_count_ = 0; + int64_t loop_ = 0; // query parameters - int64_t query_count_ = 1000; - int64_t nq_ = 1; int64_t topk_ = 10; - int64_t nprobe_ = 16; bool is_valid = true; }; diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp index 4c4a0df701..e5c19fb90c 100644 --- a/sdk/examples/simple/insert.cpp +++ b/sdk/examples/simple/insert.cpp @@ -21,9 +21,10 @@ #include "utils/TimeRecorder.h" #include -const int N = 6000000; -const int DIM = 128; -const int LOOP = 2000; +int N = 6000000; +int DIM = 128; +int LOOP = 2000; + int ID_START = 0; std::default_random_engine eng(42); @@ -115,6 +116,7 @@ bool check_schema(const milvus::Mapping & map){ return true; } + int main(int argc, char* argv[]) { TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv); @@ -128,6 +130,33 @@ main(int argc, char* argv[]) { return 0; } + + if (parameters.id_start_ < 0){ + std::cout<< "id_start should >= 0 !" << std::endl; + milvus_sdk::Utils::PrintHelp(argc, argv); + return 0; + } + + if (parameters.id_count_ <= 0){ + std::cout<< "id_count should > 0 !" << std::endl; + milvus_sdk::Utils::PrintHelp(argc, argv); + return 0; + } + + if (parameters.loop_ <= 0){ + std::cout<< "loop should > 0 !" << std::endl; + milvus_sdk::Utils::PrintHelp(argc, argv); + return 0; + } + + N = parameters.id_count_; + ID_START = parameters.id_start_; + LOOP = parameters.loop_; + + std::cout<<"N: " << N << std::endl; + std::cout<<"ID_START: " << ID_START << std::endl; + std::cout<<"LOOP: " << LOOP << std::endl; + const std::string collection_name = parameters.collection_name_; auto client = milvus::ConnectionImpl(); @@ -149,18 +178,26 @@ main(int argc, char* argv[]) { int per_count = N / LOOP; int failed_count = 0; + std::cout<<"PER_COUNT: " << per_count << std::endl; + milvus_sdk::TimeRecorder insert_timer("insert"); - for (int64_t i = 0; i < LOOP; i++) { - std::vector ids_array; - generate_ids(ids_array, per_count); - auto data = GetData(per_count); + for (int64_t i = 0, j=0; j < N;) { + i=j; + j += per_count; + if( j > N ) j = N; + + std::vector ids_array; + generate_ids(ids_array, j - i); + auto data = GetData(j - i); insert_timer.Start(); auto status = client.Insert(collection_name, "default", data, ids_array); if (!status.ok()){ failed_count += 1; } insert_timer.End(); + } + if (failed_count > 0) { std::cout <<" test done, failed_count is :" << failed_count<< std::endl; } diff --git a/sdk/examples/simple/search.cpp b/sdk/examples/simple/search.cpp index 362a9fa521..ec88dba7fc 100644 --- a/sdk/examples/simple/search.cpp +++ b/sdk/examples/simple/search.cpp @@ -19,7 +19,9 @@ const int TOP_K = 10; const int LOOP = 1000; + const int DIM = 128; + std::default_random_engine eng(42); const milvus::VectorParam diff --git a/sdk/utils/Utils.cpp b/sdk/utils/Utils.cpp index bc36fa83d8..fe700f3045 100644 --- a/sdk/utils/Utils.cpp +++ b/sdk/utils/Utils.cpp @@ -38,17 +38,11 @@ print_help(const std::string& app_name) { printf(" -t --collection_name target collection name, specify this will ignore collection parameters, " "default empty\n"); printf(" -h --help Print help information\n"); - printf(" -i --index " - "Collection index type(1=IDMAP, 2=IVFLAT, 3=IVFSQ8, 5=IVFSQ8H), default:3\n"); - printf(" -l --nlist Collection index nlist, default:16384\n"); - printf(" -m --metric " - "Collection metric type(1=L2, 2=IP, 3=HAMMING, 4=JACCARD, 5=TANIMOTO, 6=SUBSTRUCTURE, 7=SUPERSTRUCTURE), " - "default:1\n"); - printf(" -q --query_count Query total count, default:1000\n"); - printf(" -n --nq nq of each query, default:1\n"); + printf(" -i --id_start " + "id_start, default:-1\n"); + printf(" -c --count id count, default:0\n"); + printf(" -l --loop loop, default:0\n"); printf(" -k --topk topk of each query, default:10\n"); - printf(" -b --nprobe nprobe of each query, default:16\n"); - printf(" -v --print_result Print query result, default:false\n"); printf("\n"); } @@ -482,13 +476,10 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's {"port", optional_argument, nullptr, 'p'}, {"help", no_argument, nullptr, 'h'}, {"collection_name", no_argument, nullptr, 't'}, - {"index", optional_argument, nullptr, 'i'}, - {"nlist", optional_argument, nullptr, 'l'}, - {"metric", optional_argument, nullptr, 'm'}, - {"query_count", optional_argument, nullptr, 'q'}, - {"nq", optional_argument, nullptr, 'n'}, + {"id_start", optional_argument, nullptr, 'i'}, + {"count", optional_argument, nullptr, 'c'}, + {"loop", optional_argument, nullptr, 'l'}, {"topk", optional_argument, nullptr, 'k'}, - {"nprobe", optional_argument, nullptr, 'b'}, {nullptr, 0, nullptr, 0}}; int option_index = 0; @@ -496,7 +487,7 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's TestParameters parameters; int value; - while ((value = getopt_long(argc, argv, "s:p:t:i:f:l:m:d:r:c:q:n:k:b:vh", long_options, &option_index)) != -1) { + while ((value = getopt_long(argc, argv, "s:p:t:i:l:c:k:h", long_options, &option_index)) != -1) { switch (value) { case 's': { char* address_ptr = strdup(optarg); @@ -518,46 +509,29 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's } case 'i': { char* ptr = strdup(optarg); - parameters.index_type_ = atol(ptr); + parameters.id_start_ = atol(ptr); + free(ptr); + break; + } + case 'c': { + char* ptr = strdup(optarg); + parameters.id_count_ = atol(ptr); free(ptr); break; } case 'l': { char* ptr = strdup(optarg); - parameters.nlist_ = atol(ptr); - free(ptr); - break; - } - case 'm': { - char* ptr = strdup(optarg); - parameters.metric_type_ = atol(ptr); - free(ptr); - break; - } - case 'q': { - char* ptr = strdup(optarg); - parameters.query_count_ = atol(ptr); - free(ptr); - break; - } - case 'n': { - char* ptr = strdup(optarg); - parameters.nq_ = atol(ptr); + parameters.loop_ = atol(ptr); free(ptr); break; } + case 'k': { char* ptr = strdup(optarg); parameters.topk_ = atol(ptr); free(ptr); break; } - case 'b': { - char* ptr = strdup(optarg); - parameters.nprobe_ = atol(ptr); - free(ptr); - break; - } case 'h': default: print_help(app_name); diff --git a/writer/main.go b/writer/main.go index b17b18e122..6405ecdf23 100644 --- a/writer/main.go +++ b/writer/main.go @@ -8,7 +8,6 @@ import ( "github.com/czs007/suvlim/writer/message_client" "github.com/czs007/suvlim/writer/write_node" "log" - "os" "strconv" "time" ) @@ -49,16 +48,19 @@ func main() { const CountMsgNum = 10000 * 10 if Debug { - var shouldBenchmark = false - var start time.Time - var LogRecords int64 - var logFlag int64 - var logString = "" - logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777) - defer logFile.Close() - if err != nil { - log.Fatalf("Prepare log file error, " + err.Error()) - } + //var shouldBenchmark = false + //var start time.Time + //var LogRecords int64 + //var logFlag int64 + //var logString = "" + //logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777) + //defer logFile.Close() + //if err != nil { + // log.Fatalf("Prepare log file error, " + err.Error()) + //} + + const CountInsertMsgBaseline = 1000 * 1000 + var BaselineCounter int64 = 0 for { if ctx.Err() != nil { @@ -66,35 +68,42 @@ func main() { } msgLength := wn.MessageClient.PrepareBatchMsg() // wait until first 100,000 rows are successfully wrote - if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false { - shouldBenchmark = true - wn.MsgCounter.InsertCounter = 0 - wn.MsgCounter.InsertedRecordSize = 0 - start = time.Now() - } + //if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false { + // shouldBenchmark = true + // wn.MsgCounter.InsertCounter = 0 + // wn.MsgCounter.InsertedRecordSize = 0 + // start = time.Now() + //} + if msgLength > 0 { wn.DoWriteNode(ctx) fmt.Println("write node do a batch message, storage len: ", msgLength) } + + if wn.MsgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter { + wn.WriteWriterLog() + BaselineCounter++ + } + // Test insert time // ignore if less than 1000 records per time interval - if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 { - LogRecords += msgCounter.InsertCounter - timeSince := time.Since(start) - if timeSince >= timeInterval { - speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB - logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============") - newFlag := LogRecords / (10000 * 100) - if newFlag != logFlag { - logFlag = newFlag - fmt.Fprintln(logFile, logString) - logString = "" - } - wn.MsgCounter.InsertCounter = 0 - wn.MsgCounter.InsertedRecordSize = 0 - start = time.Now() - } - } + //if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 { + // LogRecords += msgCounter.InsertCounter + // timeSince := time.Since(start) + // if timeSince >= timeInterval { + // speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB + // logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============") + // newFlag := LogRecords / (10000 * 100) + // if newFlag != logFlag { + // logFlag = newFlag + // fmt.Fprintln(logFile, logString) + // logString = "" + // } + // wn.MsgCounter.InsertCounter = 0 + // wn.MsgCounter.InsertedRecordSize = 0 + // start = time.Now() + // } + //} } } diff --git a/writer/write_node/writer_node.go b/writer/write_node/writer_node.go index 6458db8eb1..f0ed100e6e 100644 --- a/writer/write_node/writer_node.go +++ b/writer/write_node/writer_node.go @@ -2,14 +2,18 @@ package write_node import ( "context" + "encoding/json" "fmt" "github.com/czs007/suvlim/conf" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" storage "github.com/czs007/suvlim/storage/pkg" "github.com/czs007/suvlim/storage/pkg/types" "github.com/czs007/suvlim/writer/message_client" + "log" + "os" "strconv" "sync" + "time" ) type SegmentIdInfo struct { @@ -20,8 +24,19 @@ type SegmentIdInfo struct { type MsgCounter struct { InsertCounter int64 - InsertedRecordSize float64 + InsertTime time.Time + // InsertedRecordSize float64 + DeleteCounter int64 + DeleteTime time.Time +} + +type InsertLog struct { + MsgLength int + DurationInMilliseconds int64 + InsertTime time.Time + NumSince int64 + Speed float64 } type WriteNode struct { @@ -29,6 +44,7 @@ type WriteNode struct { MessageClient *message_client.MessageClient TimeSync uint64 MsgCounter *MsgCounter + InsertLogs []InsertLog } func (wn *WriteNode) Close() { @@ -44,8 +60,10 @@ func NewWriteNode(ctx context.Context, msgCounter := MsgCounter{ InsertCounter: 0, + InsertTime: time.Now(), DeleteCounter: 0, - InsertedRecordSize: 0, + DeleteTime: time.Now(), + // InsertedRecordSize: 0, } return &WriteNode{ @@ -53,6 +71,7 @@ func NewWriteNode(ctx context.Context, MessageClient: &mc, TimeSync: timeSync, MsgCounter: &msgCounter, + InsertLogs: make([]InsertLog, 0), }, err } @@ -73,11 +92,13 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr timeStamp = append(timeStamp, uint64(data[i].Timestamp)) } - wn.MsgCounter.InsertCounter += int64(len(timeStamp)) - if len(timeStamp) > 0 { - // assume each record is same size - wn.MsgCounter.InsertedRecordSize += float64(len(timeStamp) * len(data[0].RowsData.Blob)) - } + wn.WriterLog(len(timeStamp)) + + //wn.MsgCounter.InsertCounter += int64(len(timeStamp)) + //if len(timeStamp) > 0 { + // // assume each record is same size + // wn.MsgCounter.InsertedRecordSize += float64(len(timeStamp) * len(data[0].RowsData.Blob)) + //} error := (*wn.KvStore).PutRows(ctx, prefixKeys, binaryData, suffixKeys, timeStamp) if error != nil { fmt.Println("Can't insert data!") @@ -134,14 +155,14 @@ func (wn *WriteNode) DoWriteNode(ctx context.Context) { numInsertData := len(wn.MessageClient.InsertMsg) numGoRoute := conf.Config.Writer.Parallelism batchSize := numInsertData / numGoRoute - if numInsertData % numGoRoute != 0 { + if numInsertData%numGoRoute != 0 { batchSize += 1 } start := 0 end := 0 wg := sync.WaitGroup{} for end < numInsertData { - if end + batchSize >= numInsertData { + if end+batchSize >= numInsertData { end = numInsertData } else { end = end + batchSize @@ -154,3 +175,54 @@ func (wn *WriteNode) DoWriteNode(ctx context.Context) { wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg) wn.UpdateTimeSync(wn.MessageClient.TimeSync()) } + +func (wn *WriteNode) WriterLog(length int) { + wn.MsgCounter.InsertCounter += int64(length) + timeNow := time.Now() + duration := timeNow.Sub(wn.MsgCounter.InsertTime) + speed := float64(length) / duration.Seconds() + + insertLog := InsertLog{ + MsgLength: length, + DurationInMilliseconds: duration.Milliseconds(), + InsertTime: timeNow, + NumSince: wn.MsgCounter.InsertCounter, + Speed: speed, + } + + wn.InsertLogs = append(wn.InsertLogs, insertLog) + wn.MsgCounter.InsertTime = timeNow +} + +func (wn *WriteNode) WriteWriterLog() { + f, err := os.OpenFile("/tmp/write_node_insert.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Fatal(err) + } + + // write logs + for _, insertLog := range wn.InsertLogs { + insertLogJson, err := json.Marshal(&insertLog) + if err != nil { + log.Fatal(err) + } + + writeString := string(insertLogJson) + "\n" + fmt.Println(writeString) + + _, err2 := f.WriteString(writeString) + if err2 != nil { + log.Fatal(err2) + } + } + + // reset InsertLogs buffer + wn.InsertLogs = make([]InsertLog, 0) + + err = f.Close() + if err != nil { + log.Fatal(err) + } + + fmt.Println("write log done") +}