diff --git a/conf/config.yaml b/conf/config.yaml
index 3b79882932..b7153fa04d 100644
--- a/conf/config.yaml
+++ b/conf/config.yaml
@@ -28,7 +28,7 @@ timesync:
 storage:
   driver: TIKV
   address: localhost
-  port: 2379
+  port: 0
   accesskey: ab
   secretkey: dd
 
@@ -41,7 +41,7 @@ pulsar:
 reader:
   clientid: 1
   stopflag: -1
-  readerqueuesize: 10240
+  readerqueuesize: 1024
   searchchansize: 10000
   key2segchansize: 10000
   inserttopicstart: 0
diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp
index 61d6ed5e02..64ef132904 100644
--- a/core/src/dog_segment/SegmentNaive.cpp
+++ b/core/src/dog_segment/SegmentNaive.cpp
@@ -615,9 +615,9 @@ SegmentNaive::GetMemoryUsageInBytes() {
             total_bytes += vec_ptr->IndexSize();
         }
     }
-    int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
+    int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1);
     total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1);
-    int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
+    int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1);
     total_bytes += del_n * (16 * 2);
     return total_bytes;
 }
diff --git a/core/unittest/test_c_api.cpp b/core/unittest/test_c_api.cpp
index 60419237af..d2618ac2f6 100644
--- a/core/unittest/test_c_api.cpp
+++ b/core/unittest/test_c_api.cpp
@@ -289,9 +289,6 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
     auto partition = NewPartition(collection, partition_name);
     auto segment = NewSegment(partition, 0);
 
-    auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
-    std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
-
     std::vector<char> raw_data;
     std::vector<uint64_t> timestamps;
     std::vector<int64_t> uids;
@@ -320,8 +317,6 @@
 
     auto memory_usage_size = GetMemoryUsageInBytes(segment);
 
-    std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl;
-
     assert(memory_usage_size == 1898459);
 
     DeleteCollection(collection);
diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp
index df07ef6f01..d2cb29d825 100644
--- a/proxy/src/message_client/ClientV2.cpp
+++ b/proxy/src/message_client/ClientV2.cpp
@@ -178,6 +178,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
         auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topic_num;
         try {
             mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
+            printf("%ld \n", mut_msg.segment_id());
             mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
             mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
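Note on the SegmentNaive.cpp hunk above: the old and new expressions differ by a single tilde but compute different quantities. With a power-of-two chunk size, masking with the complement, x & ~(chunk - 1), clears the low bits, so (n + chunk - 1) & ~(chunk - 1) rounds n up to the next chunk multiple; masking without the complement, x & (chunk - 1), keeps only the low bits, a remainder in [0, chunk). A minimal Go sketch of both forms; the chunk value 32 * 1024 is an illustrative assumption, not taken from the patch:

    package main

    import "fmt"

    // defaultElementPerChunk stands in for DefaultElementPerChunk; any power
    // of two shows the same behavior.
    const defaultElementPerChunk = int64(32 * 1024)

    func main() {
        reserved := int64(100000)

        // Go's &^ (AND NOT) plays the role of C++'s `& ~mask`:
        // round reserved up to the next multiple of the chunk size.
        roundedUp := (reserved + defaultElementPerChunk - 1) &^ (defaultElementPerChunk - 1)

        // Plain & keeps only the low bits: a remainder, not a rounded count.
        remainder := (reserved + defaultElementPerChunk - 1) & (defaultElementPerChunk - 1)

        fmt.Println(roundedUp, remainder) // 131072 1695
    }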
diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go
index bda1a9638d..c8e4be19dd 100644
--- a/reader/read_node/meta.go
+++ b/reader/read_node/meta.go
@@ -104,7 +104,6 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
 	}
 	printSegmentStruct(segment)
 
-	// TODO: fix this after channel range config finished
 	//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
 	//	return
 	//}
@@ -118,6 +117,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
 		newSegment := partition.NewSegment(newSegmentID)
 		newSegment.SegmentStatus = SegmentOpened
 		newSegment.SegmentCloseTime = segment.CloseTimeStamp
+		partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
 		node.SegmentsMap[newSegmentID] = newSegment
 	}
 }
@@ -147,7 +147,6 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
 	}
 	printSegmentStruct(segment)
 
-	// TODO: fix this after channel range config finished
 	//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
 	//	return
 	//}
diff --git a/reader/read_node/partition.go b/reader/read_node/partition.go
index d308593548..01006edd50 100644
--- a/reader/read_node/partition.go
+++ b/reader/read_node/partition.go
@@ -16,7 +16,8 @@ import "C"
 type Partition struct {
 	PartitionPtr   C.CPartition
 	PartitionName  string
-	Segments       []*Segment
+	OpenedSegments []*Segment
+	ClosedSegments []*Segment
 }
 
 func (p *Partition) NewSegment(segmentId int64) *Segment {
@@ -27,7 +28,7 @@ func (p *Partition) NewSegment(segmentId int64) *Segment {
 	segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId))
 
 	var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
-	p.Segments = append(p.Segments, newSegment)
+	p.OpenedSegments = append(p.OpenedSegments, newSegment)
 
 	return newSegment
 }
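The partition.go change above replaces the single Segments slice with separate open and closed lists; NewSegment and processSegmentCreate append to OpenedSegments, and SegmentsManagement (further down) appends to ClosedSegments. A minimal Go sketch of the two-list bookkeeping, with the structs trimmed to the fields involved; the moveToClosed helper is hypothetical, since the patch itself only appends to ClosedSegments without removing the entry from OpenedSegments:

    package main

    import "fmt"

    // Trimmed stand-ins for the reader's types; the real structs also carry
    // cgo segment pointers and status/close-time fields.
    type Segment struct {
        SegmentId int64
    }

    type Partition struct {
        OpenedSegments []*Segment
        ClosedSegments []*Segment
    }

    // moveToClosed moves OpenedSegments[i] into ClosedSegments. Hypothetical:
    // shown only to illustrate keeping the two lists consistent.
    func (p *Partition) moveToClosed(i int) {
        seg := p.OpenedSegments[i]
        p.OpenedSegments = append(p.OpenedSegments[:i], p.OpenedSegments[i+1:]...)
        p.ClosedSegments = append(p.ClosedSegments, seg)
    }

    func main() {
        p := &Partition{OpenedSegments: []*Segment{{SegmentId: 1}, {SegmentId: 2}}}
        p.moveToClosed(0)
        fmt.Println(len(p.OpenedSegments), len(p.ClosedSegments)) // 1 1
    }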
diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index 0c163ec55e..dbd75ca0e4 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -248,11 +248,17 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 	const CountMsgNum = 1000 * 1000
 
 	if Debug {
+		var start = time.Now()
 		var printFlag = true
-		var startTime = true
-		var start time.Time
 
 		for {
+			// Test insert time
+			if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
+				printFlag = false
+				timeSince := time.Since(start)
+				fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
+			}
+
 			var msgLen = node.PrepareBatchMsg()
 			var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
 			assert.NotEqual(nil, 0, timeRange.timestampMin)
@@ -262,12 +268,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 				continue
 			}
 
-			if startTime {
-				fmt.Println("============> Start Test <============")
-				startTime = false
-				start = time.Now()
-			}
-
 			node.QueryNodeDataInit()
 			node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
 			//fmt.Println("MessagesPreprocess Done")
@@ -277,13 +277,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
 			node.DoInsertAndDelete()
 			//fmt.Println("DoInsertAndDelete Done")
 			node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
-
-			// Test insert time
-			if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
-				printFlag = false
-				timeSince := time.Since(start)
-				fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
-			}
 		}
 	}
@@ -589,7 +582,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}
-
+		fmt.Println(res.ResultIds)
 		for i := 0; i < len(res.ResultIds); i++ {
 			resultsTmp = append(resultsTmp, SearchResultTmp{ResultId: res.ResultIds[i], ResultDistance: res.ResultDistances[i]})
 		}
diff --git a/reader/read_node/reader.go b/reader/read_node/reader.go
index f9c9927dd5..2b0d596d63 100644
--- a/reader/read_node/reader.go
+++ b/reader/read_node/reader.go
@@ -3,7 +3,6 @@ package reader
 import (
 	"context"
 	"github.com/czs007/suvlim/reader/message_client"
-	"log"
 	"sync"
 )
 
@@ -16,17 +15,11 @@ func StartQueryNode(pulsarURL string) {
 	ctx := context.Background()
 
 	// Segments Services
-	go qn.SegmentManagementService()
+	//go qn.SegmentManagementService()
 	go qn.SegmentStatisticService()
 
 	wg := sync.WaitGroup{}
-	err := qn.InitFromMeta()
-
-	if err != nil {
-		log.Printf("Init query node from meta failed")
-		return
-	}
-
+	qn.InitFromMeta()
 	wg.Add(3)
 	go qn.RunMetaService(ctx, &wg)
 	go qn.RunInsertDelete(&wg)
diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go
index 1dac55875c..8c452686b3 100644
--- a/reader/read_node/segment.go
+++ b/reader/read_node/segment.go
@@ -73,8 +73,6 @@ func (s *Segment) CloseSegment(collection* Collection) error {
 		int Close(CSegmentBase c_segment);
 	*/
 
-	fmt.Println("Closing segment :", s.SegmentId)
-
 	var status = C.Close(s.SegmentPtr)
 	s.SegmentStatus = SegmentClosed
 
@@ -84,13 +82,11 @@ func (s *Segment) CloseSegment(collection* Collection) error {
 
 	// Build index after closing segment
 	s.SegmentStatus = SegmentIndexing
-	fmt.Println("Building index...")
 	s.buildIndex(collection)
 
 	// TODO: remove redundant segment indexed status
 	// Change segment status to indexed
 	s.SegmentStatus = SegmentIndexed
-	fmt.Println("Segment closed and indexed")
 
 	return nil
 }
diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go
index ea821a04fb..37bf33665c 100644
--- a/reader/read_node/segment_service.go
+++ b/reader/read_node/segment_service.go
@@ -13,19 +13,20 @@ func (node *QueryNode) SegmentsManagement() {
 	//node.queryNodeTimeSync.UpdateTSOTimeSync()
 	//var timeNow = node.queryNodeTimeSync.TSOTimeSync
 
-	timeNow := node.messageClient.GetTimeNow() >> 18
+	timeNow := node.messageClient.GetTimeNow()
 
 	for _, collection := range node.Collections {
 		for _, partition := range collection.Partitions {
-			for _, segment := range partition.Segments {
-				if segment.SegmentStatus != SegmentOpened {
-					log.Println("Segment have been closed")
-					continue
-				}
-
-				fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
-				if timeNow >= segment.SegmentCloseTime {
-					go segment.CloseSegment(collection)
+			for _, oldSegment := range partition.OpenedSegments {
+				// TODO: check segment status
+				if timeNow >= oldSegment.SegmentCloseTime {
+					// close old segment and move it into partition.ClosedSegments
+					if oldSegment.SegmentStatus != SegmentOpened {
+						log.Println("Never reach here, Opened segment cannot be closed")
+						continue
+					}
+					go oldSegment.CloseSegment(collection)
+					partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
 				}
 			}
 		}
@@ -33,7 +34,7 @@
 
 func (node *QueryNode) SegmentManagementService() {
-	sleepMillisecondTime := 3000
+	sleepMillisecondTime := 1000
 	fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
@@ -80,8 +81,6 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
 		statisticData = append(statisticData, stat)
 	}
 
-	fmt.Println("Publish segment statistic")
-	fmt.Println(statisticData)
 	var status = node.PublicStatistic(&statisticData)
 	if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
 		log.Printf("Publish segments statistic failed")
@@ -89,7 +88,7 @@
 }
 
 func (node *QueryNode) SegmentStatisticService() {
-	sleepMillisecondTime := 3000
+	sleepMillisecondTime := 1000
 	fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
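In the segment_service.go hunk above, SegmentsManagement now compares GetTimeNow() against SegmentCloseTime without the >> 18 shift. In TSO-style hybrid timestamps the low 18 bits hold a logical counter and the physical clock sits in the high bits, so the shift extracts physical time; that layout is an inference from the removed shift, not something the patch states. Either form can work, but both sides of the comparison must use the same encoding, as this sketch illustrates:

    package main

    import (
        "fmt"
        "time"
    )

    // Assumed hybrid-timestamp layout: physical milliseconds in the high bits,
    // an 18-bit logical counter in the low bits (inferred from the removed >> 18).
    const logicalBits = 18

    func composeTS(physicalMs int64, logical int64) uint64 {
        return uint64(physicalMs)<<logicalBits | uint64(logical)
    }

    func physicalPart(ts uint64) int64 {
        return int64(ts >> logicalBits)
    }

    func main() {
        now := time.Now().UnixMilli()
        ts := composeTS(now, 7)

        // Comparing the raw hybrid value against a plain millisecond deadline
        // is off by a factor of 2^18; extract the physical part first.
        fmt.Println(physicalPart(ts) == now) // true
        fmt.Println(ts == uint64(now))       // false
    }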
"interface/ConnectionImpl.h" #include "utils/TimeRecorder.h" -const int N = 200000; +const int N = 100; const int DIM = 16; const int LOOP = 10; diff --git a/storage/internal/tikv/tikv_store.go b/storage/internal/tikv/tikv_store.go index 84df3eecc8..902b63e84c 100644 --- a/storage/internal/tikv/tikv_store.go +++ b/storage/internal/tikv/tikv_store.go @@ -3,7 +3,6 @@ package tikv_driver import ( "context" "errors" - "github.com/czs007/suvlim/conf" . "github.com/czs007/suvlim/storage/internal/tikv/codec" . "github.com/czs007/suvlim/storage/pkg/types" "github.com/tikv/client-go/config" @@ -87,8 +86,7 @@ type TikvStore struct { } func NewTikvStore(ctx context.Context) (*TikvStore, error) { - var pdAddress0 = conf.Config.Storage.Address + ":" + strconv.FormatInt(int64(conf.Config.Storage.Port), 10) - pdAddrs := []string{pdAddress0} + pdAddrs := []string{"127.0.0.1:2379"} conf := config.Default() client, err := rawkv.NewClient(ctx, pdAddrs, conf) if err != nil { diff --git a/writer/main.go b/writer/main.go index f862775c8d..0074fae5de 100644 --- a/writer/main.go +++ b/writer/main.go @@ -32,48 +32,35 @@ func main() { log.Fatal(err) } - msgCounter := write_node.MsgCounter{ - InsertCounter: 0, - DeleteCounter: 0, - } - wn := write_node.WriteNode{ KvStore: &kv, MessageClient: &mc, TimeSync: 100, - MsgCounter: &msgCounter, } const Debug = true const CountMsgNum = 1000 * 1000 if Debug { + var start = time.Now() var printFlag = true - var startTime = true - var start time.Time for { - if ctx.Err() != nil { - break - } - msgLength := wn.MessageClient.PrepareBatchMsg() - if msgLength > 0 { - if startTime { - fmt.Println("============> Start Test <============") - startTime = false - start = time.Now() - } - - wn.DoWriteNode(ctx, &wg) - fmt.Println("write node do a batch message, storage len: ", msgLength) - } - // Test insert time if printFlag && wn.MsgCounter.InsertCounter >= CountMsgNum { printFlag = false timeSince := time.Since(start) fmt.Println("============> Do", wn.MsgCounter.InsertCounter, "Insert in", timeSince, "<============") } + + if ctx.Err() != nil { + break + } + msgLength := wn.MessageClient.PrepareBatchMsg() + if msgLength > 0 { + wn.DoWriteNode(ctx, &wg) + fmt.Println("write node do a batch message, storage len: ", msgLength) + } } }