diff --git a/conf/config.yaml b/conf/config.yaml
index b7153fa04d..3b79882932 100644
--- a/conf/config.yaml
+++ b/conf/config.yaml
@@ -28,7 +28,7 @@ timesync:
 storage:
   driver: TIKV
   address: localhost
-  port: 0
+  port: 2379
   accesskey: ab
   secretkey: dd
 
@@ -41,7 +41,7 @@ pulsar:
 reader:
   clientid: 1
   stopflag: -1
-  readerqueuesize: 1024
+  readerqueuesize: 10240
   searchchansize: 10000
   key2segchansize: 10000
   inserttopicstart: 0
diff --git a/core/unittest/test_c_api.cpp b/core/unittest/test_c_api.cpp
index d2618ac2f6..60419237af 100644
--- a/core/unittest/test_c_api.cpp
+++ b/core/unittest/test_c_api.cpp
@@ -289,6 +289,9 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
     auto partition = NewPartition(collection, partition_name);
     auto segment = NewSegment(partition, 0);
 
+    auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
+    std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
+
     std::vector<char> raw_data;
     std::vector<uint64_t> timestamps;
     std::vector<int64_t> uids;
@@ -317,6 +320,8 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
 
     auto memory_usage_size = GetMemoryUsageInBytes(segment);
 
+    std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl;
+
     assert(memory_usage_size == 1898459);
 
     DeleteCollection(collection);
diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp
index d2cb29d825..df07ef6f01 100644
--- a/proxy/src/message_client/ClientV2.cpp
+++ b/proxy/src/message_client/ClientV2.cpp
@@ -178,7 +178,6 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
         auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topic_num;
         try {
             mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
-            printf("%ld \n", mut_msg.segment_id());
             mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
             mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
 
diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go
index c8e4be19dd..bda1a9638d 100644
--- a/reader/read_node/meta.go
+++ b/reader/read_node/meta.go
@@ -104,6 +104,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
 	}
 	printSegmentStruct(segment)
 
+	// TODO: fix this after channel range config finished
 	//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
 	//	return
 	//}
@@ -117,7 +118,6 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
 		newSegment := partition.NewSegment(newSegmentID)
 		newSegment.SegmentStatus = SegmentOpened
 		newSegment.SegmentCloseTime = segment.CloseTimeStamp
-		partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
 		node.SegmentsMap[newSegmentID] = newSegment
 	}
 }
@@ -147,6 +147,7 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
 	}
 	printSegmentStruct(segment)
 
+	// TODO: fix this after channel range config finished
 	//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
 	//	return
 	//}
diff --git a/reader/read_node/partition.go b/reader/read_node/partition.go
index 01006edd50..d308593548 100644
--- a/reader/read_node/partition.go
+++ b/reader/read_node/partition.go
@@ -16,8 +16,7 @@ import "C"
 type Partition struct {
 	PartitionPtr   C.CPartition
 	PartitionName  string
-	OpenedSegments []*Segment
-	ClosedSegments []*Segment
+	Segments       []*Segment
 }
 
 func (p *Partition) NewSegment(segmentId int64) *Segment {
@@ -28,7 +27,7 @@ func (p *Partition) NewSegment(segmentId int64) *Segment {
 	segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId))
 
 	var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
-	p.OpenedSegments = append(p.OpenedSegments, newSegment)
+	p.Segments = append(p.Segments, newSegment)
 
 	return newSegment
 }
diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index 7a9cfe7e25..0c163ec55e 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -21,6 +21,7 @@ import (
 	"sort"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
 	"github.com/czs007/suvlim/pkg/master/kv"
@@ -65,6 +66,12 @@ type QueryInfo struct {
 	FieldName string `json:"field_name"`
 }
 
+type MsgCounter struct {
+	InsertCounter int64
+	DeleteCounter int64
+	SearchCounter int64
+}
+
 type QueryNode struct {
 	QueryNodeId       uint64
 	Collections       []*Collection
@@ -77,6 +84,7 @@ type QueryNode struct {
 	deleteData        DeleteData
 	insertData        InsertData
 	kvBase            *kv.EtcdKVBase
+	msgCounter        *MsgCounter
 }
 
 func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
@@ -99,6 +107,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
 		validSearchBuffer: make([]bool, 0),
 	}
 
+	msgCounter := MsgCounter{
+		InsertCounter: 0,
+		DeleteCounter: 0,
+		SearchCounter: 0,
+	}
+
 	return &QueryNode{
 		QueryNodeId:       queryNodeId,
 		Collections:       nil,
@@ -106,6 +120,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
 		messageClient:     &mc,
 		queryNodeTimeSync: queryNodeTimeSync,
 		buffer:            buffer,
+		msgCounter:        &msgCounter,
 	}
 }
 
@@ -132,6 +147,12 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
 		validSearchBuffer: make([]bool, 0),
 	}
 
+	msgCounter := MsgCounter{
+		InsertCounter: 0,
+		DeleteCounter: 0,
+		SearchCounter: 0,
+	}
+
 	return &QueryNode{
 		QueryNodeId:       queryNodeId,
 		Collections:       nil,
@@ -139,6 +160,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
 		messageClient:     mc,
 		queryNodeTimeSync: queryNodeTimeSync,
 		buffer:            buffer,
+		msgCounter:        &msgCounter,
 	}
 }
 
