diff --git a/writer/mock/mock.go b/writer/mock/mock.go new file mode 100644 index 0000000000..57306f5d60 --- /dev/null +++ b/writer/mock/mock.go @@ -0,0 +1,61 @@ +package mock + +import ( + "context" +) + +type Key = []byte +type Value = []byte +type Timestamp = uint64 +type DriverType string + +type TikvStore struct { + kvMap map[string][]byte + segmentMap map[string]string +} + +func NewTikvStore() (*TikvStore, error) { + var map1 map[string][]byte + map1 = make(map[string][]byte) + var segment map[string]string + segment = make(map[string]string) + return &TikvStore{ + kvMap: map1, + segmentMap: segment, + }, nil +} + +func (s *TikvStore) PutRows(ctx context.Context, keys [][]byte, values [][]byte, segment string, timestamp []Timestamp) error { + var i int + for i = 0; i < len(keys); i++ { + s.kvMap[string(keys[i])] = values[i] + s.segmentMap[string(keys[i])] = segment + } + return nil +} + +func (s *TikvStore) DeleteRows(ctx context.Context, keys [][]byte, timestamps []Timestamp) error { + var i int + for i = 0; i < len(keys); i++ { + delete(s.kvMap, string(keys[i])) + delete(s.segmentMap, string(keys[i])) + } + return nil +} + +func (s *TikvStore) GetSegment(ctx context.Context, keys [][]byte) []string { + var segmentId []string + var i int + for i = 0; i < len(keys); i++ { + segmentId = append(segmentId, s.segmentMap[string(keys[i])]) + } + return segmentId +} + +func (s *TikvStore) GetData(ctx context.Context) map[string][]byte { + return s.kvMap +} + +func DeliverSegmentIds(keys [][]byte, segmentIds []string) { + +} diff --git a/writer/test/test_writer.go b/writer/test/test_writer.go new file mode 100644 index 0000000000..f47a20f24f --- /dev/null +++ b/writer/test/test_writer.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "fmt" + "github.com/czs007/suvlim/pulsar/schema" + "github.com/czs007/suvlim/writer" +) + +func GetInsertMsg(entityId int64) *schema.InsertMsg { + return &schema.InsertMsg{ + CollectionName: "collection", + PartitionTag: "tag01", + EntityId: entityId, + Timestamp: uint64(entityId), + ClientId: 0, + } +} + +func GetDeleteMsg(entityId int64) *schema.DeleteMsg { + return &schema.DeleteMsg{ + CollectionName: "collection", + EntityId: entityId, + Timestamp: uint64(entityId + 100), + } +} + +func main() { + ctx := context.Background() + writer, err := writer.NewWriteNode(ctx, + "collection_tag01_seg01", + 100, + "collection_tag01_seg02", + 200, + 0) + if err != nil { + fmt.Println("Can't create write node") + } + var data1 []*schema.InsertMsg + var i int64 + for i = 0; i < 100; i++ { + data1 = append(data1, GetInsertMsg(i)) + } + writer.InsertBatchData(ctx, data1, 99) + var data2 []*schema.InsertMsg + for i = 100; i < 200; i++ { + data2 = append(data2, GetInsertMsg(i)) + } + writer.InsertBatchData(ctx, data2, 199) + var deleteData []*schema.DeleteMsg + for i = 0; i < 99; i++ { + deleteData = append(deleteData, GetDeleteMsg(i)) + } + for i = 100; i < 110; i++ { + deleteData = append(deleteData, GetDeleteMsg(i)) + } + writer.DeleteBatchData(ctx, deleteData, 110) + kvMap := (*writer.KvStore).GetData(ctx) + + for k, v := range kvMap { + fmt.Println(k + ":" + string(v)) + } + +} diff --git a/writer/writer.go b/writer/writer.go index f109585857..73da714efc 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -3,8 +3,7 @@ package writer import ( "context" "github.com/czs007/suvlim/pulsar/schema" - "github.com/czs007/suvlim/storage/pkg" - "github.com/czs007/suvlim/storage/pkg/types" + "github.com/czs007/suvlim/writer/mock" "strconv" ) @@ -18,7 +17,7 @@ type writeNode struct { segmentCloseTime uint64 nextSegmentId string nextSegmentCloseTime uint64 - kvStore *types.Store + KvStore *mock.TikvStore timeSyncTable *writeNodeTimeSync } @@ -28,14 +27,13 @@ func NewWriteNode(ctx context.Context, nextSegmentId string, nextCloseSegmentTime uint64, timeSync uint64) (*writeNode, error) { - ctx = context.Background() - store, err := storage.NewStore(ctx, "TIKV") + store, err := mock.NewTikvStore() writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync} if err != nil { return nil, err } return &writeNode{ - kvStore: store, + KvStore: store, openSegmentId: openSegmentId, nextSegmentId: nextSegmentId, segmentCloseTime: closeTime, @@ -44,7 +42,7 @@ func NewWriteNode(ctx context.Context, }, nil } -func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, timeSync uint64) error { +func (s *writeNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64) error { var i int var storeKey string @@ -64,12 +62,12 @@ func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg s.segmentCloseTime = s.nextSegmentCloseTime } - err := (*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps) + err := (*s.KvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps) s.UpdateInsertTimeSync(timeSync) return err } -func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, timeSync uint64) error { +func (s *writeNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64) error { var i int var storeKey string @@ -82,9 +80,9 @@ func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg timeStamps = append(timeStamps, data[i].Timestamp) } - //TODO:Get segment id for delete data and deliver those message to specify topic - - err := (*s.kvStore).DeleteRows(ctx, keys, timeStamps) + segments := (*s.KvStore).GetSegment(ctx, keys) + mock.DeliverSegmentIds(keys, segments) + err := (*s.KvStore).DeleteRows(ctx, keys, timeStamps) s.UpdateDeleteTimeSync(timeSync) return err }