From 90cd86de17b13e5a55086987ef4db5924d8bc26a Mon Sep 17 00:00:00 2001 From: rain Date: Fri, 25 Sep 2020 10:23:48 +0800 Subject: [PATCH] Update controller and fix kvbase bug Signed-off-by: rain --- pkg/master/collection/collection.go | 100 +++++++ pkg/master/collection/collection_test.go | 39 +++ pkg/master/common/config.go | 10 - pkg/master/controller/collection.go | 117 +++++++++ pkg/master/controller/segment.go | 114 ++++++++ pkg/master/controller/segment_test.go | 37 +++ pkg/master/grpc/server.go | 70 +++++ pkg/master/informer/pulsar.go | 33 +-- pkg/master/kv/etcd_kv.go | 13 +- pkg/master/mock/fake/maing.go | 51 ++++ pkg/master/mock/pulsar.go | 7 +- pkg/master/mock/segment_test.go | 45 +++- pkg/master/segment/segment.go | 54 ++++ pkg/master/segment/stats.go | 69 +++++ pkg/master/server.go | 317 ++--------------------- 15 files changed, 723 insertions(+), 353 deletions(-) create mode 100644 pkg/master/collection/collection.go create mode 100644 pkg/master/collection/collection_test.go delete mode 100644 pkg/master/common/config.go create mode 100644 pkg/master/controller/collection.go create mode 100644 pkg/master/controller/segment.go create mode 100644 pkg/master/controller/segment_test.go create mode 100644 pkg/master/grpc/server.go create mode 100644 pkg/master/mock/fake/maing.go create mode 100644 pkg/master/segment/segment.go create mode 100644 pkg/master/segment/stats.go diff --git a/pkg/master/collection/collection.go b/pkg/master/collection/collection.go new file mode 100644 index 0000000000..11e880c49f --- /dev/null +++ b/pkg/master/collection/collection.go @@ -0,0 +1,100 @@ +package collection + +import ( + "time" + + masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" + messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/gogo/protobuf/proto" + jsoniter "github.com/json-iterator/go" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +type Collection struct { + ID uint64 `json:"id"` + Name string `json:"name"` + CreateTime uint64 `json:"creat_time"` + Schema []FieldMeta `json:"schema"` + // ExtraSchema []FieldMeta `json:"extra_schema"` + SegmentIDs []uint64 `json:"segment_ids"` + PartitionTags []string `json:"partition_tags"` + GrpcMarshalString string `json:"grpc_marshal_string"` + IndexParam []*messagepb.IndexParam `json:"index_param"` +} + +type FieldMeta struct { + FieldName string `json:"field_name"` + Type messagepb.DataType `json:"type"` + DIM int64 `json:"dimension"` +} + +func GrpcMarshal(c *Collection) *Collection { + if c.GrpcMarshalString != "" { + c.GrpcMarshalString = "" + } + pbSchema := &messagepb.Schema{ + FieldMetas: []*messagepb.FieldMeta{}, + } + schemaSlice := []*messagepb.FieldMeta{} + for _, v := range c.Schema { + newpbMeta := &messagepb.FieldMeta{ + FieldName: v.FieldName, + Type: v.Type, + Dim: v.DIM, + } + schemaSlice = append(schemaSlice, newpbMeta) + } + pbSchema.FieldMetas = schemaSlice + grpcCollection := &masterpb.Collection{ + Id: c.ID, + Name: c.Name, + Schema: pbSchema, + CreateTime: c.CreateTime, + SegmentIds: c.SegmentIDs, + PartitionTags: c.PartitionTags, + Indexes: c.IndexParam, + } + out := proto.MarshalTextString(grpcCollection) + c.GrpcMarshalString = out + return c +} + +func NewCollection(id uint64, name string, createTime time.Time, + schema []*messagepb.FieldMeta, sIds []uint64, ptags []string) Collection { + + segementIDs := []uint64{} + newSchema := []FieldMeta{} + for _, v := range schema { + newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type, DIM: v.Dim}) + } + for _, sid := range sIds { + segementIDs = append(segementIDs, sid) + } + return Collection{ + ID: id, + Name: name, + CreateTime: uint64(createTime.Unix()), + Schema: newSchema, + SegmentIDs: segementIDs, + PartitionTags: ptags, + } +} + +func Collection2JSON(c Collection) (string, error) { + b, err := json.Marshal(&c) + if err != nil { + return "", err + } + + return string(b), nil +} + +func JSON2Collection(s string) (*Collection, error) { + var c Collection + err := json.Unmarshal([]byte(s), &c) + if err != nil { + return &Collection{}, err + } + return &c, nil +} diff --git a/pkg/master/collection/collection_test.go b/pkg/master/collection/collection_test.go new file mode 100644 index 0000000000..256ead7fa0 --- /dev/null +++ b/pkg/master/collection/collection_test.go @@ -0,0 +1,39 @@ +package collection + +import ( + "testing" + "time" + + messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/stretchr/testify/assert" +) + +var ( + cid = uint64(10011111234) + name = "test-segment" + createTime = time.Now() + schema = []*messagepb.FieldMeta{} + sIds = []uint64{111111, 222222} + ptags = []string{"default", "test"} +) + +func TestNewCollection(t *testing.T) { + assert := assert.New(t) + c := NewCollection(cid, name, createTime, schema, sIds, ptags) + assert.Equal(cid, c.ID) + assert.Equal(name, c.Name) + for k, v := range schema { + assert.Equal(v.Dim, c.Schema[k].DIM) + assert.Equal(v.FieldName, c.Schema[k].FieldName) + assert.Equal(v.Type, c.Schema[k].Type) + } + assert.Equal(sIds, c.SegmentIDs) + assert.Equal(ptags, c.PartitionTags) +} + +func TestGrpcMarshal(t *testing.T) { + assert := assert.New(t) + c := NewCollection(cid, name, createTime, schema, sIds, ptags) + newc := GrpcMarshal(&c) + assert.NotEqual("", newc.GrpcMarshalString) +} diff --git a/pkg/master/common/config.go b/pkg/master/common/config.go deleted file mode 100644 index 844b5b6e8d..0000000000 --- a/pkg/master/common/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package common - -//const ( -// PULSAR_URL = "pulsar://localhost:6650" -// PULSAR_MONITER_INTERVAL = 1 * time.Second -// PULSAR_TOPIC = "monitor-topic" -// ETCD_ROOT_PATH = "by-dev" -// SEGMENT_THRESHOLE = 10000 -// DEFAULT_GRPC_PORT = ":53100" -//) diff --git a/pkg/master/controller/collection.go b/pkg/master/controller/collection.go new file mode 100644 index 0000000000..6ff5d2345a --- /dev/null +++ b/pkg/master/controller/collection.go @@ -0,0 +1,117 @@ +package controller + +import ( + "log" + "strconv" + "time" + + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/collection" + messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/id" + "github.com/czs007/suvlim/pkg/master/kv" + "github.com/czs007/suvlim/pkg/master/segment" +) + +func CollectionController(ch chan *messagepb.Mapping, kvbase kv.Base, errch chan error) { + for collectionMeta := range ch { + sID := id.New().Uint64() + cID := id.New().Uint64() + s2ID := id.New().Uint64() + fieldMetas := []*messagepb.FieldMeta{} + if collectionMeta.Schema != nil { + fieldMetas = collectionMeta.Schema.FieldMetas + } + c := collection.NewCollection(cID, collectionMeta.CollectionName, + time.Now(), fieldMetas, []uint64{sID, s2ID}, + []string{"default"}) + cm := collection.GrpcMarshal(&c) + s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) + s2 := segment.NewSegment(s2ID, cID, collectionMeta.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) + collectionData, _ := collection.Collection2JSON(*cm) + segmentData, err := segment.Segment2JSON(s) + if err != nil { + log.Fatal(err) + } + s2Data, err := segment.Segment2JSON(s2) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data) + if err != nil { + log.Fatal(err) + } + } +} + +func WriteCollection2Datastore(collectionMeta *messagepb.Mapping, kvbase kv.Base) error { + sID := id.New().Uint64() + cID := id.New().Uint64() + fieldMetas := []*messagepb.FieldMeta{} + if collectionMeta.Schema != nil { + fieldMetas = collectionMeta.Schema.FieldMetas + } + c := collection.NewCollection(cID, collectionMeta.CollectionName, + time.Now(), fieldMetas, []uint64{sID}, + []string{"default"}) + cm := collection.GrpcMarshal(&c) + s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0)) + collectionData, err := collection.Collection2JSON(*cm) + if err != nil { + log.Fatal(err) + return err + } + segmentData, err := segment.Segment2JSON(s) + if err != nil { + log.Fatal(err) + return err + } + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) + if err != nil { + log.Fatal(err) + return err + } + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) + if err != nil { + log.Fatal(err) + return err + } + return nil + +} + +func UpdateCollectionIndex(index *messagepb.IndexParam, kvbase kv.Base) error { + collectionName := index.CollectionName + collectionJSON, err := kvbase.Load("collection/" + collectionName) + if err != nil { + return err + } + c, err := collection.JSON2Collection(collectionJSON) + if err != nil { + return err + } + for k, v := range c.IndexParam { + if v.IndexName == index.IndexName { + c.IndexParam[k] = v + } + } + c.IndexParam = append(c.IndexParam, index) + cm := collection.GrpcMarshal(c) + collectionData, err := collection.Collection2JSON(*cm) + if err != nil { + return err + } + err = kvbase.Save("collection/"+collectionName, collectionData) + if err != nil { + return err + } + return nil +} diff --git a/pkg/master/controller/segment.go b/pkg/master/controller/segment.go new file mode 100644 index 0000000000..5ddf12f45f --- /dev/null +++ b/pkg/master/controller/segment.go @@ -0,0 +1,114 @@ +package controller + +import ( + "fmt" + "strconv" + "time" + + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/collection" + "github.com/czs007/suvlim/pkg/master/id" + "github.com/czs007/suvlim/pkg/master/informer" + "github.com/czs007/suvlim/pkg/master/kv" + "github.com/czs007/suvlim/pkg/master/segment" +) + +func SegmentStatsController(kvbase kv.Base, errch chan error) { + ssChan := make(chan segment.SegmentStats, 10) + ssClient := informer.NewPulsarClient() + go segment.Listener(ssChan, ssClient) + for { + select { + case ss := <-ssChan: + errch <- ComputeCloseTime(ss, kvbase) + errch <- UpdateSegmentStatus(ss, kvbase) + case <-time.After(5 * time.Second): + fmt.Println("wait for new request") + return + } + } + +} + +func ComputeCloseTime(ss segment.SegmentStats, kvbase kv.Base) error { + if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { + currentTime := time.Now() + memRate := int(ss.MemoryRate) + if memRate == 0 { + memRate = 1 + } + sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate + data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) + if err != nil { + return err + } + seg, err := segment.JSON2Segment(data) + if err != nil { + return err + } + seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) + fmt.Println(seg) + updateData, err := segment.Segment2JSON(*seg) + if err != nil { + return err + } + kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) + //create new segment + newSegID := id.New().Uint64() + newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) + newSegData, err := segment.Segment2JSON(*&newSeg) + if err != nil { + return err + } + //save to kv store + kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData) + // update collection data + c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID))) + collectionMeta, err := collection.JSON2Collection(c) + if err != nil { + return err + } + segIDs := collectionMeta.SegmentIDs + segIDs = append(segIDs, newSegID) + collectionMeta.SegmentIDs = segIDs + cData, err := collection.Collection2JSON(*collectionMeta) + if err != nil { + return err + } + kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData) + } + return nil +} + +func UpdateSegmentStatus(ss segment.SegmentStats, kvbase kv.Base) error { + segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) + if err != nil { + return err + } + seg, err := segment.JSON2Segment(segmentData) + if err != nil { + return err + } + var changed bool + changed = false + if seg.Status != ss.Status { + changed = true + seg.Status = ss.Status + } + if seg.Rows != ss.Rows { + changed = true + seg.Rows = ss.Rows + } + + if changed { + segData, err := segment.Segment2JSON(*seg) + if err != nil { + return err + } + err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/master/controller/segment_test.go b/pkg/master/controller/segment_test.go new file mode 100644 index 0000000000..76aef7c421 --- /dev/null +++ b/pkg/master/controller/segment_test.go @@ -0,0 +1,37 @@ +package controller + +import ( + "strconv" + "testing" + "time" + + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/kv" + "github.com/czs007/suvlim/pkg/master/segment" + "go.etcd.io/etcd/clientv3" +) + +func newKvBase() kv.Base { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + return kvbase +} + +func TestComputeClosetTime(t *testing.T) { + kvbase := newKvBase() + var news segment.SegmentStats + for i := 0; i < 10; i++ { + news = segment.SegmentStats{ + SegementID: uint64(6875940398055133887), + MemorySize: uint64(i * 1000), + MemoryRate: 0.9, + } + ComputeCloseTime(news, kvbase) + } +} diff --git a/pkg/master/grpc/server.go b/pkg/master/grpc/server.go new file mode 100644 index 0000000000..591d651b1d --- /dev/null +++ b/pkg/master/grpc/server.go @@ -0,0 +1,70 @@ +package grpc + +import ( + "context" + "fmt" + "net" + "strconv" + + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/controller" + masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" + "github.com/czs007/suvlim/pkg/master/grpc/message" + messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/kv" + "google.golang.org/grpc" +) + +func Server(ch chan *messagepb.Mapping, errch chan error, kvbase kv.Base) { + defaultGRPCPort := ":" + defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) + lis, err := net.Listen("tcp", defaultGRPCPort) + if err != nil { + // log.Fatal("failed to listen: %v", err) + errch <- err + return + } + s := grpc.NewServer() + masterpb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch, kvbase: kvbase}) + if err := s.Serve(lis); err != nil { + // log.Fatalf("failed to serve: %v", err) + errch <- err + return + } +} + +type GRPCMasterServer struct { + CreateRequest chan *messagepb.Mapping + kvbase kv.Base +} + +func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { + // ms.CreateRequest <- in2 + fmt.Println("Handle a new create collection request") + err := controller.WriteCollection2Datastore(in, ms.kvbase) + if err != nil { + return &messagepb.Status{ + ErrorCode: 100, + Reason: "", + }, err + } + return &messagepb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) { + fmt.Println("Handle a new create index request") + err := controller.UpdateCollectionIndex(in, ms.kvbase) + if err != nil { + return &messagepb.Status{ + ErrorCode: 100, + Reason: "", + }, err + } + return &messagepb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go index e6eaee7c1c..77a37113d8 100644 --- a/pkg/master/informer/pulsar.go +++ b/pkg/master/informer/pulsar.go @@ -1,15 +1,13 @@ package informer import ( - "context" - "fmt" - "github.com/czs007/suvlim/conf" "log" "strconv" "time" + "github.com/czs007/suvlim/conf" + "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/pkg/master/mock" ) func NewPulsarClient() PulsarClient { @@ -34,30 +32,3 @@ func NewPulsarClient() PulsarClient { type PulsarClient struct { Client pulsar.Client } - -func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error { - consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ - Topic: conf.Config.Master.PulsarTopic, - SubscriptionName: "my-sub", - Type: pulsar.Shared, - }) - if err != nil { - log.Fatal(err) - } - for { - msg, err := consumer.Receive(context.TODO()) - if err != nil { - log.Fatal(err) - } - m, _ := mock.SegmentUnMarshal(msg.Payload()) - fmt.Printf("Received message msgId: %#v -- content: '%s'\n", - msg.ID(), m.SegementID) - ssChan <- m - consumer.Ack(msg) - } - - if err := consumer.Unsubscribe(); err != nil { - log.Fatal(err) - } - return nil -} diff --git a/pkg/master/kv/etcd_kv.go b/pkg/master/kv/etcd_kv.go index 081cc8b35f..8de8054a1b 100644 --- a/pkg/master/kv/etcd_kv.go +++ b/pkg/master/kv/etcd_kv.go @@ -34,23 +34,22 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase { } } -func (kv *EtcdKVBase) Close(){ +func (kv *EtcdKVBase) Close() { kv.client.Close() } - -func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) { +func (kv *EtcdKVBase) LoadWithPrefix(key string) ([]string, []string) { key = path.Join(kv.rootPath, key) println("in loadWithPrefix,", key) - resp, err := etcdutil.EtcdKVGet(kv.client, key,clientv3.WithPrefix()) + resp, err := etcdutil.EtcdKVGet(kv.client, key, clientv3.WithPrefix()) if err != nil { - return [] string {}, [] string {} + return []string{}, []string{} } var keys []string var values []string - for _,kvs := range resp.Kvs{ + for _, kvs := range resp.Kvs { //println(len(kvs.)) - if len(kvs.Key) <= 0{ + if len(kvs.Key) <= 0 { println("KKK") continue } diff --git a/pkg/master/mock/fake/maing.go b/pkg/master/mock/fake/maing.go new file mode 100644 index 0000000000..b25c6c0fc3 --- /dev/null +++ b/pkg/master/mock/fake/maing.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + "log" + "strconv" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/mock" +) + +func main() { + FakeProduecer() +} +func FakeProduecer() { + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: pulsarAddr, + OperationTimeout: 30 * time.Second, + ConnectionTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatalf("Could not instantiate Pulsar client: %v", err) + } + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: conf.Config.Master.PulsarTopic, + }) + testSegmentStats, _ := mock.SegmentMarshal(mock.SegmentStats{ + SegementID: uint64(6875939483227099806), + MemorySize: uint64(9999), + MemoryRate: float64(0.13), + }) + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: testSegmentStats, + }) + time.Sleep(1 * time.Second) + defer producer.Close() + + if err != nil { + fmt.Errorf("%v", err) + } + fmt.Println("Published message") + +} diff --git a/pkg/master/mock/pulsar.go b/pkg/master/mock/pulsar.go index d7f551aa3a..bbd3a54dfd 100644 --- a/pkg/master/mock/pulsar.go +++ b/pkg/master/mock/pulsar.go @@ -3,11 +3,12 @@ package mock import ( "context" "fmt" - "github.com/czs007/suvlim/conf" "log" "strconv" "time" + "github.com/czs007/suvlim/conf" + "github.com/apache/pulsar-client-go/pulsar" ) @@ -29,8 +30,8 @@ func FakePulsarProducer() { Topic: conf.Config.Master.PulsarTopic, }) testSegmentStats, _ := SegmentMarshal(SegmentStats{ - SegementID: uint64(1111), - MemorySize: uint64(333322), + SegementID: uint64(6875875531164062448), + MemorySize: uint64(9999), MemoryRate: float64(0.13), }) for { diff --git a/pkg/master/mock/segment_test.go b/pkg/master/mock/segment_test.go index c24196b6ae..6239bdbacb 100644 --- a/pkg/master/mock/segment_test.go +++ b/pkg/master/mock/segment_test.go @@ -1,15 +1,21 @@ package mock import ( + "context" "fmt" + "log" + "strconv" "testing" "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" ) func TestSegmentMarshal(t *testing.T) { s := SegmentStats{ - SegementID: uint64(12315), - MemorySize: uint64(233113), + SegementID: uint64(6875873667148255882), + MemorySize: uint64(9999), MemoryRate: float64(0.13), } @@ -47,3 +53,38 @@ func TestSegment2JSON(t *testing.T) { } fmt.Println(res) } + +func TestFakePulsarProducer(t *testing.T) { + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: pulsarAddr, + OperationTimeout: 30 * time.Second, + ConnectionTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatalf("Could not instantiate Pulsar client: %v", err) + } + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: conf.Config.Master.PulsarTopic, + }) + testSegmentStats, _ := SegmentMarshal(SegmentStats{ + SegementID: uint64(6875875531164062448), + MemorySize: uint64(9999), + MemoryRate: float64(0.13), + }) + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: testSegmentStats, + }) + time.Sleep(1 * time.Second) + defer producer.Close() + + if err != nil { + t.Error(err) + } + fmt.Println("Published message") + +} diff --git a/pkg/master/segment/segment.go b/pkg/master/segment/segment.go new file mode 100644 index 0000000000..ccc8e4d42d --- /dev/null +++ b/pkg/master/segment/segment.go @@ -0,0 +1,54 @@ +package segment + +import ( + "time" + + masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" + jsoniter "github.com/json-iterator/go" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +type Segment struct { + SegmentID uint64 `json:"segment_id"` + CollectionID uint64 `json:"collection_id"` + PartitionTag string `json:"partition_tag"` + ChannelStart int `json:"channel_start"` + ChannelEnd int `json:"channel_end"` + OpenTimeStamp uint64 `json:"open_timestamp"` + CloseTimeStamp uint64 `json:"close_timestamp"` + CollectionName string `json:"collection_name"` + Status masterpb.SegmentStatus `json:"segment_status"` + Rows int64 `json:"rows"` +} + +func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { + return Segment{ + SegmentID: id, + CollectionID: collectioID, + CollectionName: cName, + PartitionTag: ptag, + ChannelStart: chStart, + ChannelEnd: chEnd, + OpenTimeStamp: uint64(openTime.Unix()), + CloseTimeStamp: uint64(closeTime.Unix()), + } +} + +func Segment2JSON(s Segment) (string, error) { + b, err := json.Marshal(&s) + if err != nil { + return "", err + } + + return string(b), nil +} + +func JSON2Segment(s string) (*Segment, error) { + var c Segment + err := json.Unmarshal([]byte(s), &c) + if err != nil { + return &Segment{}, err + } + return &c, nil +} diff --git a/pkg/master/segment/stats.go b/pkg/master/segment/stats.go new file mode 100644 index 0000000000..3ec476c2ed --- /dev/null +++ b/pkg/master/segment/stats.go @@ -0,0 +1,69 @@ +package segment + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + "log" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" + masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" + "github.com/czs007/suvlim/pkg/master/informer" +) + +type SegmentStats struct { + SegementID uint64 + MemorySize uint64 + MemoryRate float64 + Status masterpb.SegmentStatus + Rows int64 +} + +func SegmentMarshal(s SegmentStats) ([]byte, error) { + var nb bytes.Buffer + enc := gob.NewEncoder(&nb) + err := enc.Encode(s) + if err != nil { + return []byte{}, err + } + return nb.Bytes(), nil +} + +func SegmentUnMarshal(data []byte) (SegmentStats, error) { + var ss SegmentStats + dec := gob.NewDecoder(bytes.NewBuffer(data)) + err := dec.Decode(&ss) + if err != nil { + return SegmentStats{}, err + } + return ss, nil +} + +func Listener(ssChan chan SegmentStats, pc informer.PulsarClient) error { + consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ + Topic: conf.Config.Master.PulsarTopic, + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + log.Fatal(err) + } + for { + msg, err := consumer.Receive(context.TODO()) + if err != nil { + log.Fatal(err) + } + m, _ := SegmentUnMarshal(msg.Payload()) + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), m.SegementID) + ssChan <- m + consumer.Ack(msg) + } + + if err := consumer.Unsubscribe(); err != nil { + log.Fatal(err) + } + return nil +} diff --git a/pkg/master/server.go b/pkg/master/server.go index 0910df24cd..7c348a179b 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -1,37 +1,38 @@ package master import ( - "context" - "fmt" "log" - "net" "strconv" "time" "github.com/czs007/suvlim/conf" - pb "github.com/czs007/suvlim/pkg/master/grpc/master" - "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/controller" + milvusgrpc "github.com/czs007/suvlim/pkg/master/grpc" messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/czs007/suvlim/pkg/master/id" - "github.com/czs007/suvlim/pkg/master/informer" "github.com/czs007/suvlim/pkg/master/kv" - "github.com/czs007/suvlim/pkg/master/mock" + "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" ) func Run() { - go mock.FakePulsarProducer() - go SegmentStatsController() + kvbase := newKvBase() collectionChan := make(chan *messagepb.Mapping) defer close(collectionChan) - go GRPCServer(collectionChan) - go CollectionController(collectionChan) + + errorch := make(chan error) + defer close(errorch) + + go milvusgrpc.Server(collectionChan, errorch, kvbase) + go controller.SegmentStatsController(kvbase, errorch) + go controller.CollectionController(collectionChan, kvbase, errorch) for { + for v := range errorch { + log.Fatal(v) + } } } -func SegmentStatsController() { +func newKvBase() kv.Base { etcdAddr := conf.Config.Etcd.Address etcdAddr += ":" etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) @@ -39,291 +40,7 @@ func SegmentStatsController() { Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) - defer cli.Close() + // defer cli.Close() kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - - ssChan := make(chan mock.SegmentStats, 10) - defer close(ssChan) - ssClient := informer.NewPulsarClient() - go ssClient.Listener(ssChan) - for { - select { - case ss := <-ssChan: - ComputeCloseTime(ss, kvbase) - UpdateSegmentStatus(ss, kvbase) - case <-time.After(5 * time.Second): - fmt.Println("timeout") - return - } - } -} - -func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { - if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { - currentTime := time.Now() - memRate := int(ss.MemoryRate) - if memRate == 0 { - memRate = 1 - } - sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate - data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) - if err != nil { - return err - } - seg, err := mock.JSON2Segment(data) - if err != nil { - return err - } - seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) - fmt.Println("memRate = ", memRate, ",sec = ", sec ,",Close time = ", seg.CloseTimeStamp) - updateData, err := mock.Segment2JSON(*seg) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) - //create new segment - newSegID := id.New().Uint64() - newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) - newSegData, err := mock.Segment2JSON(*&newSeg) - if err != nil { - return err - } - //save to kv store - kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData) - // update collection data - c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID))) - collection, err := mock.JSON2Collection(c) - if err != nil { - return err - } - segIDs := collection.SegmentIDs - segIDs = append(segIDs, newSegID) - collection.SegmentIDs = segIDs - cData, err := mock.Collection2JSON(*collection) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData) - } - return nil -} - -func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error { - segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) - if err != nil { - return err - } - seg, err := mock.JSON2Segment(segmentData) - if err != nil { - return err - } - var changed bool - changed = false - if seg.Status != ss.Status { - changed = true - seg.Status = ss.Status - } - if seg.Rows != ss.Rows { - changed = true - seg.Rows = ss.Rows - } - - if changed { - segData, err := mock.Segment2JSON(*seg) - if err != nil { - return err - } - err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData) - if err != nil { - return err - } - } - return nil -} - -func GRPCServer(ch chan *messagepb.Mapping) error { - defaultGRPCPort := ":" - defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) - lis, err := net.Listen("tcp", defaultGRPCPort) - if err != nil { - return err - } - s := grpc.NewServer() - pb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch}) - if err := s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) - return err - } - return nil -} - -type GRPCMasterServer struct { - CreateRequest chan *messagepb.Mapping -} - -func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { - // ms.CreateRequest <- in2 - fmt.Println("Handle a new create collection request") - err := WriteCollection2Datastore(in) - if err != nil { - return &messagepb.Status{ - ErrorCode: 100, - Reason: "", - }, err - } - return &messagepb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) { - fmt.Println("Handle a new create index request") - err := UpdateCollectionIndex(in) - if err != nil { - return &messagepb.Status{ - ErrorCode: 100, - Reason: "", - }, err - } - return &messagepb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -// func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *pb.CreateCollectionRequest) (*pb.CreateCollectionResponse, error) { -// return &pb.CreateCollectionResponse{ -// CollectionName: in.CollectionName, -// }, nil -// } - -func CollectionController(ch chan *messagepb.Mapping) { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - for collection := range ch { - sID := id.New().Uint64() - cID := id.New().Uint64() - s2ID := id.New().Uint64() - fieldMetas := []*messagepb.FieldMeta{} - if collection.Schema != nil { - fieldMetas = collection.Schema.FieldMetas - } - c := mock.NewCollection(cID, collection.CollectionName, - time.Now(), fieldMetas, []uint64{sID, s2ID}, - []string{"default"}) - cm := mock.GrpcMarshal(&c) - s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) - s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) - collectionData, _ := mock.Collection2JSON(*cm) - segmentData, err := mock.Segment2JSON(s) - if err != nil { - log.Fatal(err) - } - s2Data, err := mock.Segment2JSON(s2) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data) - if err != nil { - log.Fatal(err) - } - } -} - -func WriteCollection2Datastore(collection *messagepb.Mapping) error { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - sID := id.New().Uint64() - cID := id.New().Uint64() - fieldMetas := []*messagepb.FieldMeta{} - if collection.Schema != nil { - fieldMetas = collection.Schema.FieldMetas - } - c := mock.NewCollection(cID, collection.CollectionName, - time.Now(), fieldMetas, []uint64{sID}, - []string{"default"}) - cm := mock.GrpcMarshal(&c) - s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0)) - collectionData, err := mock.Collection2JSON(*cm) - if err != nil { - log.Fatal(err) - return err - } - segmentData, err := mock.Segment2JSON(s) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - return err - } - return nil - -} - -func UpdateCollectionIndex(index *messagepb.IndexParam) error { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - collectionName := index.CollectionName - c, err := kvbase.Load("collection/" + collectionName) - if err != nil { - return err - } - collection, err := mock.JSON2Collection(c) - if err != nil { - return err - } - for k, v := range collection.IndexParam { - if v.IndexName == index.IndexName { - collection.IndexParam[k] = v - } - } - collection.IndexParam = append(collection.IndexParam, index) - cm := mock.GrpcMarshal(collection) - collectionData, err := mock.Collection2JSON(*cm) - if err != nil { - return err - } - err = kvbase.Save("collection/"+collectionName, collectionData) - if err != nil { - return err - } - return nil + return kvbase }