@@ -168,9 +190,9 @@ func (node *QueryNode) QueryNodeDataInit() {
 
 func (node *QueryNode) NewCollection(collectionID uint64, collectionName string, schemaConfig string) *Collection {
 	/*
-	void
-	UpdateIndexes(CCollection c_collection, const char *index_string);
-	*/
+		void
+		UpdateIndexes(CCollection c_collection, const char *index_string);
+	*/
 	cName := C.CString(collectionName)
 	cSchema := C.CString(schemaConfig)
 	collection := C.NewCollection(cName, cSchema)
@@ -183,9 +205,9 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
 	/*
-	void
-	DeleteCollection(CCollection collection);
-	*/
+		void
+		DeleteCollection(CCollection collection);
+	*/
 	cPtr := collection.CollectionPtr
 	C.DeleteCollection(cPtr)
@@ -194,8 +216,8 @@ func (node *QueryNode) UpdateIndexes(collection *Collection, indexConfig *string) {
 	/*
-	void
-	UpdateIndexes(CCollection c_collection, const char *index_string);
+		void
+		UpdateIndexes(CCollection c_collection, const char *index_string);
 	*/
 	cCollectionPtr := collection.CollectionPtr
 	cIndexConfig := C.CString(*indexConfig)
@@ -222,8 +244,50 @@ func (node *QueryNode) InitQueryNodeCollection() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
+	const Debug = true
+	const CountMsgNum = 1000 * 1000
+
+	if Debug {
+		var printFlag = true
+		var startTime = true
+		var start time.Time
+
+		for {
+			var msgLen = node.PrepareBatchMsg()
+			var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
+			assert.NotEqual(nil, 0, timeRange.timestampMin)
+			assert.NotEqual(nil, 0, timeRange.timestampMax)
+
+			if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
+				continue
+			}
+
+			if startTime {
+				fmt.Println("============> Start Test <============")
+				startTime = false
+				start = time.Now()
+			}
+
+			node.QueryNodeDataInit()
+			node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
+			//fmt.Println("MessagesPreprocess Done")
+			node.WriterDelete()
+			node.PreInsertAndDelete()
+			//fmt.Println("PreInsertAndDelete Done")
+			node.DoInsertAndDelete()
+			//fmt.Println("DoInsertAndDelete Done")
+			node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
+
+			// Test insert time
+			if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
+				printFlag = false
+				timeSince := time.Since(start)
+				fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
+			}
+		}
+	}
+
 	for {
-		// TODO: get timeRange from message client
 		var msgLen = node.PrepareBatchMsg()
 		var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
 		assert.NotEqual(nil, 0, timeRange.timestampMin)
@@ -444,6 +508,7 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
 	timestamps := node.insertData.insertTimestamps[segmentID]
 	offsets := node.insertData.insertOffset[segmentID]
 
+	node.msgCounter.InsertCounter += int64(len(ids))
 	err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
 	if err != nil {
 		fmt.Println(err.Error())
@@ -463,6 +528,7 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes
 
 	offset := node.deleteData.deleteOffset[segmentID]
 
+	node.msgCounter.DeleteCounter += int64(len(*deleteIDs))
 	err = segment.SegmentDelete(offset, deleteIDs, deleteTimestamps)
 	if err != nil {
 		fmt.Println(err.Error())
@@ -487,17 +553,18 @@ func (node *QueryNode) QueryJson2Info(queryJson *string) *QueryInfo {
 }
 
 func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
-	// TODO: use client id to publish results to different clients
-	// var clientId = (*(searchMessages[0])).ClientId
 	type SearchResultTmp struct {
 		ResultId       int64
 		ResultDistance float32
 	}
 
+	node.msgCounter.SearchCounter += int64(len(searchMessages))
+
 	// Traverse all messages in the current messageClient.
 	// TODO: Do not receive batched search requests
 	for _, msg := range searchMessages {
+		var clientId = msg.ClientId
 		var resultsTmp = make([]SearchResultTmp, 0)
 		var timestamp = msg.Timestamp
@@ -522,7 +589,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}
-		fmt.Println(res.ResultIds)
+
 		for i := 0; i < len(res.ResultIds); i++ {
 			resultsTmp = append(resultsTmp, SearchResultTmp{ResultId: res.ResultIds[i], ResultDistance: res.ResultDistances[i]})
 		}
@@ -543,6 +610,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 			Entities:  &entities,
 			Distances: make([]float32, 0),
 			QueryId:   msg.Uid,
+			ClientId:  clientId,
 		}
 		for _, res := range resultsTmp {
 			results.Entities.Ids = append(results.Entities.Ids, res.ResultId)
diff --git a/reader/read_node/reader.go b/reader/read_node/reader.go
index 2b0d596d63..f9c9927dd5 100644
--- a/reader/read_node/reader.go
+++ b/reader/read_node/reader.go
@@ -3,6 +3,7 @@ package reader
 import (
 	"context"
 	"github.com/czs007/suvlim/reader/message_client"
+	"log"
 	"sync"
 )
 
@@ -15,11 +16,17 @@ func StartQueryNode(pulsarURL string) {
 	ctx := context.Background()
 
 	// Segments Services
-	//go qn.SegmentManagementService()
+	go qn.SegmentManagementService()
 	go qn.SegmentStatisticService()
 
 	wg := sync.WaitGroup{}
-	qn.InitFromMeta()
+	err := qn.InitFromMeta()
+
+	if err != nil {
+		log.Printf("Init query node from meta failed")
+		return
+	}
+
 	wg.Add(3)
 	go qn.RunMetaService(ctx, &wg)
 	go qn.RunInsertDelete(&wg)
diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go
index 8c452686b3..1dac55875c 100644
--- a/reader/read_node/segment.go
+++ b/reader/read_node/segment.go
@@ -73,6 +73,8 @@ func (s *Segment) CloseSegment(collection* Collection) error {
 		int
 		Close(CSegmentBase c_segment);
 	*/
+	fmt.Println("Closing segment :", s.SegmentId)
+
 	var status = C.Close(s.SegmentPtr)
 	s.SegmentStatus = SegmentClosed
 
@@ -82,11 +84,13 @@ func (s *Segment) CloseSegment(collection* Collection) error {
 
 	// Build index after closing segment
 	s.SegmentStatus = SegmentIndexing
+	fmt.Println("Building index...")
 	s.buildIndex(collection)
 
 	// TODO: remove redundant segment indexed status
 	// Change segment status to indexed
 	s.SegmentStatus = SegmentIndexed
+	fmt.Println("Segment closed and indexed")
 
 	return nil
 }
diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go
index 37bf33665c..ea821a04fb 100644
--- a/reader/read_node/segment_service.go
+++ b/reader/read_node/segment_service.go
@@ -13,20 +13,19 @@ func (node *QueryNode) SegmentsManagement() {
 	//node.queryNodeTimeSync.UpdateTSOTimeSync()
 	//var timeNow = node.queryNodeTimeSync.TSOTimeSync
-	timeNow := node.messageClient.GetTimeNow()
+	timeNow := node.messageClient.GetTimeNow() >> 18
 
 	for _, collection := range node.Collections {
 		for _, partition := range collection.Partitions {
-			for _, oldSegment := range partition.OpenedSegments {
-				// TODO: check segment status
-				if timeNow >= oldSegment.SegmentCloseTime {
-					// close old segment and move it into partition.ClosedSegments
-					if oldSegment.SegmentStatus != SegmentOpened {
-						log.Println("Never reach here, Opened segment cannot be closed")
-						continue
-					}
-					go oldSegment.CloseSegment(collection)
-					partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
+			for _, segment := range partition.Segments {
+				if segment.SegmentStatus != SegmentOpened {
+					log.Println("Segment have been closed")
+					continue
+				}
+
+				fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
+				if timeNow >= segment.SegmentCloseTime {
+					go segment.CloseSegment(collection)
 				}
 			}
 		}
 	}
@@ -34,7 +33,7 @@ func (node *QueryNode) SegmentManagementService() {
-	sleepMillisecondTime := 1000
+	sleepMillisecondTime := 3000
 	fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
@@ -81,6 +80,8 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
 		statisticData = append(statisticData, stat)
 	}
 
+	fmt.Println("Publish segment statistic")
+	fmt.Println(statisticData)
 	var status = node.PublicStatistic(&statisticData)
 	if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
 		log.Printf("Publish segments statistic failed")
@@ -88,7 +89,7 @@ func (node *QueryNode) SegmentStatisticService() {
-	sleepMillisecondTime := 1000
+	sleepMillisecondTime := 3000
 	fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp
index 1fe54e449d..5971b4c27c 100644
--- a/sdk/examples/simple/insert.cpp
+++ b/sdk/examples/simple/insert.cpp
@@ -21,7 +21,7 @@
 #include "interface/ConnectionImpl.h"
 #include "utils/TimeRecorder.h"
 
-const int N = 100;
+const int N = 200000;
 const int DIM = 16;
 const int LOOP = 10;
 
diff --git a/storage/internal/tikv/tikv_store.go b/storage/internal/tikv/tikv_store.go
index 902b63e84c..84df3eecc8 100644
--- a/storage/internal/tikv/tikv_store.go
+++ b/storage/internal/tikv/tikv_store.go
@@ -3,6 +3,7 @@ package tikv_driver
 import (
 	"context"
 	"errors"
+	"github.com/czs007/suvlim/conf"
 	. "github.com/czs007/suvlim/storage/internal/tikv/codec"
 	. "github.com/czs007/suvlim/storage/pkg/types"
 	"github.com/tikv/client-go/config"
@@ -86,7 +87,8 @@ type TikvStore struct {
 }
 
 func NewTikvStore(ctx context.Context) (*TikvStore, error) {
-	pdAddrs := []string{"127.0.0.1:2379"}
+	var pdAddress0 = conf.Config.Storage.Address + ":" + strconv.FormatInt(int64(conf.Config.Storage.Port), 10)
+	pdAddrs := []string{pdAddress0}
 	conf := config.Default()
 	client, err := rawkv.NewClient(ctx, pdAddrs, conf)
 	if err != nil {
diff --git a/writer/main.go b/writer/main.go
index 7a937054a3..f862775c8d 100644
--- a/writer/main.go
+++ b/writer/main.go
@@ -10,6 +10,7 @@ import (
 	"log"
 	"strconv"
 	"sync"
+	"time"
 )
 
 func main() {
@@ -31,10 +32,49 @@ func main() {
 		log.Fatal(err)
 	}
 
+	msgCounter := write_node.MsgCounter{
+		InsertCounter: 0,
+		DeleteCounter: 0,
+	}
+
 	wn := write_node.WriteNode{
 		KvStore:       &kv,
 		MessageClient: &mc,
 		TimeSync:      100,
+		MsgCounter:    &msgCounter,
+	}
+
+	const Debug = true
+	const CountMsgNum = 1000 * 1000
+
+	if Debug {
+		var printFlag = true
+		var startTime = true
+		var start time.Time
+
+		for {
+			if ctx.Err() != nil {
+				break
+			}
+			msgLength := wn.MessageClient.PrepareBatchMsg()
+			if msgLength > 0 {
+				if startTime {
+					fmt.Println("============> Start Test <============")
+					startTime = false
+					start = time.Now()
+				}
+
+				wn.DoWriteNode(ctx, &wg)
+				fmt.Println("write node do a batch message, storage len: ", msgLength)
+			}
+
+			// Test insert time
+			if printFlag && wn.MsgCounter.InsertCounter >= CountMsgNum {
+				printFlag = false
+				timeSince := time.Since(start)
+				fmt.Println("============> Do", wn.MsgCounter.InsertCounter, "Insert in", timeSince, "<============")
+			}
+		}
 	}
 
 	//TODO:: start a gorouter for searchById
diff --git a/writer/write_node/writer_node.go b/writer/write_node/writer_node.go
index 0ecb9eb3e3..f74e9dbbb1 100644
--- a/writer/write_node/writer_node.go
+++ b/writer/write_node/writer_node.go
@@ -17,11 +17,16 @@ type SegmentIdInfo struct {
 	SegmentIds     []string
 }
 
+type MsgCounter struct {
+	InsertCounter int64
+	DeleteCounter int64
+}
 type WriteNode struct {
 	KvStore       *types.Store
 	MessageClient *message_client.MessageClient
 	TimeSync      uint64
+	MsgCounter    *MsgCounter
 }
 
 func (wn *WriteNode) Close() {
@@ -34,10 +39,17 @@ func NewWriteNode(ctx context.Context, timeSync uint64) (*WriteNode, error) {
 	kv, err := storage.NewStore(context.Background(), types.MinIODriver)
 	mc := message_client.MessageClient{}
+
+	msgCounter := MsgCounter{
+		InsertCounter: 0,
+		DeleteCounter: 0,
+	}
+
 	return &WriteNode{
 		KvStore:       &kv,
 		MessageClient: &mc,
 		TimeSync:      timeSync,
+		MsgCounter:    &msgCounter,
 	}, err
 }
@@ -58,6 +70,8 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr
 		timeStamp = append(timeStamp, uint64(data[i].Timestamp))
 	}
 
+	wn.MsgCounter.InsertCounter += int64(len(timeStamp))
+
 	error := (*wn.KvStore).PutRows(ctx, prefixKeys, binaryData, suffixKeys, timeStamp)
 	if error != nil {
 		fmt.Println("Can't insert data!")
@@ -89,13 +103,15 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOr
 		}
 
 		segmentInfo := msgpb.Key2SegMsg{
-			Uid: data[i].Uid,
+			Uid:       data[i].Uid,
 			SegmentId: segmentIds,
 			Timestamp: data[i].Timestamp,
 		}
 		wn.MessageClient.Send(ctx, segmentInfo)
 	}
 
+	wn.MsgCounter.DeleteCounter += int64(len(timeStamps))
+
 	err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps)
 	if err != nil {
 		fmt.Println("Can't delete data")