From 35e45d5766db6bd87f31c62451fb9dcc712d6b6c Mon Sep 17 00:00:00 2001
From: FluorineDog
Date: Sat, 29 Aug 2020 14:57:45 +0800
Subject: [PATCH] Support Segment timestamps

Signed-off-by: FluorineDog
---
 core/src/dog_segment/SegmentBase.h |  20 +-
 pulsar/go_client.go                |   2 +-
 pulsar/storage_node.go             |  13 +-
 writer/test/test_writer.go         |  77 ++++----
 writer/writer.go                   | 283 +++++++++++++++++++++--------
 5 files changed, 277 insertions(+), 118 deletions(-)

diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h
index e5d899de99..03abaa29a0 100644
--- a/core/src/dog_segment/SegmentBase.h
+++ b/core/src/dog_segment/SegmentBase.h
@@ -12,7 +12,6 @@ using engine::QueryResult;
 
 int TestABI();
 
-
 class SegmentBase {
  public:
     // definitions
@@ -78,6 +77,25 @@ class SegmentBase {
 
  public:
     // getter and setter
+    Timestamp get_time_begin() {
+        return time_begin_;
+    }
+    void set_time_begin(Timestamp time_begin) {
+        this->time_begin_ = time_begin;
+    }
+    Timestamp get_time_end() {
+        return time_end_;
+    }
+    void set_time_end(Timestamp time_end) {
+        this->time_end_ = time_end;
+    }
+    uint64_t get_segment_id() {
+        return segment_id_;
+    }
+    void set_segment_id(uint64_t segment_id) {
+        this->segment_id_ = segment_id;
+    }
+
  private:
     Timestamp time_begin_;
     Timestamp time_end_;
diff --git a/pulsar/go_client.go b/pulsar/go_client.go
index 9ab5eac15c..fa34f66aae 100644
--- a/pulsar/go_client.go
+++ b/pulsar/go_client.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"github.com/apache/pulsar/pulsar-client-go/pulsar"
 	"log"
-	"suvlim/pulsar/schema"
+	"github.com/czs007/suvlim/pulsar/schema"
 	"sync"
 )
 
diff --git a/pulsar/storage_node.go b/pulsar/storage_node.go
index 3ff8178356..841eb23f80 100644
--- a/pulsar/storage_node.go
+++ b/pulsar/storage_node.go
@@ -2,7 +2,7 @@ package pulsar
 
 import (
 	"fmt"
-	"suvlim/pulsar/schema"
+	"github.com/czs007/suvlim/pulsar/schema"
 	"sync"
 	"time"
 )
@@ -11,14 +11,13 @@ type WriteNode struct {
 	mc MessageClient
 }
 
-func (wn *WriteNode)doWriteNode(wg sync.WaitGroup) {
+func (wn *WriteNode) doWriteNode(wg sync.WaitGroup) {
 	wg.Add(2)
 	go wn.insert_write(wn.mc.InsertMsg, wg)
 	go wn.delete_write(wn.mc.DeleteMsg, wg)
 	wg.Wait()
 }
 
-
 func (wn *WriteNode) PrepareBatchMsg() {
 	wn.mc.PrepareBatchMsg(JobType(1))
 }
@@ -40,16 +39,12 @@ func main() {
 	}
 }
 
-func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{
+func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
 	wg.Done()
 	return schema.Status{schema.ErrorCode_SUCCESS, ""}
 }
 
-func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{
+func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
 	wg.Done()
 	return schema.Status{schema.ErrorCode_SUCCESS, ""}
 }
-
-
-
-
diff --git a/writer/test/test_writer.go b/writer/test/test_writer.go
index f47a20f24f..6968e6467b 100644
--- a/writer/test/test_writer.go
+++ b/writer/test/test_writer.go
@@ -1,10 +1,9 @@
 package main
 
 import (
-	"context"
+	"container/list"
 	"fmt"
 	"github.com/czs007/suvlim/pulsar/schema"
-	"github.com/czs007/suvlim/writer"
 )
 
@@ -25,40 +24,46 @@ func GetDeleteMsg(entityId int64) *schema.DeleteMsg {
 	}
 }
 
