From 558a81dcbe0f0bafd0a751888973455af5c8151d Mon Sep 17 00:00:00 2001 From: become-nice <995581097@qq.com> Date: Tue, 1 Sep 2020 18:11:43 +0800 Subject: [PATCH] Update the interface of writenode Signed-off-by: become-nice <995581097@qq.com> --- go.mod | 4 +- master/kv/kv.go | 2 +- writer/writer.go | 134 ++++++++--------------------------------------- 3 files changed, 26 insertions(+), 114 deletions(-) diff --git a/go.mod b/go.mod index 9a713220d8..4ab3c7f8e7 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/tikv/client-go v0.0.0-20200824032810-95774393107b github.com/tikv/pd v2.1.19+incompatible - go.etcd.io/etcd v3.3.25+incompatible + go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.uber.org/zap v1.15.0 golang.org/x/net v0.0.0-20200822124328-c89045814202 google.golang.org/grpc v1.31.1 @@ -30,3 +30,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 sigs.k8s.io/yaml v1.2.0 // indirect ) + +replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 diff --git a/master/kv/kv.go b/master/kv/kv.go index 967f5f6bbe..b967b67110 100644 --- a/master/kv/kv.go +++ b/master/kv/kv.go @@ -15,7 +15,7 @@ package kv // Base is an abstract interface for load/save pd cluster data. type Base interface { - Load(key string) (string, error) + Load(key string) (string, error) Save(key, value string) error Remove(key string) error } diff --git a/writer/writer.go b/writer/writer.go index 5fc73090f6..d77c404115 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -1,7 +1,6 @@ package writer import ( - "container/list" "context" "fmt" "github.com/czs007/suvlim/pulsar" @@ -11,29 +10,11 @@ import ( "sync" ) -//type PartitionMeta struct { -// collectionName string -// partitionName string -// openSegmentId string -// segmentCloseTime uint64 -// nextSegmentId string -// nextSegmentCloseTime uint64 -//} -// -//type CollectionMeta struct { -// collionName string -// partitionMetaMap map[string]*PartitionMeta -// deleteTimeSync uint64 -// insertTimeSync uint64 -//} - type WriteNode struct { - KvStore *mock.TikvStore - mc *pulsar.MessageClient - gtInsertMsgBuffer *list.List - gtDeleteMsgBuffer *list.List - deleteTimeSync uint64 - insertTimeSync uint64 + KvStore *mock.TikvStore + mc *pulsar.MessageClient + deleteTimeSync uint64 + insertTimeSync uint64 } func NewWriteNode(ctx context.Context, @@ -43,12 +24,10 @@ func NewWriteNode(ctx context.Context, kv, err := mock.NewTikvStore() mc := &pulsar.MessageClient{} return &WriteNode{ - KvStore: kv, - mc: mc, - gtInsertMsgBuffer: list.New(), - gtDeleteMsgBuffer: list.New(), - insertTimeSync: timeSync, - deleteTimeSync: timeSync, + KvStore: kv, + mc: mc, + insertTimeSync: timeSync, + deleteTimeSync: timeSync, }, err } @@ -60,19 +39,13 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM var binaryData [][]byte var timeStamp []uint64 - wn.AddInsertMsgBufferData(&prefixKeys, &suffixKeys, &binaryData, &timeStamp, timeSync) - for i := 0; i < len(data); i++ { - if data[i].Timestamp <= timeSync { - prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) - suffixKey = data[i].PartitionTag + "_" + strconv.FormatUint(data[i].SegmentId, 10) - prefixKeys = append(prefixKeys, []byte(prefixKey)) - suffixKeys = append(suffixKeys, []byte(suffixKey)) - binaryData = append(binaryData, data[i].Serialization()) - timeStamp = append(timeStamp, data[i].Timestamp) - } else { - wn.gtInsertMsgBuffer.PushBack(data[i]) - } + prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) + suffixKey = data[i].PartitionTag + "_" + strconv.FormatUint(data[i].SegmentId, 10) + prefixKeys = append(prefixKeys, []byte(prefixKey)) + suffixKeys = append(suffixKeys, []byte(suffixKey)) + binaryData = append(binaryData, data[i].Serialization()) + timeStamp = append(timeStamp, data[i].Timestamp) } (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData) @@ -86,16 +59,10 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM var prefixKeys [][]byte var timeStamps []uint64 - wn.AddDeleteMsgBufferData(&prefixKeys, &timeStamps, timeSync) - for i := 0; i < len(data); i++ { - if data[i].Timestamp <= timeSync { - prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) + "_" - prefixKeys = append(prefixKeys, []byte(prefixKey)) - timeStamps = append(timeStamps, data[i].Timestamp) - } else { - wn.gtDeleteMsgBuffer.PushBack(data[i]) - } + prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) + "_" + prefixKeys = append(prefixKeys, []byte(prefixKey)) + timeStamps = append(timeStamps, data[i].Timestamp) } err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps) @@ -115,66 +82,9 @@ func (wn *WriteNode) UpdateDeleteTimeSync(timeSync uint64) { wn.deleteTimeSync = timeSync } -func (wn *WriteNode) AddInsertMsgBufferData( - prefixKeys *[][]byte, - suffixKeys *[][]byte, - data *[][]byte, - timeStamp *[]uint64, - timeSync uint64) { - var prefixKey string - var suffixKey string - var selectElement []*list.Element - for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() { - collectionName := e.Value.(*schema.InsertMsg).CollectionName - partitionTag := e.Value.(*schema.InsertMsg).PartitionTag - segmentId := e.Value.(*schema.InsertMsg).SegmentId - if e.Value.(*schema.InsertMsg).Timestamp <= timeSync { - prefixKey = collectionName + "_" + strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10) - suffixKey = partitionTag + "_" + strconv.FormatUint(segmentId, 10) - *prefixKeys = append(*prefixKeys, []byte(prefixKey)) - *suffixKeys = append(*suffixKeys, []byte(suffixKey)) - *data = append(*data, e.Value.(*schema.InsertMsg).Serialization()) - *timeStamp = append(*timeStamp, e.Value.(*schema.InsertMsg).Timestamp) - selectElement = append(selectElement, e) - } - } - for i := 0; i < len(selectElement); i++ { - wn.gtInsertMsgBuffer.Remove(selectElement[i]) - } -} - -func (wn *WriteNode) AddDeleteMsgBufferData(prefixKeys *[][]byte, - timeStamps *[]uint64, - timeSync uint64) { - var prefixKey string - var selectElement []*list.Element - for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() { - collectionName := e.Value.(*schema.InsertMsg).CollectionName - if e.Value.(*schema.DeleteMsg).Timestamp <= timeSync { - prefixKey = collectionName + "_" + strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10) + "_" - *prefixKeys = append(*prefixKeys, []byte(prefixKey)) - *timeStamps = append(*timeStamps, e.Value.(*schema.DeleteMsg).Timestamp) - selectElement = append(selectElement, e) - } - } - for i := 0; i < len(selectElement); i++ { - wn.gtDeleteMsgBuffer.Remove(selectElement[i]) - } -} - -func (wn *WriteNode) GetInsertBuffer() *list.List { - return wn.gtInsertMsgBuffer -} - -func (wn *WriteNode) GetDeleteBuffer() *list.List { - return wn.gtDeleteMsgBuffer -} - -func (wn *WriteNode) doWriteNode(ctx context.Context, wg sync.WaitGroup) { - //deleteTimeSync := make(map[string]uint64) - //insertTimeSync := make(map[string]uint64) - //wg.Add(2) - //go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg) - //go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg) - //wg.Wait() +func (wn *WriteNode) doWriteNode(ctx context.Context, deleteTimeSync uint64, insertTimeSync uint64, wg sync.WaitGroup) { + wg.Add(2) + go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg) + go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg) + wg.Wait() }