diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h
index c15cdf1a36..0d40103de9 100644
--- a/core/src/dog_segment/SegmentBase.h
+++ b/core/src/dog_segment/SegmentBase.h
@@ -9,10 +9,15 @@ using idx_t = int64_t;
 namespace milvus {
 namespace dog_segment {
+using engine::DataChunk;
+using engine::DataChunkPtr;
 using engine::QueryResult;
+using DogDataChunkPtr = std::shared_ptr<DataChunk>;
+
 int TestABI();
+
 class SegmentBase {
  public:
     // definitions
@@ -25,17 +30,19 @@ class SegmentBase {
  public:
     virtual ~SegmentBase() = default;
     // SegmentBase(std::shared_ptr<Collection> collection);

-    // single threaded
-    virtual Status
-    Insert(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
-    // TODO: add id into delete log, possibly bitmap
-    // single threaded
+    virtual int64_t
+    PreInsert(int64_t size) = 0;
+
     virtual Status
-    Delete(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
+    Insert(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0;
+
+    virtual int64_t
+    PreDelete(int64_t size) = 0;
+    // TODO: add id into delete log, possibly bitmap
+
+    virtual Status
+    Delete(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) = 0;

     // query contains metadata of
-    // multi-threaded
     virtual Status
     Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) = 0;
@@ -44,7 +51,6 @@ class SegmentBase {
     // GetEntityByIds(Timestamp timestamp, const std::vector<idx_t>& ids, DataChunkPtr& results) = 0;

     // stop receive insert requests
-    // single threaded
     virtual Status
     Close() = 0;
@@ -53,15 +59,8 @@ class SegmentBase {
     // virtual Status
     // Flush(Timestamp timestamp) = 0;

-    // BuildIndex With Paramaters, must with Frozen State
-    // This function is atomic
-    // NOTE: index_params contains serveral policies for several index
-    virtual Status
-    BuildIndex(std::shared_ptr index_params) = 0;
-
-    // Remove Index
-    virtual Status
-    DropIndex(std::string_view field_name) = 0;
+    // watch changes
+    // NOTE: Segment will use this ptr as correct

     virtual Status
     DropRawData(std::string_view field_name) = 0;
@@ -69,6 +68,9 @@ class SegmentBase {
     virtual Status
     LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) = 0;

+    virtual Status
+    BuildIndex() = 0;
+
  public:
     virtual ssize_t
     get_row_count() const = 0;
@@ -78,12 +80,12 @@ class SegmentBase {

     virtual ssize_t
     get_deleted_count() const = 0;
-
 };

 using SegmentBasePtr = std::unique_ptr<SegmentBase>;

-SegmentBasePtr CreateSegment(SchemaPtr& ptr);
+SegmentBasePtr
+CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);

 }  // namespace engine
 }  // namespace milvus
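The interface change above replaces the one-shot Insert/Delete calls (which carried an explicit timestamp range) with a reserve-then-write protocol: PreInsert/PreDelete atomically claim `size` consecutive slots and return the starting offset, and Insert then fills the reserved range, so concurrent writers never contend for the same rows. A minimal Go sketch of that pattern under stated assumptions — `segment`, `preInsert`, and `insert` here are illustrative stand-ins, not the project's API:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// segment is a toy stand-in for SegmentBase: a PreInsert-style reservation
// returns the first row offset of a contiguous range; writers then fill
// their own range concurrently without further coordination.
type segment struct {
	reserved int64   // next free offset, advanced atomically
	rows     []int64 // storage indexed by reserved offsets
}

// preInsert reserves size slots and returns the starting offset
// (the analogue of SegmentBase::PreInsert in the patch).
func (s *segment) preInsert(size int64) int64 {
	return atomic.AddInt64(&s.reserved, size) - size
}

// insert writes ids into the range reserved earlier
// (the analogue of Insert(reserved_offset, size, ...)).
func (s *segment) insert(offset int64, ids []int64) {
	copy(s.rows[offset:], ids)
}

func main() {
	s := &segment{rows: make([]int64, 16)}
	var wg sync.WaitGroup
	for w := int64(0); w < 4; w++ {
		wg.Add(1)
		go func(base int64) {
			defer wg.Done()
			ids := []int64{base, base + 1, base + 2}
			off := s.preInsert(int64(len(ids))) // phase 1: reserve
			s.insert(off, ids)                  // phase 2: write
		}(w * 10)
	}
	wg.Wait()
	fmt.Println(s.rows[:s.reserved])
}
```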
diff --git a/reader/query_node.go b/reader/query_node.go
index 0b04bef745..3722067d47 100644
--- a/reader/query_node.go
+++ b/reader/query_node.go
@@ -14,25 +14,35 @@ package reader
 import "C"

 import (
-	"errors"
 	"fmt"
 	msgPb "github.com/czs007/suvlim/pkg/message"
 	"github.com/czs007/suvlim/reader/message_client"
 	"sort"
-	"strconv"
 	"sync"
-	"time"
 )

+type InsertData struct {
+	insertIDs        map[int64][]int64
+	insertTimestamps map[int64][]uint64
+	insertRecords    map[int64][][]byte
+	insertOffset     map[int64]int64
+}
+
+type DeleteData struct {
+	deleteIDs        map[int64][]int64
+	deleteTimestamps map[int64][]uint64
+	deleteOffset     map[int64]int64
+}
+
 type DeleteRecord struct {
 	entityID  int64
 	timestamp uint64
 	segmentID int64
 }

-type DeleteRecords struct {
-	deleteRecords *[]DeleteRecord
-	count         chan int
+type DeletePreprocessData struct {
+	deleteRecords []*DeleteRecord
+	count         chan int
 }

 type QueryNodeDataBuffer struct {
@@ -43,13 +53,15 @@ type QueryNodeDataBuffer struct {
 }

 type QueryNode struct {
-	QueryNodeId       uint64
-	Collections       []*Collection
-	SegmentsMap       map[int64]*Segment
-	messageClient     message_client.MessageClient
-	queryNodeTimeSync *QueryNodeTime
-	deleteRecordsMap  map[TimeRange]DeleteRecords
-	buffer            QueryNodeDataBuffer
+	QueryNodeId          uint64
+	Collections          []*Collection
+	SegmentsMap          map[int64]*Segment
+	messageClient        message_client.MessageClient
+	queryNodeTimeSync    *QueryNodeTime
+	buffer               QueryNodeDataBuffer
+	deletePreprocessData DeletePreprocessData
+	deleteData           DeleteData
+	insertData           InsertData
 }

 func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
@@ -71,7 +83,6 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
 		SegmentsMap:       segmentsMap,
 		messageClient:     mc,
 		queryNodeTimeSync: queryNodeTimeSync,
-		deleteRecordsMap:  make(map[TimeRange]DeleteRecords),
 	}
 }

@@ -95,19 +106,6 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {

 ////////////////////////////////////////////////////////////////////////////////////////////////////

-func (node *QueryNode) doQueryNode(wg *sync.WaitGroup) {
-	wg.Add(3)
-	// Do insert and delete messages sort, do insert
-	go node.InsertAndDelete(node.messageClient.InsertOrDeleteMsg, wg)
-	// Do delete messages sort
-	go node.searchDeleteInMap()
-	// Do delete
-	go node.Delete()
-	// Do search
-	go node.Search(node.messageClient.SearchMsg, wg)
-	wg.Wait()
-}
-
 func (node *QueryNode) PrepareBatchMsg() {
 	node.messageClient.PrepareBatchMsg()
 }

@@ -119,60 +117,6 @@ func (node *QueryNode) StartMessageClient() {
 	go node.messageClient.ReceiveMessage()
 }

-// Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
-func (node *QueryNode) GetKey2Segments() ([]int64, []uint64, []int64) {
-	// TODO: get id2segment info from pulsar
-	return nil, nil, nil
-}
-
-func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
-	var targetPartition *Partition
-
-	for _, collection := range node.Collections {
-		if *collectionName == collection.CollectionName {
-			for _, partition := range collection.Partitions {
-				if *partitionTag == partition.PartitionName {
-					targetPartition = partition
-					break
-				}
-			}
-		}
-	}
-
-	if targetPartition == nil {
-		return nil, errors.New("cannot found target partition")
-	}
-
-	for _, segment := range targetPartition.OpenedSegments {
-		// TODO: add other conditions
-		return segment, nil
-	}
-
-	return nil, errors.New("cannot found target segment")
-}
-
-func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Collection, error) {
-	for _, collection := range node.Collections {
-		if collection.CollectionName == collectionName {
-			return collection, nil
-		}
-	}
-
-	return nil, errors.New("Cannot found collection: " + collectionName)
-}
-
-func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
-	targetSegment := node.SegmentsMap[segmentID]
-
-	if targetSegment == nil {
-		return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
-	}
-
-	return targetSegment, nil
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////
-
 func (node *QueryNode) InitQueryNodeCollection() {
 	// TODO: remove hard code, add collection creation request
 	// TODO: error handle
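One thing worth noting about the new staging fields: NewQueryNode (above) drops the deleteRecordsMap initialization but does not initialize insertData, deleteData, or deletePreprocessData either, and MessagesPreprocess assigns into those maps directly. In Go that panics with "assignment to entry in nil map". A sketch of the initialization the constructor would need — the function name is hypothetical, the field names are the patch's:

```go
package reader

// Sketch only: explicit construction of the new staging state, assuming it
// is wired into NewQueryNode. Go maps are nil until made, so the first
// node.insertData.insertIDs[segID] = append(...) in MessagesPreprocess
// would otherwise panic; the count channel must also hold an initial
// value, because the code receives from it (<-count) before every send.
func newStagingState() (InsertData, DeleteData, DeletePreprocessData) {
	insert := InsertData{
		insertIDs:        make(map[int64][]int64),
		insertTimestamps: make(map[int64][]uint64),
		insertRecords:    make(map[int64][][]byte),
		insertOffset:     make(map[int64]int64),
	}
	del := DeleteData{
		deleteIDs:        make(map[int64][]int64),
		deleteTimestamps: make(map[int64][]uint64),
		deleteOffset:     make(map[int64]int64),
	}
	pre := DeletePreprocessData{
		deleteRecords: make([]*DeleteRecord, 0),
		count:         make(chan int, 1),
	}
	pre.count <- 0 // seed the counter cell so the first <-count does not block
	return insert, del, pre
}
```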
@@ -182,70 +126,47 @@ func (node *QueryNode) InitQueryNodeCollection() {
 	var _ = newPartition.NewSegment(0)
 }

-func (node *QueryNode) SegmentsManagement() {
-	node.queryNodeTimeSync.UpdateTSOTimeSync()
-	var timeNow = node.queryNodeTimeSync.TSOTimeSync
-	for _, collection := range node.Collections {
-		for _, partition := range collection.Partitions {
-			for _, oldSegment := range partition.OpenedSegments {
-				// TODO: check segment status
-				if timeNow >= oldSegment.SegmentCloseTime {
-					// start new segment and add it into partition.OpenedSegments
-					// TODO: get segmentID from master
-					var segmentID int64 = 0
-					var newSegment = partition.NewSegment(segmentID)
-					newSegment.SegmentCloseTime = timeNow + SegmentLifetime
-					partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
-					node.SegmentsMap[segmentID] = newSegment
+////////////////////////////////////////////////////////////////////////////////////////////////////

-					// close old segment and move it into partition.ClosedSegments
-					// TODO: check status
-					var _ = oldSegment.Close()
-					partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
-				}
-			}
-		}
-	}
-}
-
-func (node *QueryNode) SegmentService() {
+func (node *QueryNode) RunInsertDelete() {
 	for {
-		time.Sleep(200 * time.Millisecond)
-		node.SegmentsManagement()
-		fmt.Println("do segments management in 200ms")
+		// TODO: get timeRange from message client
+		var timeRange = TimeRange{0, 0}
+		node.PrepareBatchMsg()
+		node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
+		node.WriterDelete()
+		node.PreInsertAndDelete()
+		node.DoInsertAndDelete()
+		node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
 	}
 }

-///////////////////////////////////////////////////////////////////////////////////////////////////
-// TODO: receive delete messages individually
-func (node *QueryNode) InsertAndDelete(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, wg *sync.WaitGroup) msgPb.Status {
-	node.queryNodeTimeSync.UpdateReadTimeSync()
+func (node *QueryNode) RunSearch() {
+	for {
+		node.Search(node.messageClient.SearchMsg)
+	}
+}

-	var tMin = node.queryNodeTimeSync.ReadTimeSyncMin
-	var tMax = node.queryNodeTimeSync.ReadTimeSyncMax
-	var readTimeSyncRange = TimeRange{timestampMin: tMin, timestampMax: tMax}
+////////////////////////////////////////////////////////////////////////////////////////////////////

-	var clientId = insertDeleteMessages[0].ClientId
-
-	var insertIDs = make(map[int64][]int64)
-	var insertTimestamps = make(map[int64][]uint64)
-	var insertRecords = make(map[int64][][]byte)
+func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, timeRange TimeRange) msgPb.Status {
+	var tMax = timeRange.timestampMax

 	// 1. Extract messages before readTimeSync from QueryNodeDataBuffer.
 	// Set valid bitmap to false.
 	for i, msg := range node.buffer.InsertDeleteBuffer {
-		if msg.Timestamp <= tMax {
+		if msg.Timestamp < tMax {
 			if msg.Op == msgPb.OpType_INSERT {
-				insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
-				insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
-				insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
+				node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
+				node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
+				node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
 			} else if msg.Op == msgPb.OpType_DELETE {
 				var r = DeleteRecord{
 					entityID:  msg.Uid,
 					timestamp: msg.Timestamp,
 				}
-				*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
-				node.deleteRecordsMap[readTimeSyncRange].count <- <-node.deleteRecordsMap[readTimeSyncRange].count + 1
+				node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
+				node.deletePreprocessData.count <- <-node.deletePreprocessData.count + 1
 			}
 			node.buffer.validInsertDeleteBuffer[i] = false
 		}
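MessagesPreprocess now keys batching off the timeRange handed in by RunInsertDelete: anything stamped strictly before timestampMax is consumed in this batch, and the comparison tightens from <= to <, making the window half-open so a message stamped exactly tMax is picked up by the next window rather than twice. A standalone sketch of that partition rule (splitByTimeWindow is a hypothetical helper, not in the patch):

```go
package main

import "fmt"

type msg struct {
	ts uint64
	op string
}

// splitByTimeWindow mirrors the rule in MessagesPreprocess: messages with
// ts < tMax belong to the current batch, everything else is buffered for a
// later batch. The half-open window [tMin, tMax) means a message stamped
// exactly tMax is never consumed twice when the next window starts at tMax.
func splitByTimeWindow(in []msg, tMax uint64) (now, buffered []msg) {
	for _, m := range in {
		if m.ts < tMax {
			now = append(now, m)
		} else {
			buffered = append(buffered, m)
		}
	}
	return now, buffered
}

func main() {
	batch := []msg{{5, "insert"}, {9, "delete"}, {10, "insert"}, {12, "insert"}}
	now, later := splitByTimeWindow(batch, 10)
	fmt.Println("process now:", now)       // ts 5, 9
	fmt.Println("buffer for later:", later) // ts 10, 12
}
```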
@@ -264,18 +185,18 @@ func (node *QueryNode) InsertAndDel
 	// Move messages after readTimeSync to QueryNodeDataBuffer.
 	// Set valid bitmap to true.
 	for _, msg := range insertDeleteMessages {
-		if msg.Timestamp <= tMax {
+		if msg.Timestamp < tMax {
 			if msg.Op == msgPb.OpType_INSERT {
-				insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
-				insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
-				insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
+				node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
+				node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
+				node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
 			} else if msg.Op == msgPb.OpType_DELETE {
 				var r = DeleteRecord{
 					entityID:  msg.Uid,
 					timestamp: msg.Timestamp,
 				}
-				*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
-				node.deleteRecordsMap[readTimeSyncRange].count <- <-node.deleteRecordsMap[readTimeSyncRange].count + 1
+				node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
+				node.deletePreprocessData.count <- <-node.deletePreprocessData.count + 1
 			}
 		} else {
 			node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
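Both preprocessing loops maintain the pending-delete counter with `count <- <-count + 1`: a buffered channel of capacity one used as a lockable cell — receiving takes exclusive ownership of the value, sending publishes the update, and any other goroutine blocks on its own receive in between. A self-contained sketch of the idiom and its two prerequisites (the cell must be seeded, and whoever receives must send a value back):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// A one-slot channel acting as a shared counter cell, as in
	// DeletePreprocessData.count. The receive takes exclusive ownership
	// of the value; the send publishes the update.
	count := make(chan int, 1)
	count <- 0 // the cell must be seeded or the first receive blocks forever

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			count <- <-count + 1 // receive, increment, send back
		}()
	}
	wg.Wait()

	fmt.Println(<-count) // 100: no increments are lost
	// Caveat: checking the value, as WriterDelete's `if <-count == 0`
	// does, drains the cell; the value must be sent back if anyone else
	// will read it afterwards.
}
```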
@@ -283,81 +204,118 @@ func (node *QueryNode) InsertAndDel
 		}
 	}

-	// 4. Do insert
-	// TODO: multi-thread insert
-	for segmentID, records := range insertRecords {
+	return msgPb.Status{ErrorCode: 0}
+}
+
+func (node *QueryNode) WriterDelete() msgPb.Status {
+	// TODO: set timeout
+	for {
+		var ids, timestamps, segmentIDs = node.GetKey2Segments()
+		for i := 0; i < len(ids); i++ {
+			id := ids[i]
+			timestamp := timestamps[i]
+			segmentID := segmentIDs[i]
+			for _, r := range node.deletePreprocessData.deleteRecords {
+				if r.timestamp == timestamp && r.entityID == id {
+					r.segmentID = segmentID
+					node.deletePreprocessData.count <- <-node.deletePreprocessData.count - 1
+				}
+			}
+		}
+		if <-node.deletePreprocessData.count == 0 {
+			return msgPb.Status{ErrorCode: 0}
+		}
+	}
+}
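WriterDelete's exit test `if <-node.deletePreprocessData.count == 0` has a drain hazard under this idiom: when the received value is non-zero it is never sent back, so the cell stays empty and the next receive in the decrement path blocks forever. A sketch of a non-destructive check, assuming a helper like this replaced the bare receive (the method name is hypothetical):

```go
package reader

// deletesOutstanding is a sketch of a non-destructive zero check for the
// counter cell: it takes exclusive ownership of the value and immediately
// puts it back, so later <-count receives in the increment/decrement
// paths still find a value in the cell.
func (node *QueryNode) deletesOutstanding() bool {
	v := <-node.deletePreprocessData.count // take exclusive ownership
	node.deletePreprocessData.count <- v   // restore the value for other readers
	return v != 0
}
```

With that helper, the busy loop would read `for node.deletesOutstanding() { ... }` and return once the counter reaches zero.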
+
+func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
+	// 1. Do PreInsert
+	for segmentID := range node.insertData.insertRecords {
 		var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
 		if err != nil {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}

-		ids := insertIDs[segmentID]
-		timestamps := insertTimestamps[segmentID]
-		err = targetSegment.SegmentInsert(&ids, &timestamps, &records, tMin, tMax)
+		var numOfRecords = len(node.insertData.insertRecords[segmentID])
+		var offset = targetSegment.SegmentPreInsert(numOfRecords)
+		node.insertData.insertOffset[segmentID] = offset
+	}
+
+	// 2. Sort delete preprocess data by segment id
+	for _, r := range node.deletePreprocessData.deleteRecords {
+		node.deleteData.deleteIDs[r.segmentID] = append(node.deleteData.deleteIDs[r.segmentID], r.entityID)
+		node.deleteData.deleteTimestamps[r.segmentID] = append(node.deleteData.deleteTimestamps[r.segmentID], r.timestamp)
+	}
+
+	// 3. Do PreDelete
+	for segmentID := range node.deleteData.deleteIDs {
+		var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
 		if err != nil {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}
-	}
-
-	wg.Done()
-	return publishResult(nil, clientId)
-}
-
-func (node *QueryNode) searchDeleteInMap() {
-	var ids, timestamps, segmentIDs = node.GetKey2Segments()
-
-	for i := 0; i <= len(ids); i++ {
-		id := ids[i]
-		timestamp := timestamps[i]
-		segmentID := segmentIDs[i]
-		for timeRange, records := range node.deleteRecordsMap {
-			if timestamp < timeRange.timestampMax && timestamp > timeRange.timestampMin {
-				for _, r := range *records.deleteRecords {
-					if r.timestamp == timestamp && r.entityID == id {
-						r.segmentID = segmentID
-						records.count <- <-records.count - 1
-					}
-				}
-			}
-		}
-	}
-}
-
-func (node *QueryNode) Delete() msgPb.Status {
-	type DeleteData struct {
-		ids       *[]int64
-		timestamp *[]uint64
-	}
-	for timeRange, records := range node.deleteRecordsMap {
-		// TODO: multi-thread delete
-		if <-records.count == 0 {
-			// 1. Sort delete records by segment id
-			segment2records := make(map[int64]DeleteData)
-			for _, r := range *records.deleteRecords {
-				*segment2records[r.segmentID].ids = append(*segment2records[r.segmentID].ids, r.entityID)
-				*segment2records[r.segmentID].timestamp = append(*segment2records[r.segmentID].timestamp, r.timestamp)
-			}
-			// 2. Do batched delete
-			for segmentID, deleteData := range segment2records {
-				var segment, err = node.GetSegmentBySegmentID(segmentID)
-				if err != nil {
-					fmt.Println(err.Error())
-					return msgPb.Status{ErrorCode: 1}
-				}
-				err = segment.SegmentDelete(deleteData.ids, deleteData.timestamp, timeRange.timestampMin, timeRange.timestampMax)
-				if err != nil {
-					fmt.Println(err.Error())
-					return msgPb.Status{ErrorCode: 1}
-				}
-			}
-		}
+		var numOfRecords = len(node.deleteData.deleteIDs[segmentID])
+		var offset = targetSegment.SegmentPreDelete(numOfRecords)
+		node.deleteData.deleteOffset[segmentID] = offset
 	}

 	return msgPb.Status{ErrorCode: 0}
 }

-func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGroup) msgPb.Status {
+func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
+	var wg sync.WaitGroup
+	// Do insert
+	for segmentID, records := range node.insertData.insertRecords {
+		wg.Add(1)
+		go node.DoInsert(segmentID, &records, &wg)
+	}
+
+	// Do delete
+	for segmentID, deleteIDs := range node.deleteData.deleteIDs {
+		wg.Add(1)
+		var deleteTimestamps = node.deleteData.deleteTimestamps[segmentID]
+		go node.DoDelete(segmentID, &deleteIDs, &deleteTimestamps, &wg)
+	}
+
+	wg.Wait()
+	return msgPb.Status{ErrorCode: 0}
+}
+
+func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
+	var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
+	if err != nil {
+		fmt.Println(err.Error())
+		return msgPb.Status{ErrorCode: 1}
+	}
+
+	ids := node.insertData.insertIDs[segmentID]
+	timestamps := node.insertData.insertTimestamps[segmentID]
+	err = targetSegment.SegmentInsert(&ids, &timestamps, records)
+	if err != nil {
+		fmt.Println(err.Error())
+		return msgPb.Status{ErrorCode: 1}
+	}
+
+	wg.Done()
+	return msgPb.Status{ErrorCode: 0}
+}
+
+func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimestamps *[]uint64, wg *sync.WaitGroup) msgPb.Status {
+	var segment, err = node.GetSegmentBySegmentID(segmentID)
+	if err != nil {
+		fmt.Println(err.Error())
+		return msgPb.Status{ErrorCode: 1}
+	}
+
+	err = segment.SegmentDelete(deleteIDs, deleteTimestamps)
+	if err != nil {
+		fmt.Println(err.Error())
+		return msgPb.Status{ErrorCode: 1}
+	}
+
+	wg.Done()
+	return msgPb.Status{ErrorCode: 0}
+}
+
+func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 	var clientId = searchMessages[0].ClientId

 	type SearchResultTmp struct {
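DoInsert and DoDelete above call wg.Done() only on their success paths; either error return leaves the WaitGroup counter unreleased, and DoInsertAndDelete's wg.Wait() then hangs. The usual guard is to defer the Done. A sketch keeping the patch's signature (doInsertSafe is an illustrative name, not the patch's):

```go
package reader

import (
	"fmt"
	"sync"

	msgPb "github.com/czs007/suvlim/pkg/message"
)

// doInsertSafe is DoInsert with the Done moved into a defer, so the
// WaitGroup is released on every return path, including both error returns.
func (node *QueryNode) doInsertSafe(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
	defer wg.Done() // runs on success and on both error returns

	targetSegment, err := node.GetSegmentBySegmentID(segmentID)
	if err != nil {
		fmt.Println(err.Error())
		return msgPb.Status{ErrorCode: 1}
	}

	ids := node.insertData.insertIDs[segmentID]
	timestamps := node.insertData.insertTimestamps[segmentID]
	if err = targetSegment.SegmentInsert(&ids, &timestamps, records); err != nil {
		fmt.Println(err.Error())
		return msgPb.Status{ErrorCode: 1}
	}
	return msgPb.Status{ErrorCode: 0}
}
```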
@@ -379,9 +337,16 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGr
 		// TODO: get top-k's k from queryString
 		const TopK = 1

-		// 1. Do search in all segments
 		var timestamp = msg.Timestamp
 		var vector = msg.Records
+
+		// 1. Timestamp check
+		// TODO: return or wait? Or adding graceful time
+		if timestamp > node.queryNodeTimeSync.SearchTimeSync {
+			return msgPb.Status{ErrorCode: 1}
+		}
+
+		// 2. Do search in all segments
 		for _, partition := range targetCollection.Partitions {
 			for _, openSegment := range partition.OpenedSegments {
 				var res, err = openSegment.SegmentSearch("", timestamp, vector)
@@ -420,6 +385,5 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGr
 		publishSearchResult(&results, clientId)
 	}

-	wg.Done()
 	return msgPb.Status{ErrorCode: 0}
 }
diff --git a/reader/query_node_time.go b/reader/query_node_time.go
index 1c9afd3f18..629729500f 100644
--- a/reader/query_node_time.go
+++ b/reader/query_node_time.go
@@ -24,9 +24,8 @@ func (t *QueryNodeTime) UpdateWriteTimeSync() {
 	t.WriteTimeSync = 0
 }

-func (t *QueryNodeTime) UpdateSearchTimeSync() {
-	// TODO: Add time sync
-	t.SearchTimeSync = 0
+func (t *QueryNodeTime) UpdateSearchTimeSync(timeRange TimeRange) {
+	t.SearchTimeSync = timeRange.timestampMax
 }

 func (t *QueryNodeTime) UpdateTSOTimeSync() {
diff --git a/reader/reader.go b/reader/reader.go
index 6d87cfa473..dc9c58fe8a 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -1,22 +1,11 @@
 package reader

-import (
-	"fmt"
-	"sync"
-	"time"
-)
-
 func startQueryNode() {
 	qn := NewQueryNode(0, 0)
 	qn.InitQueryNodeCollection()
 	go qn.SegmentService()
 	qn.StartMessageClient()
-	var wg sync.WaitGroup
-	for {
-		time.Sleep(200 * time.Millisecond)
-		qn.PrepareBatchMsg()
-		qn.doQueryNode(&wg)
-		fmt.Println("do a batch in 200ms")
-	}
+	go qn.RunInsertDelete()
+	go qn.RunSearch()
 }
diff --git a/reader/segment.go b/reader/segment.go
index d5704e3c1b..aa668bb7a7 100644
--- a/reader/segment.go
+++ b/reader/segment.go
@@ -76,7 +76,19 @@ func (s *Segment) Close() error {
 }

 ////////////////////////////////////////////////////////////////////////////
-func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte, timestampMin uint64, timestampMax uint64) error {
+func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
+	var offset = C.PreInsert(numOfRecords)
+
+	return offset
+}
+
+func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
+	var offset = C.PreDelete(numOfRecords)
+
+	return offset
+}
+
+func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte) error {
 	/*C.Insert
 	int
 	Insert(CSegmentBase c_segment,
@@ -121,7 +133,7 @@ func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, record
 	return nil
 }

-func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64, timestampMin uint64, timestampMax uint64) error {
+func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error {
 	/*C.Delete
 	int
 	Delete(CSegmentBase c_segment,
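Two caveats on the new wrappers in segment.go. First, cgo performs no implicit integer conversions, so `C.PreInsert(numOfRecords)` with a Go int will not compile against a C prototype; the call needs casts in and out. Second, the offset reserved in PreInsertAndDelete is stored in insertOffset but never reaches SegmentInsert, even though the C++ Insert now takes reserved_offset as its first parameter — the offset presumably still needs to be threaded through. A sketch of the conversion half only; the C prototype and header name here are assumptions, not shown in this patch:

```go
package reader

/*
#include "segment_c.h"   // assumed header; this patch does not show the C side
*/
import "C"

// segmentPreInsertChecked is a sketch with explicit cgo conversions,
// assuming the C side is `long PreInsert(long size)`. If the C function
// also takes the segment handle, the cast pattern stays the same and only
// the argument list changes.
func (s *Segment) segmentPreInsertChecked(numOfRecords int) int64 {
	var offset = C.PreInsert(C.long(numOfRecords))
	return int64(offset)
}
```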
diff --git a/reader/segment_management.go b/reader/segment_management.go
new file mode 100644
index 0000000000..1dd20026b0
--- /dev/null
+++ b/reader/segment_management.go
@@ -0,0 +1,40 @@
+package reader
+
+import (
+	"fmt"
+	"time"
+)
+
+func (node *QueryNode) SegmentsManagement() {
+	node.queryNodeTimeSync.UpdateTSOTimeSync()
+	var timeNow = node.queryNodeTimeSync.TSOTimeSync
+	for _, collection := range node.Collections {
+		for _, partition := range collection.Partitions {
+			for _, oldSegment := range partition.OpenedSegments {
+				// TODO: check segment status
+				if timeNow >= oldSegment.SegmentCloseTime {
+					// start new segment and add it into partition.OpenedSegments
+					// TODO: get segmentID from master
+					var segmentID int64 = 0
+					var newSegment = partition.NewSegment(segmentID)
+					newSegment.SegmentCloseTime = timeNow + SegmentLifetime
+					partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
+					node.SegmentsMap[segmentID] = newSegment
+
+					// close old segment and move it into partition.ClosedSegments
+					// TODO: check status
+					var _ = oldSegment.Close()
+					partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
+				}
+			}
+		}
+	}
+}
+
+func (node *QueryNode) SegmentService() {
+	for {
+		time.Sleep(200 * time.Millisecond)
+		node.SegmentsManagement()
+		fmt.Println("do segments management in 200ms")
+	}
+}
diff --git a/reader/segment_test.go b/reader/segment_test.go
index 3827b28bb7..39303bca6d 100644
--- a/reader/segment_test.go
+++ b/reader/segment_test.go
@@ -26,7 +26,7 @@ func TestSegmentInsert(t *testing.T) {
 	ids :=[] int64{1, 2, 3}
 	timestamps :=[] uint64 {0, 0, 0}

-	var err = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
+	var err = segment.SegmentInsert(&ids, &timestamps, nil)
 	assert.NoError(t, err)

 	partition.DeleteSegment(segment)
@@ -43,7 +43,7 @@ func TestSegmentDelete(t *testing.T) {
 	ids :=[] int64{1, 2, 3}
 	timestamps :=[] uint64 {0, 0, 0}

-	var err = segment.SegmentDelete(&ids, &timestamps, 0, 0)
+	var err = segment.SegmentDelete(&ids, &timestamps)
 	assert.NoError(t, err)

 	partition.DeleteSegment(segment)
@@ -60,7 +60,7 @@ func TestSegmentSearch(t *testing.T) {
 	ids :=[] int64{1, 2, 3}
 	timestamps :=[] uint64 {0, 0, 0}

-	var insertErr = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
+	var insertErr = segment.SegmentInsert(&ids, &timestamps, nil)
 	assert.NoError(t, insertErr)

 	var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
@@ -109,7 +109,7 @@ func TestSegment_GetRowCount(t *testing.T) {
 	ids :=[] int64{1, 2, 3}
 	timestamps :=[] uint64 {0, 0, 0}

-	var err = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
+	var err = segment.SegmentInsert(&ids, &timestamps, nil)
 	assert.NoError(t, err)

 	var rowCount = segment.GetRowCount()
@@ -129,7 +129,7 @@ func TestSegment_GetDeletedCount(t *testing.T) {
 	ids :=[] int64{1, 2, 3}
 	timestamps :=[] uint64 {0, 0, 0}

-	var err = segment.SegmentDelete(&ids, &timestamps, 0, 0)
+	var err = segment.SegmentDelete(&ids, &timestamps)
 	assert.NoError(t, err)

 	var deletedCount = segment.GetDeletedCount()
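One more concurrency note on the file moved above: SegmentService mutates node.SegmentsMap from its own goroutine while RunInsertDelete reads the same map through GetSegmentBySegmentID, and Go maps are unsafe under concurrent read/write. A minimal sketch of a guarded registry that could wrap the map — the type and method names are illustrative, not part of the patch:

```go
package reader

import "sync"

// segmentRegistry sketches a lock-guarded replacement for the bare
// SegmentsMap: SegmentService would call put() when it rotates segments,
// and GetSegmentBySegmentID would call get().
type segmentRegistry struct {
	mu       sync.RWMutex
	segments map[int64]*Segment
}

func (r *segmentRegistry) put(id int64, s *Segment) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.segments[id] = s
}

func (r *segmentRegistry) get(id int64) (*Segment, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.segments[id]
	return s, ok
}
```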
diff --git a/reader/util_functions.go b/reader/util_functions.go
new file mode 100644
index 0000000000..cc5b133319
--- /dev/null
+++ b/reader/util_functions.go
@@ -0,0 +1,58 @@
+package reader
+
+import (
+	"errors"
+	"strconv"
+)
+
+// Function `GetKey2Segments` should return entityIDs, timestamps and segmentIDs
+func (node *QueryNode) GetKey2Segments() ([]int64, []uint64, []int64) {
+	// TODO: get id2segment info from pulsar
+	return nil, nil, nil
+}
+
+func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
+	var targetPartition *Partition
+
+	for _, collection := range node.Collections {
+		if *collectionName == collection.CollectionName {
+			for _, partition := range collection.Partitions {
+				if *partitionTag == partition.PartitionName {
+					targetPartition = partition
+					break
+				}
+			}
+		}
+	}
+
+	if targetPartition == nil {
+		return nil, errors.New("cannot find target partition")
+	}
+
+	for _, segment := range targetPartition.OpenedSegments {
+		// TODO: add other conditions
+		return segment, nil
+	}
+
+	return nil, errors.New("cannot find target segment")
+}
+
+func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Collection, error) {
+	for _, collection := range node.Collections {
+		if collection.CollectionName == collectionName {
+			return collection, nil
+		}
+	}
+
+	return nil, errors.New("cannot find collection: " + collectionName)
+}
+
+func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
+	targetSegment := node.SegmentsMap[segmentID]
+
+	if targetSegment == nil {
+		return nil, errors.New("cannot find segment with id = " + strconv.FormatInt(segmentID, 10))
+	}
+
+	return targetSegment, nil
+}
diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go
index 95750fb405..44c729bfa8 100644
--- a/timesync/readertimesync.go
+++ b/timesync/readertimesync.go
@@ -17,6 +17,7 @@ type ReaderTimeSync interface {
 	Close()
 	TimeSync() <-chan TimeSyncMsg
 	InsertOrDelete() <-chan *pb.InsertOrDeleteMsg
+	IsInsertDeleteChanFull() bool
 }

 type TimeSyncMsg struct {
@@ -45,9 +46,9 @@ type readerTimeSyncCfg struct {
 	cancel context.CancelFunc
 }

-func toTimeStamp(ts *pb.TimeSyncMsg) int {
+func toMillisecond(ts *pb.TimeSyncMsg) int {
 	// get Millisecond in second
-	return int(ts.GetTimestamp()>>18) % 1000
+	return int(ts.GetTimestamp() >> 18)
 }

 func NewReaderTimeSync(
@@ -86,7 +87,7 @@ func NewReaderTimeSync(
 	}
 	//set default value
 	if r.readerQueueSize == 0 {
-		r.readerQueueSize = 128
+		r.readerQueueSize = 1024
 	}
 	r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize)
@@ -158,13 +159,17 @@ func (r *readerTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
 	return r.timesyncMsgChan
 }

+func (r *readerTimeSyncCfg) IsInsertDeleteChanFull() bool {
+	return len(r.insertOrDeleteChan) == len(r.readerProducer)*r.readerQueueSize
+}
+
 func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
 	if len(r.proxyIdList) > 1 {
 		if len(ts) > 1 {
 			for i := 1; i < len(r.proxyIdList); i++ {
 				curIdx := len(ts) - 1 - i
 				preIdx := len(ts) - i
-				timeGap := toTimeStamp(ts[curIdx]) - toTimeStamp(ts[preIdx])
+				timeGap := toMillisecond(ts[curIdx]) - toMillisecond(ts[preIdx])
 				if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) {
 					ts = ts[preIdx:]
 					return ts
@@ -274,6 +279,9 @@ func (r *readerTimeSyncCfg) startReadTopics() {
 					r.revTimesyncFromReader[imsg.Timestamp] = gval
 				}
 			} else {
+				if r.IsInsertDeleteChanFull() {
+					log.Printf("WARN : Insert or delete chan is full ...")
+				}
 				tsm.NumRecorders++
 				r.insertOrDeleteChan <- &imsg
diff --git a/timesync/readertimesync_test.go b/timesync/readertimesync_test.go
index 7b089904c1..cf0d5da135 100644
--- a/timesync/readertimesync_test.go
+++ b/timesync/readertimesync_test.go
@@ -320,7 +320,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
 			t.Fatalf("send msg error %v", err)
 		}

-		log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)
+		//log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)

 		if i%20 == 0 {
 			tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18}
@@ -331,7 +331,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
 			if _, err := pt.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
 				t.Fatalf("send msg error %v", err)
 			}
-			log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
+			//log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
 		}
 	}
 }
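Finally, on the timesync changes: IsInsertDeleteChanFull only makes the stall observable — startReadTopics logs the warning and then performs a plain send, which blocks until the reader drains the channel, and raising readerQueueSize from 128 to 1024 merely postpones that point. A toy sketch making the two outcomes of such a send explicit (types simplified; the real channel carries *pb.InsertOrDeleteMsg):

```go
package main

import (
	"fmt"
	"time"
)

// send mirrors the pattern in startReadTopics: the capacity check only
// logs, and the plain send then blocks until a consumer drains the
// channel. A select makes the fast path and the backpressure path explicit.
func send(ch chan int, v int) {
	select {
	case ch <- v: // fast path: buffer has room
	default:
		fmt.Println("WARN: insert/delete chan is full, blocking ...")
		start := time.Now()
		ch <- v // backpressure: wait for the reader to catch up
		fmt.Println("unblocked after", time.Since(start))
	}
}

func main() {
	ch := make(chan int, 2)
	go func() {
		for v := range ch {
			time.Sleep(10 * time.Millisecond) // slow consumer
			_ = v
		}
	}()
	for i := 0; i < 5; i++ {
		send(ch, i)
	}
}
```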