+//type example struct {
+//	id int
+//}
+//
+//type data struct {
+//	buffer *list.List
+//}
+
+//func GetExample(num int) []*example {
+//	var examples []*example
+//	i := 0
+//	for i = 0; i < num; 
i++ { +// examples = append(examples, &example{id: i}) +// } +// return examples +//} +// +//func GetValue(data *list.List, value []int) []int { +// for e := data.Front(); e != nil; e = e.Next() { +// value = append(value, e.Value.(*example).id) +// } +// return value +//} + func main() { - ctx := context.Background() - writer, err := writer.NewWriteNode(ctx, - "collection_tag01_seg01", - 100, - "collection_tag01_seg02", - 200, - 0) - if err != nil { - fmt.Println("Can't create write node") + //ctx := context.Background() + deleteBuffer := list.New() + //insertBuffer := list.New() + deleteBuffer.PushBack(1) + deleteBuffer.PushBack(2) + var data []*list.Element + for e := deleteBuffer.Front(); e != nil; e = e.Next() { + if e.Value.(int) == 1 { + data = append(data, e) + } } - var data1 []*schema.InsertMsg - var i int64 - for i = 0; i < 100; i++ { - data1 = append(data1, GetInsertMsg(i)) - } - writer.InsertBatchData(ctx, data1, 99) - var data2 []*schema.InsertMsg - for i = 100; i < 200; i++ { - data2 = append(data2, GetInsertMsg(i)) - } - writer.InsertBatchData(ctx, data2, 199) - var deleteData []*schema.DeleteMsg - for i = 0; i < 99; i++ { - deleteData = append(deleteData, GetDeleteMsg(i)) - } - for i = 100; i < 110; i++ { - deleteData = append(deleteData, GetDeleteMsg(i)) - } - writer.DeleteBatchData(ctx, deleteData, 110) - kvMap := (*writer.KvStore).GetData(ctx) - - for k, v := range kvMap { - fmt.Println(k + ":" + string(v)) - } - + fmt.Println(data[0].Value.(int)) + //writeNode := writer.NewWriteNode( + // ctx, + // "", + // ) + //a := make(map[string]in) } diff --git a/writer/writer.go b/writer/writer.go index 73da714efc..a8952ab3e3 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -1,106 +1,247 @@ package writer import ( + "container/list" "context" + "fmt" + "github.com/czs007/suvlim/pulsar" "github.com/czs007/suvlim/pulsar/schema" "github.com/czs007/suvlim/writer/mock" "strconv" + "sync" ) -type writeNodeTimeSync struct { - deleteTimeSync uint64 - 
insertTimeSync uint64 -} - -type writeNode struct { +type CollectionMeta struct { + collionName string openSegmentId string segmentCloseTime uint64 nextSegmentId string nextSegmentCloseTime uint64 - KvStore *mock.TikvStore - timeSyncTable *writeNodeTimeSync + deleteTimeSync uint64 + insertTimeSync uint64 +} + +type WriteNode struct { + KvStore *mock.TikvStore + mc *pulsar.MessageClient + collectionMap map[string]*CollectionMeta + gtInsertMsgBuffer *list.List + gtDeleteMsgBuffer *list.List } func NewWriteNode(ctx context.Context, - openSegmentId string, - closeTime uint64, - nextSegmentId string, - nextCloseSegmentTime uint64, - timeSync uint64) (*writeNode, error) { - store, err := mock.NewTikvStore() - writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync} - if err != nil { - return nil, err + collectionName []string, + openSegmentId []string, + closeTime []uint64, + nextSegmentId []string, + nextCloseSegmentTime []uint64, + timeSync []uint64, + mc *pulsar.MessageClient) (*WriteNode, error) { + kv, err := mock.NewTikvStore() + collectionMap := make(map[string]*CollectionMeta) + for i := 0; i < len(collectionName); i++ { + collectionMap[collectionName[i]] = &CollectionMeta{ + collionName: collectionName[i], + openSegmentId: openSegmentId[i], + segmentCloseTime: closeTime[i], + nextSegmentId: nextSegmentId[i], + nextSegmentCloseTime: nextCloseSegmentTime[i], + deleteTimeSync: timeSync[i], + insertTimeSync: timeSync[i], + } } - return &writeNode{ - KvStore: store, - openSegmentId: openSegmentId, - nextSegmentId: nextSegmentId, - segmentCloseTime: closeTime, - nextSegmentCloseTime: nextCloseSegmentTime, - timeSyncTable: writeTableTimeSync, - }, nil + return &WriteNode{ + KvStore: kv, + mc: mc, + collectionMap: collectionMap, + gtInsertMsgBuffer: list.New(), + gtDeleteMsgBuffer: list.New(), + }, err } -func (s *writeNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64) error { - var i int +func (wn 
*WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync map[string]uint64, wg sync.WaitGroup) error { var storeKey string + keyMap := make(map[string][][]byte) + binaryDataMap := make(map[string][][]byte) + timeStampMap := make(map[string][]uint64) - var keys [][]byte - var binaryData [][]byte - var timeStamps []uint64 + keyMap, binaryDataMap, timeStampMap = wn.AddInsertMsgBufferData(keyMap, binaryDataMap, timeStampMap, timeSync) - for i = 0; i < len(data); i++ { - storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) - keys = append(keys, []byte(storeKey)) - binaryData = append(binaryData, data[i].Serialization()) - timeStamps = append(timeStamps, data[i].Timestamp) + for i := 0; i < len(data); i++ { + if data[i].Timestamp <= timeSync[data[i].CollectionName] { + CollectionName := data[i].CollectionName + storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) + keyMap[CollectionName] = append(keyMap[CollectionName], []byte(storeKey)) + binaryDataMap[CollectionName] = append(binaryDataMap[CollectionName], data[i].Serialization()) + timeStampMap[CollectionName] = append(timeStampMap[CollectionName], data[i].Timestamp) + } else { + wn.gtInsertMsgBuffer.PushBack(data[i]) + } } - if s.segmentCloseTime <= timeSync { - s.openSegmentId = s.nextSegmentId - s.segmentCloseTime = s.nextSegmentCloseTime + for k, v := range wn.collectionMap { + if v.segmentCloseTime < timeSync[k] { + v.openSegmentId = v.nextSegmentId + v.segmentCloseTime = v.nextSegmentCloseTime + } } - err := (*s.KvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps) - s.UpdateInsertTimeSync(timeSync) - return err -} - -func (s *writeNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64) error { - var i int - var storeKey string - - var keys [][]byte - var timeStamps []uint64 - - for i = 0; i < len(data); i++ { - storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) - keys = 
append(keys, []byte(storeKey)) - timeStamps = append(timeStamps, data[i].Timestamp) + for k, v := range keyMap { + err := (*wn.KvStore).PutRows(ctx, v, binaryDataMap[k], wn.collectionMap[k].openSegmentId, timeStampMap[k]) + if err != nil { + fmt.Println("Can't insert data") + } } - - segments := (*s.KvStore).GetSegment(ctx, keys) - mock.DeliverSegmentIds(keys, segments) - err := (*s.KvStore).DeleteRows(ctx, keys, timeStamps) - s.UpdateDeleteTimeSync(timeSync) - return err -} - -func (s *writeNode) AddNewSegment(segmentId string, closeSegmentTime uint64) error { - s.nextSegmentId = segmentId - s.nextSegmentCloseTime = closeSegmentTime + wn.UpdateInsertTimeSync(timeSync) + wg.Done() return nil } -func (s *writeNode) UpdateInsertTimeSync(timeSync uint64) { - s.timeSyncTable.insertTimeSync = timeSync +func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSyncMap map[string]uint64, wg sync.WaitGroup) error { + var storeKey string + keyMap := make(map[string][][]byte) + timeStampMap := make(map[string][]uint64) + + keyMap, timeStampMap = wn.AddDeleteMsgBufferData(keyMap, timeStampMap, timeSyncMap) + + for i := 0; i < len(data); i++ { + if data[i].Timestamp <= timeSyncMap[data[i].CollectionName] { + CollectionName := data[i].CollectionName + storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) + keyMap[CollectionName] = append(keyMap[CollectionName], []byte(storeKey)) + timeStampMap[CollectionName] = append(timeStampMap[CollectionName], data[i].Timestamp) + } else { + wn.gtDeleteMsgBuffer.PushBack(data[i]) + } + } + + for k, v := range wn.collectionMap { + if v.segmentCloseTime < timeSyncMap[k] { + v.openSegmentId = v.nextSegmentId + v.segmentCloseTime = v.nextSegmentCloseTime + } + } + + for k, v := range keyMap { + err := (*wn.KvStore).DeleteRows(ctx, v, timeStampMap[k]) + if err != nil { + fmt.Println("Can't insert data") + } + } + wn.UpdateDeleteTimeSync(timeSyncMap) + wg.Done() + return nil } -func (s 
*writeNode) UpdateDeleteTimeSync(timeSync uint64) { - s.timeSyncTable.deleteTimeSync = timeSync +func (wn *WriteNode) AddNextSegment(collectionName string, segmentId string, closeSegmentTime uint64) { + wn.collectionMap[collectionName].nextSegmentId = segmentId + wn.collectionMap[collectionName].nextSegmentCloseTime = closeSegmentTime } -func (s *writeNode) UpdateCloseTime(closeTime uint64) { - s.segmentCloseTime = closeTime +func (wn *WriteNode) UpdateInsertTimeSync(timeSyncMap map[string]uint64) { + for k, v := range wn.collectionMap { + v.insertTimeSync = timeSyncMap[k] + } +} + +func (wn *WriteNode) UpdateDeleteTimeSync(timeSyncMap map[string]uint64) { + for k, v := range wn.collectionMap { + v.deleteTimeSync = timeSyncMap[k] + } +} + +func (wn *WriteNode) UpdateCloseTime(collectionName string, closeTime uint64) { + wn.collectionMap[collectionName].segmentCloseTime = closeTime +} + +func (wn *WriteNode) AddInsertMsgBufferData(keyMap map[string][][]byte, + dataMap map[string][][]byte, + timeStampMap map[string][]uint64, + timeSyncMap map[string]uint64) (map[string][][]byte, map[string][][]byte, map[string][]uint64) { + var storeKey string + var selectElement []*list.Element + for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() { + collectionName := e.Value.(*schema.InsertMsg).CollectionName + if e.Value.(*schema.InsertMsg).Timestamp <= timeSyncMap[collectionName] { + storeKey = collectionName + + strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10) + keyMap[collectionName] = append(keyMap[collectionName], []byte(storeKey)) + dataMap[collectionName] = append(dataMap[collectionName], e.Value.(*schema.InsertMsg).Serialization()) + timeStampMap[collectionName] = append(timeStampMap[collectionName], e.Value.(*schema.InsertMsg).Timestamp) + selectElement = append(selectElement, e) + } + } + for i := 0; i < len(selectElement); i++ { + wn.gtInsertMsgBuffer.Remove(selectElement[i]) + } + return keyMap, dataMap, timeStampMap +} + +func (wn *WriteNode) 
AddDeleteMsgBufferData(keyMap map[string][][]byte,
+	timeStampMap map[string][]uint64,
+	timeSyncMap map[string]uint64) (map[string][][]byte, map[string][]uint64) {
+	var storeKey string
+	var selectElement []*list.Element
+	for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() {
+		collectionName := e.Value.(*schema.DeleteMsg).CollectionName
+		if e.Value.(*schema.DeleteMsg).Timestamp <= timeSyncMap[collectionName] {
+			storeKey = collectionName +
+				strconv.FormatInt(e.Value.(*schema.DeleteMsg).EntityId, 10)
+			keyMap[collectionName] = append(keyMap[collectionName], []byte(storeKey))
+			timeStampMap[collectionName] = append(timeStampMap[collectionName], e.Value.(*schema.DeleteMsg).Timestamp)
+			selectElement = append(selectElement, e)
+		}
+	}
+	for i := 0; i < len(selectElement); i++ {
+		wn.gtDeleteMsgBuffer.Remove(selectElement[i])
+	}
+	return keyMap, timeStampMap
+}
+
+func (wn *WriteNode) AddCollection(collectionName string,
+	openSegmentId string,
+	closeTime uint64,
+	nextSegmentId string,
+	nextSegmentCloseTime uint64,
+	timeSync uint64) {
+	wn.collectionMap[collectionName] = &CollectionMeta{
+		collionName:          collectionName,
+		openSegmentId:        openSegmentId,
+		segmentCloseTime:     closeTime,
+		nextSegmentId:        nextSegmentId,
+		nextSegmentCloseTime: nextSegmentCloseTime,
+		deleteTimeSync:       timeSync,
+		insertTimeSync:       timeSync,
+	}
+}
+
+func (wn *WriteNode) DeleteCollection(collectionName string) {
+	delete(wn.collectionMap, collectionName)
+	var deleteMsg []*list.Element
+	var insertMsg []*list.Element
+	for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() {
+		if e.Value.(*schema.InsertMsg).CollectionName == collectionName {
+			insertMsg = append(insertMsg, e)
+		}
+	}
+	for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() {
+		if e.Value.(*schema.DeleteMsg).CollectionName == collectionName {
+			deleteMsg = append(deleteMsg, e)
+		}
+	}
+	for i := 0; i < len(insertMsg); i++ {
+		wn.gtInsertMsgBuffer.Remove(insertMsg[i])
+	}
+	for i := 0; i < 
len(deleteMsg); i++ { + wn.gtDeleteMsgBuffer.Remove(deleteMsg[i]) + } +} + +func (wn *WriteNode) doWriteNode(ctx context.Context, wg sync.WaitGroup) { + deleteTimeSync := make(map[string]uint64) + insertTimeSync := make(map[string]uint64) + wg.Add(2) + go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg) + go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg) + wg.Wait() }