diff --git a/conf/config.yaml b/conf/config.yaml index ad018ca847..f345b0903d 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -14,7 +14,7 @@ master: port: 53100 pulsarmoniterinterval: 1 pulsartopic: "monitor-topic" - segmentthreshole: 104857600 + segmentthreshole: 1073741824 proxyidlist: [0] querynodenum: 1 writenodenum: 1 diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go index f325a0f68e..20eca81a7b 100644 --- a/reader/read_node/meta.go +++ b/reader/read_node/meta.go @@ -95,7 +95,7 @@ func (node *QueryNode) processCollectionCreate(id string, value string) { println("error of json 2 collection") println(err.Error()) } - printCollectionStruct(collection) + //printCollectionStruct(collection) newCollection := node.NewCollection(collection.ID, collection.Name, collection.GrpcMarshalString) for _, partitionTag := range collection.PartitionTags { newCollection.NewPartition(partitionTag) @@ -109,7 +109,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) { println("error of json 2 segment") println(err.Error()) } - printSegmentStruct(segment) + //printSegmentStruct(segment) if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { return diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go index d766d0cd38..be235bd1d1 100644 --- a/reader/read_node/query_node.go +++ b/reader/read_node/query_node.go @@ -17,6 +17,9 @@ import ( "encoding/json" "fmt" "github.com/czs007/suvlim/conf" + msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/kv" + "github.com/czs007/suvlim/reader/message_client" "github.com/stretchr/testify/assert" "log" "sort" @@ -24,9 +27,6 @@ import ( "sync/atomic" "time" - msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/czs007/suvlim/pkg/master/kv" - "github.com/czs007/suvlim/reader/message_client" //"github.com/stretchr/testify/assert" ) @@ -69,8 +69,21 @@ type QueryInfo struct { type MsgCounter struct { InsertCounter int64 + 
InsertTime time.Time + DeleteCounter int64 + DeleteTime time.Time + SearchCounter int64 + SearchTime time.Time +} + +type InsertLog struct { + MsgLength int + DurationInMilliseconds int64 + InsertTime time.Time + NumSince int64 + Speed float64 } type QueryNode struct { @@ -86,6 +99,7 @@ type QueryNode struct { insertData InsertData kvBase *kv.EtcdKVBase msgCounter *MsgCounter + InsertLogs []InsertLog } func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { @@ -95,7 +109,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { ReadTimeSyncMin: timeSync, ReadTimeSyncMax: timeSync, WriteTimeSync: timeSync, - ServiceTimeSync: timeSync, + ServiceTimeSync: timeSync, TSOTimeSync: timeSync, } @@ -135,7 +149,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes ReadTimeSyncMin: timeSync, ReadTimeSyncMax: timeSync, WriteTimeSync: timeSync, - ServiceTimeSync: timeSync, + ServiceTimeSync: timeSync, TSOTimeSync: timeSync, } @@ -162,6 +176,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes queryNodeTimeSync: queryNodeTimeSync, buffer: buffer, msgCounter: &msgCounter, + InsertLogs: make([]InsertLog, 0), } } @@ -246,13 +261,11 @@ func (node *QueryNode) InitQueryNodeCollection() { func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { const Debug = true - const CountMsgNum = 1000 * 1000 + const CountInsertMsgBaseline = 1000 * 1000 + var BaselineCounter int64 = 0 + node.msgCounter.InsertTime = time.Now() if Debug { - var printFlag = true - var startTime = true - var start time.Time - for { var msgLen = node.PrepareBatchMsg() var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} @@ -264,10 +277,9 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { continue } - if startTime { - fmt.Println("============> Start Test <============") - startTime = false - start = time.Now() + if node.msgCounter.InsertCounter/CountInsertMsgBaseline == 
BaselineCounter { + node.WriteQueryLog() + BaselineCounter++ } node.QueryNodeDataInit() @@ -279,13 +291,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { node.DoInsertAndDelete() //fmt.Println("DoInsertAndDelete Done") node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) - - // Test insert time - if printFlag && node.msgCounter.InsertCounter >= CountMsgNum { - printFlag = false - timeSince := time.Since(start) - fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============") - } } } @@ -334,14 +339,14 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg) fmt.Println("Do Search...") //for { - //if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync { - var status = node.Search(node.messageClient.SearchMsg) - if status.ErrorCode != 0 { - fmt.Println("Search Failed") - node.PublishFailedSearchResult() - } - //break - //} + //if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync { + var status = node.Search(node.messageClient.SearchMsg) + if status.ErrorCode != 0 { + fmt.Println("Search Failed") + node.PublishFailedSearchResult() + } + //break + //} //} default: } @@ -485,9 +490,9 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status { func (node *QueryNode) DoInsertAndDelete() msgPb.Status { var wg sync.WaitGroup // Do insert - for segmentID, records := range node.insertData.insertRecords { + for segmentID := range node.insertData.insertRecords { wg.Add(1) - go node.DoInsert(segmentID, &records, &wg) + go node.DoInsert(segmentID, &wg) } // Do delete @@ -505,7 +510,7 @@ func (node *QueryNode) DoInsertAndDelete() msgPb.Status { return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS} } -func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status { +func (node *QueryNode) DoInsert(segmentID int64, wg *sync.WaitGroup) msgPb.Status { 
fmt.Println("Doing insert..., len = ", len(node.insertData.insertIDs[segmentID])) var targetSegment, err = node.GetSegmentBySegmentID(segmentID) if err != nil { @@ -515,10 +520,12 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai ids := node.insertData.insertIDs[segmentID] timestamps := node.insertData.insertTimestamps[segmentID] + records := node.insertData.insertRecords[segmentID] offsets := node.insertData.insertOffset[segmentID] - node.msgCounter.InsertCounter += int64(len(ids)) - err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records) + node.QueryLog(len(ids)) + + err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records) if err != nil { fmt.Println(err.Error()) return msgPb.Status{ErrorCode: 1} @@ -557,7 +564,7 @@ func (node *QueryNode) QueryJson2Info(queryJson *string) *QueryInfo { return nil } - fmt.Println(query) + //fmt.Println(query) return &query } @@ -585,7 +592,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { // Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds. // Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`. var logicTimestamp = searchTimestamp << 46 >> 46 - searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval + 600)) << 18 + logicTimestamp + searchTimestamp = (searchTimestamp>>18-uint64(conf.Config.Timesync.Interval+600))<<18 + logicTimestamp var vector = msg.Records // We now only the first Json is valid. @@ -594,7 +601,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { // 1. Timestamp check // TODO: return or wait? 
Or adding graceful time if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync { - fmt.Println("Invalid query time, timestamp = ", searchTimestamp >> 18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync >> 18) + fmt.Println("Invalid query time, timestamp = ", searchTimestamp>>18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync>>18) return msgPb.Status{ErrorCode: 1} } @@ -608,7 +615,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { continue } - fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount()) + //fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount()) var res, err = segment.SegmentSearch(query, searchTimestamp, vector) if err != nil { fmt.Println(err.Error()) diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go index 3da047a4e8..21be00f350 100644 --- a/reader/read_node/segment.go +++ b/reader/read_node/segment.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/czs007/suvlim/errors" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/stretchr/testify/assert" "strconv" "unsafe" ) @@ -143,11 +144,13 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[] var numOfRow = len(*entityIDs) var sizeofPerRow = len((*records)[0]) - var rawData = make([]byte, numOfRow*sizeofPerRow) + assert.Equal(nil, numOfRow, len(*records)) + + var rawData = make([]byte, numOfRow * sizeofPerRow) var copyOffset = 0 for i := 0; i < len(*records); i++ { copy(rawData[copyOffset:], (*records)[i]) - copyOffset += len((*records)[i]) + copyOffset += sizeofPerRow } var cOffset = C.long(offset) @@ -239,7 +242,7 @@ func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status))) } - fmt.Println("Search Result---- Ids =", resultIds, ", Distances =", resultDistances) + //fmt.Println("Search 
Result---- Ids =", resultIds, ", Distances =", resultDistances) return &SearchResult{ResultIds: resultIds, ResultDistances: resultDistances}, nil } diff --git a/reader/read_node/util_functions.go b/reader/read_node/util_functions.go index c9071d4e04..303d824d90 100644 --- a/reader/read_node/util_functions.go +++ b/reader/read_node/util_functions.go @@ -1,8 +1,13 @@ package reader import ( + "encoding/json" "errors" + "fmt" + log "github.com/apache/pulsar/pulsar-client-go/logutil" + "os" "strconv" + "time" ) // Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs @@ -68,3 +73,54 @@ func (c *Collection) GetPartitionByName(partitionName string) (partition *Partit return nil // TODO: remove from c.Partitions } + +func (node *QueryNode) QueryLog(length int) { + node.msgCounter.InsertCounter += int64(length) + timeNow := time.Now() + duration := timeNow.Sub(node.msgCounter.InsertTime) + speed := float64(length) / duration.Seconds() + + insertLog := InsertLog{ + MsgLength: length, + DurationInMilliseconds: duration.Milliseconds(), + InsertTime: timeNow, + NumSince: node.msgCounter.InsertCounter, + Speed: speed, + } + + node.InsertLogs = append(node.InsertLogs, insertLog) + node.msgCounter.InsertTime = timeNow +} + +func (node *QueryNode) WriteQueryLog() { + f, err := os.OpenFile("/tmp/query_node.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Fatal(err) + } + + // write logs + for _, insertLog := range node.InsertLogs { + insertLogJson, err := json.Marshal(&insertLog) + if err != nil { + log.Fatal(err) + } + + writeString := string(insertLogJson) + "\n" + fmt.Println(writeString) + + _, err2 := f.WriteString(writeString) + if err2 != nil { + log.Fatal(err2) + } + } + + // reset InsertLogs buffer + node.InsertLogs = make([]InsertLog, 0) + + err = f.Close() + if err != nil { + log.Fatal(err) + } + + fmt.Println("write log done") +} diff --git a/sdk/examples/common/TestParameter.h b/sdk/examples/common/TestParameter.h 
index 6e8c1725ed..aceafee0a3 100644 --- a/sdk/examples/common/TestParameter.h +++ b/sdk/examples/common/TestParameter.h @@ -27,19 +27,14 @@ struct TestParameters { // collection parameters, only works when collection_name_ is empty int64_t index_type_ = (int64_t)milvus::IndexType::IVFSQ8; // sq8 - int64_t index_file_size_ = 1024; // 1024 MB int64_t nlist_ = 16384; int64_t metric_type_ = (int64_t)milvus::MetricType::L2; // L2 - int64_t dimensions_ = 128; - int64_t row_count_ = 1; // 1 million // query parameters - int64_t concurrency_ = 20; // 20 connections int64_t query_count_ = 1000; int64_t nq_ = 1; int64_t topk_ = 10; int64_t nprobe_ = 16; - bool print_result_ = false; bool is_valid = true; }; diff --git a/sdk/examples/simple/count_collection.cpp b/sdk/examples/simple/count_collection.cpp index 3c5a0d3beb..6874e98859 100644 --- a/sdk/examples/simple/count_collection.cpp +++ b/sdk/examples/simple/count_collection.cpp @@ -4,26 +4,98 @@ #include #include "utils/Utils.h" +const int DIM = 128; + +bool check_field(milvus::FieldPtr left, milvus::FieldPtr right){ + + if (left->field_name != right->field_name){ + std::cout<<"filed_name not match! want "<< left->field_name << " but get "<< right->field_name << std::endl; + return false; + } + + if (left->field_type != right->field_type){ + std::cout<<"filed_type not match! want "<< int(left->field_type) << " but get "<< int(right->field_type) << std::endl; + return false; + } + + + if (left->dim != right->dim){ + std::cout<<"dim not match! 
want "<< left->dim << " but get "<< right->dim << std::endl; + return false; + } + + return true; +} + + +bool check_schema(const milvus::Mapping & map){ + // Get Collection info + bool ret = false; + + milvus::FieldPtr field_ptr1 = std::make_shared(); + milvus::FieldPtr field_ptr2 = std::make_shared(); + + field_ptr1->field_name = "age"; + field_ptr1->field_type = milvus::DataType::INT32; + field_ptr1->dim = 1; + + field_ptr2->field_name = "field_vec"; + field_ptr2->field_type = milvus::DataType::VECTOR_FLOAT; + field_ptr2->dim = DIM; + + std::vector fields{field_ptr1, field_ptr2}; + + auto size_ = map.fields.size(); + for ( int i =0; i != size_; ++ i){ + auto ret = check_field(fields[i], map.fields[i]); + if (!ret){ + return false; + } + } + + for (auto &f : map.fields) { + std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl; + } + + return true; +} + + int main(int argc , char**argv) { TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv); if (!parameters.is_valid) { return 0; } + + if (parameters.collection_name_.empty()){ + std::cout<< "should specify collection name!" << std::endl; + milvus_sdk::Utils::PrintHelp(argc, argv); + return 0; + } + + const std::string collection_name = parameters.collection_name_; auto client = milvus::ConnectionImpl(); milvus::ConnectParam connect_param; connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1" : parameters.address_; connect_param.port = parameters.port_.empty() ? 
"19530" : parameters.port_; client.Connect(connect_param); - milvus::Status stat; - const std::string collectin_name = "collection1"; + milvus::Mapping map; + client.GetCollectionInfo(collection_name, map); + auto check_ret = check_schema(map); + if (!check_ret){ + std::cout<<" Schema is not right!"<< std::endl; + return 0; + } + + milvus::Status stat; int64_t count = 0; - stat = client.CountEntities(collectin_name, count); + stat = client.CountEntities(collection_name, count); if (!stat.ok()){ std::cerr << "Error: " << stat.message() << std::endl; } - std::cout << "Collection " << collectin_name << " rows: " << count << std::endl; - -} \ No newline at end of file + std::cout << "Collection " < #include "interface/ConnectionImpl.h" #include "utils/Utils.h" + +int ID_START = 0; + +void generate_ids(std::vector & ids_array, int count); + +void generate_ids(std::vector& ids_array, int count) { + for (int i = 0; i < count; i++) { + ids_array.push_back(ID_START++); + } +} + int main(int argc, char *argv[]) { TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv); if (!parameters.is_valid){ return 0; } + + if (parameters.collection_name_.empty()){ + std::cout<< "should specify collection name!" << std::endl; + milvus_sdk::Utils::PrintHelp(argc, argv); + return 0; + } + + const std::string collection_name = parameters.collection_name_; auto client = milvus::ConnectionImpl(); milvus::ConnectParam connect_param; connect_param.ip_address = parameters.address_.empty() ? 
"127.0.0.1":parameters.address_; @@ -29,10 +48,8 @@ main(int argc, char *argv[]) { client.Connect(connect_param); std::vector delete_ids; - delete_ids.push_back(1); - delete_ids.push_back(2); - delete_ids.push_back(3); - client.DeleteEntityByID("collection1", delete_ids); + generate_ids(delete_ids, 3); + client.DeleteEntityByID(collection_name, delete_ids); return 0; } diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp index 9b0fe128ec..4c4a0df701 100644 --- a/sdk/examples/simple/insert.cpp +++ b/sdk/examples/simple/insert.cpp @@ -21,18 +21,18 @@ #include "utils/TimeRecorder.h" #include -const int N = 200000; +const int N = 6000000; const int DIM = 128; -const int LOOP = 100; +const int LOOP = 2000; int ID_START = 0; +std::default_random_engine eng(42); -int generate_ids(std::vector & ids_array, int count); +void generate_ids(std::vector & ids_array, int count); -int generate_ids(std::vector& ids_array, int count) { +void generate_ids(std::vector& ids_array, int count) { for (int i = 0; i < count; i++) { ids_array.push_back(ID_START++); } - return 0; } const milvus::FieldValue GetData(int count) { @@ -44,7 +44,6 @@ const milvus::FieldValue GetData(int count) { int32_data.push_back(ID_START++); } - std::default_random_engine eng(42); std::normal_distribution dis(0, 1); std::vector vector_data; for (int i = 0; i < count; i++) { diff --git a/sdk/examples/simple/search.cpp b/sdk/examples/simple/search.cpp index 25ff340a5c..362a9fa521 100644 --- a/sdk/examples/simple/search.cpp +++ b/sdk/examples/simple/search.cpp @@ -20,7 +20,7 @@ const int TOP_K = 10; const int LOOP = 1000; const int DIM = 128; - +std::default_random_engine eng(42); const milvus::VectorParam get_vector_param() { @@ -28,7 +28,6 @@ get_vector_param() { milvus::VectorParam vectorParam; std::vector vector_records; - std::default_random_engine eng(42); std::normal_distribution dis(0, 1); for (int j = 0; j < 1; ++j) { diff --git a/sdk/utils/Utils.cpp b/sdk/utils/Utils.cpp index 
88e50da3ae..bc36fa83d8 100644 --- a/sdk/utils/Utils.cpp +++ b/sdk/utils/Utils.cpp @@ -40,14 +40,10 @@ print_help(const std::string& app_name) { printf(" -h --help Print help information\n"); printf(" -i --index " "Collection index type(1=IDMAP, 2=IVFLAT, 3=IVFSQ8, 5=IVFSQ8H), default:3\n"); - printf(" -f --index_file_size Collection index file size, default:1024\n"); printf(" -l --nlist Collection index nlist, default:16384\n"); printf(" -m --metric " "Collection metric type(1=L2, 2=IP, 3=HAMMING, 4=JACCARD, 5=TANIMOTO, 6=SUBSTRUCTURE, 7=SUPERSTRUCTURE), " "default:1\n"); - printf(" -d --dimension Collection dimension, default:128\n"); - printf(" -r --rowcount Collection total row count(unit:million), default:1\n"); - printf(" -c --concurrency Max client connections, default:20\n"); printf(" -q --query_count Query total count, default:1000\n"); printf(" -n --nq nq of each query, default:1\n"); printf(" -k --topk topk of each query, default:10\n"); @@ -487,17 +483,12 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's {"help", no_argument, nullptr, 'h'}, {"collection_name", no_argument, nullptr, 't'}, {"index", optional_argument, nullptr, 'i'}, - {"index_file_size", optional_argument, nullptr, 'f'}, {"nlist", optional_argument, nullptr, 'l'}, {"metric", optional_argument, nullptr, 'm'}, - {"dimension", optional_argument, nullptr, 'd'}, - {"rowcount", optional_argument, nullptr, 'r'}, - {"concurrency", optional_argument, nullptr, 'c'}, {"query_count", optional_argument, nullptr, 'q'}, {"nq", optional_argument, nullptr, 'n'}, {"topk", optional_argument, nullptr, 'k'}, {"nprobe", optional_argument, nullptr, 'b'}, - {"print", optional_argument, nullptr, 'v'}, {nullptr, 0, nullptr, 0}}; int option_index = 0; @@ -531,12 +522,6 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's free(ptr); break; } - case 'f': { - char* ptr = strdup(optarg); - parameters.index_file_size_ = atol(ptr); - free(ptr); - break; - } 
case 'l': { char* ptr = strdup(optarg); parameters.nlist_ = atol(ptr); @@ -549,24 +534,6 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's free(ptr); break; } - case 'd': { - char* ptr = strdup(optarg); - parameters.dimensions_ = atol(ptr); - free(ptr); - break; - } - case 'r': { - char* ptr = strdup(optarg); - parameters.row_count_ = atol(ptr); - free(ptr); - break; - } - case 'c': { - char* ptr = strdup(optarg); - parameters.concurrency_ = atol(ptr); - free(ptr); - break; - } case 'q': { char* ptr = strdup(optarg); parameters.query_count_ = atol(ptr); @@ -591,10 +558,6 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's free(ptr); break; } - case 'v': { - parameters.print_result_ = true; - break; - } case 'h': default: print_help(app_name);