Support Segment timestamps

Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
FluorineDog 2020-08-29 14:57:45 +08:00 committed by yefu.chen
parent 04c27b7ccf
commit 35e45d5766
6 changed files with 281 additions and 118 deletions

View File

@ -12,7 +12,6 @@ using engine::QueryResult;
int
TestABI();
class SegmentBase {
public:
// definitions
@ -78,6 +77,25 @@ class SegmentBase {
public:
// getter and setter
Timestamp get_time_begin() {
return time_begin_;
}
void set_time_begin(Timestamp time_begin) {
this->time_begin_ = time_begin;
}
Timestamp get_time_end() {
return time_end_;
}
void set_time_end(Timestamp time_end) {
this->time_end_ = time_end;
}
uint64_t get_segment_id(uint64_t segment_id) {
return segment_id_;
}
uint64_t set_segment_id(uint64_t segment_id) {
this->segment_id_ = segment_id;
}
private:
Timestamp time_begin_;
Timestamp time_end_;

View File

@ -4,7 +4,7 @@ import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"log"
"suvlim/pulsar/schema"
"github.com/czs007/suvlim/pulsar/schema"
"sync"
)

View File

@ -15,7 +15,11 @@ func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) {
wg.Add(3)
go qn.insert_query(qn.mc.InsertMsg, wg)
go qn.delete_query(qn.mc.DeleteMsg, wg)
<<<<<<< HEAD
go qn.search_query(qn.mc.searchMsg, wg)
=======
go qn.search_query(qn.mc.SearchMsg, wg)
>>>>>>> 1ab497232c9c1179499c456a250dd6e73a3259b2
wg.Wait()
}

View File

