mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Add mock unittest
Signed-off-by: become-nice <995581097@qq.com>
This commit is contained in:
parent
08ee90e1de
commit
f1d6b62e1d
61
writer/mock/mock.go
Normal file
61
writer/mock/mock.go
Normal file
@ -0,0 +1,61 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Key = []byte
|
||||
type Value = []byte
|
||||
type Timestamp = uint64
|
||||
type DriverType string
|
||||
|
||||
type TikvStore struct {
|
||||
kvMap map[string][]byte
|
||||
segmentMap map[string]string
|
||||
}
|
||||
|
||||
func NewTikvStore() (*TikvStore, error) {
|
||||
var map1 map[string][]byte
|
||||
map1 = make(map[string][]byte)
|
||||
var segment map[string]string
|
||||
segment = make(map[string]string)
|
||||
return &TikvStore{
|
||||
kvMap: map1,
|
||||
segmentMap: segment,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *TikvStore) PutRows(ctx context.Context, keys [][]byte, values [][]byte, segment string, timestamp []Timestamp) error {
|
||||
var i int
|
||||
for i = 0; i < len(keys); i++ {
|
||||
s.kvMap[string(keys[i])] = values[i]
|
||||
s.segmentMap[string(keys[i])] = segment
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TikvStore) DeleteRows(ctx context.Context, keys [][]byte, timestamps []Timestamp) error {
|
||||
var i int
|
||||
for i = 0; i < len(keys); i++ {
|
||||
delete(s.kvMap, string(keys[i]))
|
||||
delete(s.segmentMap, string(keys[i]))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TikvStore) GetSegment(ctx context.Context, keys [][]byte) []string {
|
||||
var segmentId []string
|
||||
var i int
|
||||
for i = 0; i < len(keys); i++ {
|
||||
segmentId = append(segmentId, s.segmentMap[string(keys[i])])
|
||||
}
|
||||
return segmentId
|
||||
}
|
||||
|
||||
func (s *TikvStore) GetData(ctx context.Context) map[string][]byte {
|
||||
return s.kvMap
|
||||
}
|
||||
|
||||
func DeliverSegmentIds(keys [][]byte, segmentIds []string) {
|
||||
|
||||
}
|
||||
64
writer/test/test_writer.go
Normal file
64
writer/test/test_writer.go
Normal file
@ -0,0 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/czs007/suvlim/pulsar/schema"
|
||||
"github.com/czs007/suvlim/writer"
|
||||
)
|
||||
|
||||
func GetInsertMsg(entityId int64) *schema.InsertMsg {
|
||||
return &schema.InsertMsg{
|
||||
CollectionName: "collection",
|
||||
PartitionTag: "tag01",
|
||||
EntityId: entityId,
|
||||
Timestamp: uint64(entityId),
|
||||
ClientId: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func GetDeleteMsg(entityId int64) *schema.DeleteMsg {
|
||||
return &schema.DeleteMsg{
|
||||
CollectionName: "collection",
|
||||
EntityId: entityId,
|
||||
Timestamp: uint64(entityId + 100),
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
writer, err := writer.NewWriteNode(ctx,
|
||||
"collection_tag01_seg01",
|
||||
100,
|
||||
"collection_tag01_seg02",
|
||||
200,
|
||||
0)
|
||||
if err != nil {
|
||||
fmt.Println("Can't create write node")
|
||||
}
|
||||
var data1 []*schema.InsertMsg
|
||||
var i int64
|
||||
for i = 0; i < 100; i++ {
|
||||
data1 = append(data1, GetInsertMsg(i))
|
||||
}
|
||||
writer.InsertBatchData(ctx, data1, 99)
|
||||
var data2 []*schema.InsertMsg
|
||||
for i = 100; i < 200; i++ {
|
||||
data2 = append(data2, GetInsertMsg(i))
|
||||
}
|
||||
writer.InsertBatchData(ctx, data2, 199)
|
||||
var deleteData []*schema.DeleteMsg
|
||||
for i = 0; i < 99; i++ {
|
||||
deleteData = append(deleteData, GetDeleteMsg(i))
|
||||
}
|
||||
for i = 100; i < 110; i++ {
|
||||
deleteData = append(deleteData, GetDeleteMsg(i))
|
||||
}
|
||||
writer.DeleteBatchData(ctx, deleteData, 110)
|
||||
kvMap := (*writer.KvStore).GetData(ctx)
|
||||
|
||||
for k, v := range kvMap {
|
||||
fmt.Println(k + ":" + string(v))
|
||||
}
|
||||
|
||||
}
|
||||
@ -3,8 +3,7 @@ package writer
|
||||
import (
|
||||
"context"
|
||||
"github.com/czs007/suvlim/pulsar/schema"
|
||||
"github.com/czs007/suvlim/storage/pkg"
|
||||
"github.com/czs007/suvlim/storage/pkg/types"
|
||||
"github.com/czs007/suvlim/writer/mock"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
@ -18,7 +17,7 @@ type writeNode struct {
|
||||
segmentCloseTime uint64
|
||||
nextSegmentId string
|
||||
nextSegmentCloseTime uint64
|
||||
kvStore *types.Store
|
||||
KvStore *mock.TikvStore
|
||||
timeSyncTable *writeNodeTimeSync
|
||||
}
|
||||
|
||||
@ -28,14 +27,13 @@ func NewWriteNode(ctx context.Context,
|
||||
nextSegmentId string,
|
||||
nextCloseSegmentTime uint64,
|
||||
timeSync uint64) (*writeNode, error) {
|
||||
ctx = context.Background()
|
||||
store, err := storage.NewStore(ctx, "TIKV")
|
||||
store, err := mock.NewTikvStore()
|
||||
writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &writeNode{
|
||||
kvStore: store,
|
||||
KvStore: store,
|
||||
openSegmentId: openSegmentId,
|
||||
nextSegmentId: nextSegmentId,
|
||||
segmentCloseTime: closeTime,
|
||||
@ -44,7 +42,7 @@ func NewWriteNode(ctx context.Context,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, timeSync uint64) error {
|
||||
func (s *writeNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64) error {
|
||||
var i int
|
||||
var storeKey string
|
||||
|
||||
@ -64,12 +62,12 @@ func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg
|
||||
s.segmentCloseTime = s.nextSegmentCloseTime
|
||||
}
|
||||
|
||||
err := (*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps)
|
||||
err := (*s.KvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps)
|
||||
s.UpdateInsertTimeSync(timeSync)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, timeSync uint64) error {
|
||||
func (s *writeNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64) error {
|
||||
var i int
|
||||
var storeKey string
|
||||
|
||||
@ -82,9 +80,9 @@ func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg
|
||||
timeStamps = append(timeStamps, data[i].Timestamp)
|
||||
}
|
||||
|
||||
//TODO:Get segment id for delete data and deliver those message to specify topic
|
||||
|
||||
err := (*s.kvStore).DeleteRows(ctx, keys, timeStamps)
|
||||
segments := (*s.KvStore).GetSegment(ctx, keys)
|
||||
mock.DeliverSegmentIds(keys, segments)
|
||||
err := (*s.KvStore).DeleteRows(ctx, keys, timeStamps)
|
||||
s.UpdateDeleteTimeSync(timeSync)
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user