diff --git a/pulsar/schema/message.go b/pulsar/schema/message.go index 5358c470f6..c04bcffdaf 100644 --- a/pulsar/schema/message.go +++ b/pulsar/schema/message.go @@ -139,7 +139,7 @@ type SearchMsg struct { CollectionName string PartitionTag string VectorParam *VectorParam - Timestamp int64 + Timestamp uint64 ClientId int64 MsgType OpType } diff --git a/reader/query_node.go b/reader/query_node.go index 782b98a5bb..5b985dd78e 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -10,6 +10,15 @@ import ( "time" ) +type QueryNodeDataBuffer struct { + InsertBuffer []*schema.InsertMsg + DeleteBuffer []*schema.DeleteMsg + SearchBuffer []*schema.SearchMsg + validInsertBuffer []bool + validDeleteBuffer []bool + validSearchBuffer []bool +} + type QueryNodeTimeSync struct { deleteTimeSync uint64 insertTimeSync uint64 @@ -20,6 +29,7 @@ type QueryNode struct { Collections []*Collection messageClient pulsar.MessageClient queryNodeTimeSync *QueryNodeTimeSync + buffer QueryNodeDataBuffer } func NewQueryNode(timeSync uint64) *QueryNode { @@ -122,7 +132,7 @@ func (node *QueryNode) SegmentsManagement() { segment.Close() // TODO: add atomic segment id var newSegment, _ = partition.NewSegment() - newSegment.SegmentCloseTime = timeSync + SEGMENT_LIFETIME + newSegment.SegmentCloseTime = timeSync + SegmentLifetime partition.Segments = append(partition.Segments, newSegment) } } @@ -140,6 +150,7 @@ func (node *QueryNode) SegmentService() { /////////////////////////////////////////////////////////////////////////////////////////////////// func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status { + var timeSync = node.GetTimeSync() var collectionName = insertMessages[0].CollectionName var partitionTag = insertMessages[0].PartitionTag var clientId = insertMessages[0].ClientId @@ -148,10 +159,33 @@ func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitG var entityIds []int64 var timestamps []uint64 var vectorRecords [][]*schema.FieldValue + + for i, msg := range node.buffer.InsertBuffer { + if msg.Timestamp <= timeSync { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + vectorRecords = append(vectorRecords, msg.Fields) + node.buffer.validInsertBuffer[i] = false + } + } + + for i, isValid := range node.buffer.validInsertBuffer { + if !isValid { + copy(node.buffer.InsertBuffer[i:], node.buffer.InsertBuffer[i+1:]) // Shift a[i+1:] left one index. + node.buffer.InsertBuffer[len(node.buffer.InsertBuffer)-1] = nil // Erase last element (write zero value). + node.buffer.InsertBuffer = node.buffer.InsertBuffer[:len(node.buffer.InsertBuffer)-1] // Truncate slice. + } + } + for _, msg := range insertMessages { - entityIds = append(entityIds, msg.EntityId) - timestamps = append(timestamps, msg.Timestamp) - vectorRecords = append(vectorRecords, msg.Fields) + if msg.Timestamp <= timeSync { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + vectorRecords = append(vectorRecords, msg.Fields) + } else { + node.buffer.InsertBuffer = append(node.buffer.InsertBuffer, msg) + node.buffer.validInsertBuffer = append(node.buffer.validInsertBuffer, true) + } } var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag) @@ -168,15 +202,38 @@ func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitG } func (node *QueryNode) Delete(deleteMessages []*schema.DeleteMsg, wg *sync.WaitGroup) schema.Status { + var timeSync = node.GetTimeSync() var collectionName = deleteMessages[0].CollectionName var clientId = deleteMessages[0].ClientId // TODO: prevent Memory copy var entityIds []int64 var timestamps []uint64 + + for i, msg := range node.buffer.DeleteBuffer { + if msg.Timestamp <= timeSync { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + node.buffer.validDeleteBuffer[i] = false + } + } + + for i, isValid := range node.buffer.validDeleteBuffer { + if !isValid { + copy(node.buffer.DeleteBuffer[i:], node.buffer.DeleteBuffer[i+1:]) // Shift a[i+1:] left one index. + node.buffer.DeleteBuffer[len(node.buffer.DeleteBuffer)-1] = nil // Erase last element (write zero value). + node.buffer.DeleteBuffer = node.buffer.DeleteBuffer[:len(node.buffer.DeleteBuffer)-1] // Truncate slice. + } + } + for _, msg := range deleteMessages { - entityIds = append(entityIds, msg.EntityId) - timestamps = append(timestamps, msg.Timestamp) + if msg.Timestamp <= timeSync { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + } else { + node.buffer.DeleteBuffer = append(node.buffer.DeleteBuffer, msg) + node.buffer.validDeleteBuffer = append(node.buffer.validDeleteBuffer, true) + } } if entityIds == nil { @@ -194,6 +251,7 @@ func (node *QueryNode) Delete(deleteMessages []*schema.DeleteMsg, wg *sync.WaitG } func (node *QueryNode) Search(searchMessages []*schema.SearchMsg, wg *sync.WaitGroup) schema.Status { + var timeSync = node.GetTimeSync() var collectionName = searchMessages[0].CollectionName var partitionTag = searchMessages[0].PartitionTag var clientId = searchMessages[0].ClientId @@ -201,10 +259,32 @@ func (node *QueryNode) Search(searchMessages []*schema.SearchMsg, wg *sync.WaitG // TODO: prevent Memory copy var records []schema.VectorRecord - var timestamps []int64 + var timestamps []uint64 + + for i, msg := range node.buffer.SearchBuffer { + if msg.Timestamp <= timeSync { + records = append(records, *msg.VectorParam.RowRecord) + timestamps = append(timestamps, msg.Timestamp) + node.buffer.validSearchBuffer[i] = false + } + } + + for i, isValid := range node.buffer.validSearchBuffer { + if !isValid { + copy(node.buffer.SearchBuffer[i:], node.buffer.SearchBuffer[i+1:]) // Shift a[i+1:] left one index. + node.buffer.SearchBuffer[len(node.buffer.SearchBuffer)-1] = nil // Erase last element (write zero value). + node.buffer.SearchBuffer = node.buffer.SearchBuffer[:len(node.buffer.SearchBuffer)-1] // Truncate slice. + } + } + for _, msg := range searchMessages { - records = append(records, *msg.VectorParam.RowRecord) - timestamps = append(timestamps, msg.Timestamp) + if msg.Timestamp <= timeSync { + records = append(records, *msg.VectorParam.RowRecord) + timestamps = append(timestamps, msg.Timestamp) + } else { + node.buffer.SearchBuffer = append(node.buffer.SearchBuffer, msg) + node.buffer.validSearchBuffer = append(node.buffer.validSearchBuffer, true) + } } var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag) diff --git a/reader/segment.go b/reader/segment.go index 346b3400a4..66ffc9e877 100644 --- a/reader/segment.go +++ b/reader/segment.go @@ -6,7 +6,7 @@ import ( "suvlim/pulsar/schema" ) -const SEGMENT_LIFETIME = 20000 +const SegmentLifetime = 20000 type Segment struct { SegmentPtr *C.SegmentBase @@ -76,7 +76,7 @@ func SegmentDelete(segment *Segment, collectionName string, entityIds *[]int64, return ResultEntityIds{} } -func SegmentSearch(segment *Segment, collectionName string, queryString string, timestamps *[]int64, vectorRecord *[]schema.VectorRecord) ResultEntityIds { +func SegmentSearch(segment *Segment, collectionName string, queryString string, timestamps *[]uint64, vectorRecord *[]schema.VectorRecord) ResultEntityIds { // TODO: wrap cgo return ResultEntityIds{} }