@ -2,7 +2,7 @@ package pulsar
import (
"fmt"
"suvlim/pulsar/schema"
"github.com/czs007/suvlim/pulsar/schema"
"sync"
"time"
)
@ -11,14 +11,13 @@ type WriteNode struct {
mc MessageClient
}
func (wn *WriteNode)doWriteNode(wg sync.WaitGroup) {
func (wn *WriteNode) doWriteNode(wg sync.WaitGroup) {
wg.Add(2)
go wn.insert_write(wn.mc.InsertMsg, wg)
go wn.delete_write(wn.mc.DeleteMsg, wg)
wg.Wait()
}
func (wn *WriteNode) PrepareBatchMsg() {
wn.mc.PrepareBatchMsg(JobType(1))
}
@ -40,16 +39,12 @@ func main() {
}
}
func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{
func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{
func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}

View File

@ -1,10 +1,9 @@
package main
import (
"context"
"container/list"
"fmt"
"github.com/czs007/suvlim/pulsar/schema"
"github.com/czs007/suvlim/writer"
)
func GetInsertMsg(entityId int64) *schema.InsertMsg {
@ -25,40 +24,46 @@ func GetDeleteMsg(entityId int64) *schema.DeleteMsg {
}
}
//type example struct {
// id int
//}
//
//type data struct {
// buffer *list.List
//}
//func GetExample(num int) []*example {
// var examples []*example
// i := 0
// for i = 0; i < num; i++ {
// examples = append(examples, &example{id: i})
// }
// return examples
//}
//
//func GetValue(data *list.List, value []int) []int {
// for e := data.Front(); e != nil; e = e.Next() {
// value = append(value, e.Value.(*example).id)
// }
// return value
//}
func main() {
ctx := context.Background()
writer, err := writer.NewWriteNode(ctx,
"collection_tag01_seg01",
100,
"collection_tag01_seg02",
200,
0)
if err != nil {
fmt.Println("Can't create write node")
//ctx := context.Background()
deleteBuffer := list.New()
//insertBuffer := list.New()
deleteBuffer.PushBack(1)
deleteBuffer.PushBack(2)
var data []*list.Element
for e := deleteBuffer.Front(); e != nil; e = e.Next() {
if e.Value.(int) == 1 {
data = append(data, e)
}
}
var data1 []*schema.InsertMsg
var i int64
for i = 0; i < 100; i++ {
data1 = append(data1, GetInsertMsg(i))
}
writer.InsertBatchData(ctx, data1, 99)
var data2 []*schema.InsertMsg
for i = 100; i < 200; i++ {
data2 = append(data2, GetInsertMsg(i))
}
writer.InsertBatchData(ctx, data2, 199)
var deleteData []*schema.DeleteMsg
for i = 0; i < 99; i++ {
deleteData = append(deleteData, GetDeleteMsg(i))
}
for i = 100; i < 110; i++ {
deleteData = append(deleteData, GetDeleteMsg(i))
}
writer.DeleteBatchData(ctx, deleteData, 110)
kvMap := (*writer.KvStore).GetData(ctx)
for k, v := range kvMap {
fmt.Println(k + ":" + string(v))
}
fmt.Println(data[0].Value.(int))
//writeNode := writer.NewWriteNode(
// ctx,
// "",
// )
//a := make(map[string]in)
}

View File

@ -1,106 +1,247 @@
package writer
import (
"container/list"
"context"
"fmt"
"github.com/czs007/suvlim/pulsar"
"github.com/czs007/suvlim/pulsar/schema"
"github.com/czs007/suvlim/writer/mock"
"strconv"
"sync"
)
type writeNodeTimeSync struct {
deleteTimeSync uint64
insertTimeSync uint64
}
type writeNode struct {
type CollectionMeta struct {
collionName string
openSegmentId string
segmentCloseTime uint64
nextSegmentId string
nextSegmentCloseTime uint64
KvStore *mock.TikvStore
timeSyncTable *writeNodeTimeSync
deleteTimeSync uint64
insertTimeSync uint64
}
type WriteNode struct {
KvStore *mock.TikvStore
mc *pulsar.MessageClient
collectionMap map[string]*CollectionMeta
gtInsertMsgBuffer *list.List
gtDeleteMsgBuffer *list.List
}
func NewWriteNode(ctx context.Context,
openSegmentId string,
closeTime uint64,
nextSegmentId string,
nextCloseSegmentTime uint64,
timeSync uint64) (*writeNode, error) {
store, err := mock.NewTikvStore()
writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync}
if err != nil {
return nil, err
collectionName []string,
openSegmentId []string,
closeTime []uint64,
nextSegmentId []string,
nextCloseSegmentTime []uint64,
timeSync []uint64,
mc *pulsar.MessageClient) (*WriteNode, error) {
kv, err := mock.NewTikvStore()
collectionMap := make(map[string]*CollectionMeta)
for i := 0; i < len(collectionName); i++ {
collectionMap[collectionName[i]] = &CollectionMeta{
collionName: collectionName[i],
openSegmentId: openSegmentId[i],
segmentCloseTime: closeTime[i],
nextSegmentId: nextSegmentId[i],
nextSegmentCloseTime: nextCloseSegmentTime[i],
deleteTimeSync: timeSync[i],
insertTimeSync: timeSync[i],
}
}
return &writeNode{
KvStore: store,
openSegmentId: openSegmentId,
nextSegmentId: nextSegmentId,
segmentCloseTime: closeTime,
nextSegmentCloseTime: nextCloseSegmentTime,
timeSyncTable: writeTableTimeSync,
}, nil
return &WriteNode{
KvStore: kv,
mc: mc,
collectionMap: collectionMap,
gtInsertMsgBuffer: list.New(),
gtDeleteMsgBuffer: list.New(),
}, err
}
func (s *writeNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64) error {
var i int
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync map[string]uint64, wg sync.WaitGroup) error {
var storeKey string
keyMap := make(map[string][][]byte)
binaryDataMap := make(map[string][][]byte)
timeStampMap := make(map[string][]uint64)
var keys [][]byte
var binaryData [][]byte
var timeStamps []uint64
keyMap, binaryDataMap, timeStampMap = wn.AddInsertMsgBufferData(keyMap, binaryDataMap, timeStampMap, timeSync)
for i = 0; i < len(data); i++ {
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keys = append(keys, []byte(storeKey))
binaryData = append(binaryData, data[i].Serialization())
timeStamps = append(timeStamps, data[i].Timestamp)
for i := 0; i < len(data); i++ {
if data[i].Timestamp <= timeSync[data[i].CollectionName] {
CollectionName := data[i].CollectionName
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keyMap[CollectionName] = append(keyMap[CollectionName], []byte(storeKey))
binaryDataMap[CollectionName] = append(binaryDataMap[CollectionName], data[i].Serialization())
timeStampMap[CollectionName] = append(timeStampMap[CollectionName], data[i].Timestamp)
} else {
wn.gtInsertMsgBuffer.PushBack(data[i])
}
}
if s.segmentCloseTime <= timeSync {
s.openSegmentId = s.nextSegmentId
s.segmentCloseTime = s.nextSegmentCloseTime
for k, v := range wn.collectionMap {
if v.segmentCloseTime < timeSync[k] {
v.openSegmentId = v.nextSegmentId
v.segmentCloseTime = v.nextSegmentCloseTime
}
}
err := (*s.KvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps)
s.UpdateInsertTimeSync(timeSync)
return err
}
func (s *writeNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64) error {
var i int
var storeKey string
var keys [][]byte
var timeStamps []uint64
for i = 0; i < len(data); i++ {
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keys = append(keys, []byte(storeKey))
timeStamps = append(timeStamps, data[i].Timestamp)
for k, v := range keyMap {
err := (*wn.KvStore).PutRows(ctx, v, binaryDataMap[k], wn.collectionMap[k].openSegmentId, timeStampMap[k])
if err != nil {
fmt.Println("Can't insert data")
}
}
segments := (*s.KvStore).GetSegment(ctx, keys)
mock.DeliverSegmentIds(keys, segments)
err := (*s.KvStore).DeleteRows(ctx, keys, timeStamps)
s.UpdateDeleteTimeSync(timeSync)
return err
}
func (s *writeNode) AddNewSegment(segmentId string, closeSegmentTime uint64) error {
s.nextSegmentId = segmentId
s.nextSegmentCloseTime = closeSegmentTime
wn.UpdateInsertTimeSync(timeSync)
wg.Done()
return nil
}
func (s *writeNode) UpdateInsertTimeSync(timeSync uint64) {
s.timeSyncTable.insertTimeSync = timeSync
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSyncMap map[string]uint64, wg sync.WaitGroup) error {
var storeKey string
keyMap := make(map[string][][]byte)
timeStampMap := make(map[string][]uint64)
keyMap, timeStampMap = wn.AddDeleteMsgBufferData(keyMap, timeStampMap, timeSyncMap)
for i := 0; i < len(data); i++ {
if data[i].Timestamp <= timeSyncMap[data[i].CollectionName] {
CollectionName := data[i].CollectionName
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keyMap[CollectionName] = append(keyMap[CollectionName], []byte(storeKey))
timeStampMap[CollectionName] = append(timeStampMap[CollectionName], data[i].Timestamp)
} else {
wn.gtDeleteMsgBuffer.PushBack(data[i])
}
}
for k, v := range wn.collectionMap {
if v.segmentCloseTime < timeSyncMap[k] {
v.openSegmentId = v.nextSegmentId
v.segmentCloseTime = v.nextSegmentCloseTime
}
}
for k, v := range keyMap {
err := (*wn.KvStore).DeleteRows(ctx, v, timeStampMap[k])
if err != nil {
fmt.Println("Can't insert data")
}
}
wn.UpdateDeleteTimeSync(timeSyncMap)
wg.Done()
return nil
}
func (s *writeNode) UpdateDeleteTimeSync(timeSync uint64) {
s.timeSyncTable.deleteTimeSync = timeSync
func (wn *WriteNode) AddNextSegment(collectionName string, segmentId string, closeSegmentTime uint64) {
wn.collectionMap[collectionName].nextSegmentId = segmentId
wn.collectionMap[collectionName].nextSegmentCloseTime = closeSegmentTime
}
func (s *writeNode) UpdateCloseTime(closeTime uint64) {
s.segmentCloseTime = closeTime
func (wn *WriteNode) UpdateInsertTimeSync(timeSyncMap map[string]uint64) {
for k, v := range wn.collectionMap {
v.insertTimeSync = timeSyncMap[k]
}
}
func (wn *WriteNode) UpdateDeleteTimeSync(timeSyncMap map[string]uint64) {
for k, v := range wn.collectionMap {
v.deleteTimeSync = timeSyncMap[k]
}
}
func (wn *WriteNode) UpdateCloseTime(collectionName string, closeTime uint64) {
wn.collectionMap[collectionName].segmentCloseTime = closeTime
}
func (wn *WriteNode) AddInsertMsgBufferData(keyMap map[string][][]byte,
dataMap map[string][][]byte,
timeStampMap map[string][]uint64,
timeSyncMap map[string]uint64) (map[string][][]byte, map[string][][]byte, map[string][]uint64) {
var storeKey string
var selectElement []*list.Element
for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() {
collectionName := e.Value.(*schema.InsertMsg).CollectionName
if e.Value.(*schema.InsertMsg).Timestamp <= timeSyncMap[collectionName] {
storeKey = collectionName +
strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10)
keyMap[collectionName] = append(keyMap[collectionName], []byte(storeKey))
dataMap[collectionName] = append(dataMap[collectionName], e.Value.(*schema.InsertMsg).Serialization())
timeStampMap[collectionName] = append(timeStampMap[collectionName], e.Value.(*schema.InsertMsg).Timestamp)
selectElement = append(selectElement, e)
}
}
for i := 0; i < len(selectElement); i++ {
wn.gtInsertMsgBuffer.Remove(selectElement[i])
}
return keyMap, dataMap, timeStampMap
}
func (wn *WriteNode) AddDeleteMsgBufferData(keyMap map[string][][]byte,
timeStampMap map[string][]uint64,
timeSyncMap map[string]uint64) (map[string][][]byte, map[string][]uint64) {
var storeKey string
var selectElement []*list.Element
for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() {
collectionName := e.Value.(*schema.InsertMsg).CollectionName
if e.Value.(*schema.InsertMsg).Timestamp <= timeSyncMap[collectionName] {
storeKey = collectionName +
strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10)
keyMap[collectionName] = append(keyMap[collectionName], []byte(storeKey))
timeStampMap[collectionName] = append(timeStampMap[collectionName], e.Value.(*schema.InsertMsg).Timestamp)
selectElement = append(selectElement, e)
}
}
for i := 0; i < len(selectElement); i++ {
wn.gtDeleteMsgBuffer.Remove(selectElement[i])
}
return keyMap, timeStampMap
}
func (wn *WriteNode) AddCollection(collectionName string,
openSegmentId string,
closeTime uint64,
nextSegmentId string,
nextSegmentCloseTime uint64,
timeSync uint64) {
wn.collectionMap[collectionName] = &CollectionMeta{
collionName: collectionName,
openSegmentId: openSegmentId,
segmentCloseTime: closeTime,
nextSegmentId: nextSegmentId,
nextSegmentCloseTime: nextSegmentCloseTime,
deleteTimeSync: timeSync,
insertTimeSync: timeSync,
}
}
func (wn *WriteNode) DeleteCollection(collectionName string) {
delete(wn.collectionMap, collectionName)
var deleteMsg []*list.Element
var insertMsg []*list.Element
for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() {
if e.Value.(*schema.InsertMsg).CollectionName == collectionName {
insertMsg = append(insertMsg, e)
}
}
for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() {
if e.Value.(*schema.DeleteMsg).CollectionName == collectionName {
deleteMsg = append(deleteMsg, e)
}
}
for i := 0; i < len(insertMsg); i++ {
wn.gtInsertMsgBuffer.Remove(insertMsg[i])
}
for i := 0; i < len(deleteMsg); i++ {
wn.gtDeleteMsgBuffer.Remove(deleteMsg[i])
}
}
func (wn *WriteNode) doWriteNode(ctx context.Context, wg sync.WaitGroup) {
deleteTimeSync := make(map[string]uint64)
insertTimeSync := make(map[string]uint64)
wg.Add(2)
go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg)
go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg)
wg.Wait()
}