From 3bd6c5417ccb929aa1bff8f481caefe15f31ade4 Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Fri, 25 Sep 2020 11:46:30 +0800 Subject: [PATCH] Fix core dump Signed-off-by: shengjh <1572099106@qq.com> --- pkg/master/collection/collection.go | 100 ------- pkg/master/collection/collection_test.go | 39 --- pkg/master/common/config.go | 10 + pkg/master/controller/collection.go | 117 --------- pkg/master/controller/segment.go | 114 -------- pkg/master/controller/segment_test.go | 37 --- pkg/master/grpc/server.go | 70 ----- pkg/master/informer/pulsar.go | 33 ++- pkg/master/kv/etcd_kv.go | 13 +- pkg/master/mock/fake/maing.go | 51 ---- pkg/master/mock/pulsar.go | 7 +- pkg/master/mock/segment_test.go | 45 +--- pkg/master/segment/segment.go | 54 ---- pkg/master/segment/stats.go | 69 ----- pkg/master/server.go | 317 +++++++++++++++++++++-- proxy/src/meta/etcd_watcher/Watcher.cpp | 3 + proxy/src/server/MetaWrapper.cpp | 54 ++-- proxy/src/server/Server.cpp | 12 +- 18 files changed, 396 insertions(+), 749 deletions(-) delete mode 100644 pkg/master/collection/collection.go delete mode 100644 pkg/master/collection/collection_test.go create mode 100644 pkg/master/common/config.go delete mode 100644 pkg/master/controller/collection.go delete mode 100644 pkg/master/controller/segment.go delete mode 100644 pkg/master/controller/segment_test.go delete mode 100644 pkg/master/grpc/server.go delete mode 100644 pkg/master/mock/fake/maing.go delete mode 100644 pkg/master/segment/segment.go delete mode 100644 pkg/master/segment/stats.go diff --git a/pkg/master/collection/collection.go b/pkg/master/collection/collection.go deleted file mode 100644 index 11e880c49f..0000000000 --- a/pkg/master/collection/collection.go +++ /dev/null @@ -1,100 +0,0 @@ -package collection - -import ( - "time" - - masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" - messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/gogo/protobuf/proto" - jsoniter "github.com/json-iterator/go" -) - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -type Collection struct { - ID uint64 `json:"id"` - Name string `json:"name"` - CreateTime uint64 `json:"creat_time"` - Schema []FieldMeta `json:"schema"` - // ExtraSchema []FieldMeta `json:"extra_schema"` - SegmentIDs []uint64 `json:"segment_ids"` - PartitionTags []string `json:"partition_tags"` - GrpcMarshalString string `json:"grpc_marshal_string"` - IndexParam []*messagepb.IndexParam `json:"index_param"` -} - -type FieldMeta struct { - FieldName string `json:"field_name"` - Type messagepb.DataType `json:"type"` - DIM int64 `json:"dimension"` -} - -func GrpcMarshal(c *Collection) *Collection { - if c.GrpcMarshalString != "" { - c.GrpcMarshalString = "" - } - pbSchema := &messagepb.Schema{ - FieldMetas: []*messagepb.FieldMeta{}, - } - schemaSlice := []*messagepb.FieldMeta{} - for _, v := range c.Schema { - newpbMeta := &messagepb.FieldMeta{ - FieldName: v.FieldName, - Type: v.Type, - Dim: v.DIM, - } - schemaSlice = append(schemaSlice, newpbMeta) - } - pbSchema.FieldMetas = schemaSlice - grpcCollection := &masterpb.Collection{ - Id: c.ID, - Name: c.Name, - Schema: pbSchema, - CreateTime: c.CreateTime, - SegmentIds: c.SegmentIDs, - PartitionTags: c.PartitionTags, - Indexes: c.IndexParam, - } - out := proto.MarshalTextString(grpcCollection) - c.GrpcMarshalString = out - return c -} - -func NewCollection(id uint64, name string, createTime time.Time, - schema []*messagepb.FieldMeta, sIds []uint64, ptags []string) Collection { - - segementIDs := []uint64{} - newSchema := []FieldMeta{} - for _, v := range schema { - newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type, DIM: v.Dim}) - } - for _, sid := range sIds { - segementIDs = append(segementIDs, sid) - } - return Collection{ - ID: id, - Name: name, - CreateTime: uint64(createTime.Unix()), - Schema: newSchema, - SegmentIDs: segementIDs, - PartitionTags: ptags, - } -} - -func Collection2JSON(c Collection) (string, error) { - b, err := json.Marshal(&c) - if err != nil { - return "", err - } - - return string(b), nil -} - -func JSON2Collection(s string) (*Collection, error) { - var c Collection - err := json.Unmarshal([]byte(s), &c) - if err != nil { - return &Collection{}, err - } - return &c, nil -} diff --git a/pkg/master/collection/collection_test.go b/pkg/master/collection/collection_test.go deleted file mode 100644 index 256ead7fa0..0000000000 --- a/pkg/master/collection/collection_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package collection - -import ( - "testing" - "time" - - messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/stretchr/testify/assert" -) - -var ( - cid = uint64(10011111234) - name = "test-segment" - createTime = time.Now() - schema = []*messagepb.FieldMeta{} - sIds = []uint64{111111, 222222} - ptags = []string{"default", "test"} -) - -func TestNewCollection(t *testing.T) { - assert := assert.New(t) - c := NewCollection(cid, name, createTime, schema, sIds, ptags) - assert.Equal(cid, c.ID) - assert.Equal(name, c.Name) - for k, v := range schema { - assert.Equal(v.Dim, c.Schema[k].DIM) - assert.Equal(v.FieldName, c.Schema[k].FieldName) - assert.Equal(v.Type, c.Schema[k].Type) - } - assert.Equal(sIds, c.SegmentIDs) - assert.Equal(ptags, c.PartitionTags) -} - -func TestGrpcMarshal(t *testing.T) { - assert := assert.New(t) - c := NewCollection(cid, name, createTime, schema, sIds, ptags) - newc := GrpcMarshal(&c) - assert.NotEqual("", newc.GrpcMarshalString) -} diff --git a/pkg/master/common/config.go b/pkg/master/common/config.go new file mode 100644 index 0000000000..844b5b6e8d --- /dev/null +++ b/pkg/master/common/config.go @@ -0,0 +1,10 @@ +package common + +//const ( +// PULSAR_URL = "pulsar://localhost:6650" +// PULSAR_MONITER_INTERVAL = 1 * time.Second +// PULSAR_TOPIC = "monitor-topic" +// ETCD_ROOT_PATH = "by-dev" +// SEGMENT_THRESHOLE = 10000 +// DEFAULT_GRPC_PORT = ":53100" +//) diff --git a/pkg/master/controller/collection.go b/pkg/master/controller/collection.go deleted file mode 100644 index 6ff5d2345a..0000000000 --- a/pkg/master/controller/collection.go +++ /dev/null @@ -1,117 +0,0 @@ -package controller - -import ( - "log" - "strconv" - "time" - - "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/collection" - messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/czs007/suvlim/pkg/master/id" - "github.com/czs007/suvlim/pkg/master/kv" - "github.com/czs007/suvlim/pkg/master/segment" -) - -func CollectionController(ch chan *messagepb.Mapping, kvbase kv.Base, errch chan error) { - for collectionMeta := range ch { - sID := id.New().Uint64() - cID := id.New().Uint64() - s2ID := id.New().Uint64() - fieldMetas := []*messagepb.FieldMeta{} - if collectionMeta.Schema != nil { - fieldMetas = collectionMeta.Schema.FieldMetas - } - c := collection.NewCollection(cID, collectionMeta.CollectionName, - time.Now(), fieldMetas, []uint64{sID, s2ID}, - []string{"default"}) - cm := collection.GrpcMarshal(&c) - s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) - s2 := segment.NewSegment(s2ID, cID, collectionMeta.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) - collectionData, _ := collection.Collection2JSON(*cm) - segmentData, err := segment.Segment2JSON(s) - if err != nil { - log.Fatal(err) - } - s2Data, err := segment.Segment2JSON(s2) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data) - if err != nil { - log.Fatal(err) - } - } -} - -func WriteCollection2Datastore(collectionMeta *messagepb.Mapping, kvbase kv.Base) error { - sID := id.New().Uint64() - cID := id.New().Uint64() - fieldMetas := []*messagepb.FieldMeta{} - if collectionMeta.Schema != nil { - fieldMetas = collectionMeta.Schema.FieldMetas - } - c := collection.NewCollection(cID, collectionMeta.CollectionName, - time.Now(), fieldMetas, []uint64{sID}, - []string{"default"}) - cm := collection.GrpcMarshal(&c) - s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0)) - collectionData, err := collection.Collection2JSON(*cm) - if err != nil { - log.Fatal(err) - return err - } - segmentData, err := segment.Segment2JSON(s) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - return err - } - return nil - -} - -func UpdateCollectionIndex(index *messagepb.IndexParam, kvbase kv.Base) error { - collectionName := index.CollectionName - collectionJSON, err := kvbase.Load("collection/" + collectionName) - if err != nil { - return err - } - c, err := collection.JSON2Collection(collectionJSON) - if err != nil { - return err - } - for k, v := range c.IndexParam { - if v.IndexName == index.IndexName { - c.IndexParam[k] = v - } - } - c.IndexParam = append(c.IndexParam, index) - cm := collection.GrpcMarshal(c) - collectionData, err := collection.Collection2JSON(*cm) - if err != nil { - return err - } - err = kvbase.Save("collection/"+collectionName, collectionData) - if err != nil { - return err - } - return nil -} diff --git a/pkg/master/controller/segment.go b/pkg/master/controller/segment.go deleted file mode 100644 index 5ddf12f45f..0000000000 --- a/pkg/master/controller/segment.go +++ /dev/null @@ -1,114 +0,0 @@ -package controller - -import ( - "fmt" - "strconv" - "time" - - "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/collection" - "github.com/czs007/suvlim/pkg/master/id" - "github.com/czs007/suvlim/pkg/master/informer" - "github.com/czs007/suvlim/pkg/master/kv" - "github.com/czs007/suvlim/pkg/master/segment" -) - -func SegmentStatsController(kvbase kv.Base, errch chan error) { - ssChan := make(chan segment.SegmentStats, 10) - ssClient := informer.NewPulsarClient() - go segment.Listener(ssChan, ssClient) - for { - select { - case ss := <-ssChan: - errch <- ComputeCloseTime(ss, kvbase) - errch <- UpdateSegmentStatus(ss, kvbase) - case <-time.After(5 * time.Second): - fmt.Println("wait for new request") - return - } - } - -} - -func ComputeCloseTime(ss segment.SegmentStats, kvbase kv.Base) error { - if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { - currentTime := time.Now() - memRate := int(ss.MemoryRate) - if memRate == 0 { - memRate = 1 - } - sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate - data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) - if err != nil { - return err - } - seg, err := segment.JSON2Segment(data) - if err != nil { - return err - } - seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) - fmt.Println(seg) - updateData, err := segment.Segment2JSON(*seg) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) - //create new segment - newSegID := id.New().Uint64() - newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) - newSegData, err := segment.Segment2JSON(*&newSeg) - if err != nil { - return err - } - //save to kv store - kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData) - // update collection data - c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID))) - collectionMeta, err := collection.JSON2Collection(c) - if err != nil { - return err - } - segIDs := collectionMeta.SegmentIDs - segIDs = append(segIDs, newSegID) - collectionMeta.SegmentIDs = segIDs - cData, err := collection.Collection2JSON(*collectionMeta) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData) - } - return nil -} - -func UpdateSegmentStatus(ss segment.SegmentStats, kvbase kv.Base) error { - segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) - if err != nil { - return err - } - seg, err := segment.JSON2Segment(segmentData) - if err != nil { - return err - } - var changed bool - changed = false - if seg.Status != ss.Status { - changed = true - seg.Status = ss.Status - } - if seg.Rows != ss.Rows { - changed = true - seg.Rows = ss.Rows - } - - if changed { - segData, err := segment.Segment2JSON(*seg) - if err != nil { - return err - } - err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData) - if err != nil { - return err - } - } - return nil -} diff --git a/pkg/master/controller/segment_test.go b/pkg/master/controller/segment_test.go deleted file mode 100644 index 76aef7c421..0000000000 --- a/pkg/master/controller/segment_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package controller - -import ( - "strconv" - "testing" - "time" - - "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/kv" - "github.com/czs007/suvlim/pkg/master/segment" - "go.etcd.io/etcd/clientv3" -) - -func newKvBase() kv.Base { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - return kvbase -} - -func TestComputeClosetTime(t *testing.T) { - kvbase := newKvBase() - var news segment.SegmentStats - for i := 0; i < 10; i++ { - news = segment.SegmentStats{ - SegementID: uint64(6875940398055133887), - MemorySize: uint64(i * 1000), - MemoryRate: 0.9, - } - ComputeCloseTime(news, kvbase) - } -} diff --git a/pkg/master/grpc/server.go b/pkg/master/grpc/server.go deleted file mode 100644 index 591d651b1d..0000000000 --- a/pkg/master/grpc/server.go +++ /dev/null @@ -1,70 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "net" - "strconv" - - "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/controller" - masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" - "github.com/czs007/suvlim/pkg/master/grpc/message" - messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/czs007/suvlim/pkg/master/kv" - "google.golang.org/grpc" -) - -func Server(ch chan *messagepb.Mapping, errch chan error, kvbase kv.Base) { - defaultGRPCPort := ":" - defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) - lis, err := net.Listen("tcp", defaultGRPCPort) - if err != nil { - // log.Fatal("failed to listen: %v", err) - errch <- err - return - } - s := grpc.NewServer() - masterpb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch, kvbase: kvbase}) - if err := s.Serve(lis); err != nil { - // log.Fatalf("failed to serve: %v", err) - errch <- err - return - } -} - -type GRPCMasterServer struct { - CreateRequest chan *messagepb.Mapping - kvbase kv.Base -} - -func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { - // ms.CreateRequest <- in2 - fmt.Println("Handle a new create collection request") - err := controller.WriteCollection2Datastore(in, ms.kvbase) - if err != nil { - return &messagepb.Status{ - ErrorCode: 100, - Reason: "", - }, err - } - return &messagepb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) { - fmt.Println("Handle a new create index request") - err := controller.UpdateCollectionIndex(in, ms.kvbase) - if err != nil { - return &messagepb.Status{ - ErrorCode: 100, - Reason: "", - }, err - } - return &messagepb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go index 77a37113d8..e6eaee7c1c 100644 --- a/pkg/master/informer/pulsar.go +++ b/pkg/master/informer/pulsar.go @@ -1,13 +1,15 @@ package informer import ( + "context" + "fmt" + "github.com/czs007/suvlim/conf" "log" "strconv" "time" - "github.com/czs007/suvlim/conf" - "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/pkg/master/mock" ) func NewPulsarClient() PulsarClient { @@ -32,3 +34,30 @@ func NewPulsarClient() PulsarClient { type PulsarClient struct { Client pulsar.Client } + +func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error { + consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ + Topic: conf.Config.Master.PulsarTopic, + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + log.Fatal(err) + } + for { + msg, err := consumer.Receive(context.TODO()) + if err != nil { + log.Fatal(err) + } + m, _ := mock.SegmentUnMarshal(msg.Payload()) + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), m.SegementID) + ssChan <- m + consumer.Ack(msg) + } + + if err := consumer.Unsubscribe(); err != nil { + log.Fatal(err) + } + return nil +} diff --git a/pkg/master/kv/etcd_kv.go b/pkg/master/kv/etcd_kv.go index 8de8054a1b..081cc8b35f 100644 --- a/pkg/master/kv/etcd_kv.go +++ b/pkg/master/kv/etcd_kv.go @@ -34,22 +34,23 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase { } } -func (kv *EtcdKVBase) Close() { +func (kv *EtcdKVBase) Close(){ kv.client.Close() } -func (kv *EtcdKVBase) LoadWithPrefix(key string) ([]string, []string) { + +func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) { key = path.Join(kv.rootPath, key) println("in loadWithPrefix,", key) - resp, err := etcdutil.EtcdKVGet(kv.client, key, clientv3.WithPrefix()) + resp, err := etcdutil.EtcdKVGet(kv.client, key,clientv3.WithPrefix()) if err != nil { - return []string{}, []string{} + return [] string {}, [] string {} } var keys []string var values []string - for _, kvs := range resp.Kvs { + for _,kvs := range resp.Kvs{ //println(len(kvs.)) - if len(kvs.Key) <= 0 { + if len(kvs.Key) <= 0{ println("KKK") continue } diff --git a/pkg/master/mock/fake/maing.go b/pkg/master/mock/fake/maing.go deleted file mode 100644 index b25c6c0fc3..0000000000 --- a/pkg/master/mock/fake/maing.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "strconv" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/mock" -) - -func main() { - FakeProduecer() -} -func FakeProduecer() { - pulsarAddr := "pulsar://" - pulsarAddr += conf.Config.Pulsar.Address - pulsarAddr += ":" - pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarAddr, - OperationTimeout: 30 * time.Second, - ConnectionTimeout: 30 * time.Second, - }) - if err != nil { - log.Fatalf("Could not instantiate Pulsar client: %v", err) - } - - producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: conf.Config.Master.PulsarTopic, - }) - testSegmentStats, _ := mock.SegmentMarshal(mock.SegmentStats{ - SegementID: uint64(6875939483227099806), - MemorySize: uint64(9999), - MemoryRate: float64(0.13), - }) - _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ - Payload: testSegmentStats, - }) - time.Sleep(1 * time.Second) - defer producer.Close() - - if err != nil { - fmt.Errorf("%v", err) - } - fmt.Println("Published message") - -} diff --git a/pkg/master/mock/pulsar.go b/pkg/master/mock/pulsar.go index bbd3a54dfd..d7f551aa3a 100644 --- a/pkg/master/mock/pulsar.go +++ b/pkg/master/mock/pulsar.go @@ -3,12 +3,11 @@ package mock import ( "context" "fmt" + "github.com/czs007/suvlim/conf" "log" "strconv" "time" - "github.com/czs007/suvlim/conf" - "github.com/apache/pulsar-client-go/pulsar" ) @@ -30,8 +29,8 @@ func FakePulsarProducer() { Topic: conf.Config.Master.PulsarTopic, }) testSegmentStats, _ := SegmentMarshal(SegmentStats{ - SegementID: uint64(6875875531164062448), - MemorySize: uint64(9999), + SegementID: uint64(1111), + MemorySize: uint64(333322), MemoryRate: float64(0.13), }) for { diff --git a/pkg/master/mock/segment_test.go b/pkg/master/mock/segment_test.go index 6239bdbacb..c24196b6ae 100644 --- a/pkg/master/mock/segment_test.go +++ b/pkg/master/mock/segment_test.go @@ -1,21 +1,15 @@ package mock import ( - "context" "fmt" - "log" - "strconv" "testing" "time" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/conf" ) func TestSegmentMarshal(t *testing.T) { s := SegmentStats{ - SegementID: uint64(6875873667148255882), - MemorySize: uint64(9999), + SegementID: uint64(12315), + MemorySize: uint64(233113), MemoryRate: float64(0.13), } @@ -53,38 +47,3 @@ func TestSegment2JSON(t *testing.T) { } fmt.Println(res) } - -func TestFakePulsarProducer(t *testing.T) { - pulsarAddr := "pulsar://" - pulsarAddr += conf.Config.Pulsar.Address - pulsarAddr += ":" - pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarAddr, - OperationTimeout: 30 * time.Second, - ConnectionTimeout: 30 * time.Second, - }) - if err != nil { - log.Fatalf("Could not instantiate Pulsar client: %v", err) - } - - producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: conf.Config.Master.PulsarTopic, - }) - testSegmentStats, _ := SegmentMarshal(SegmentStats{ - SegementID: uint64(6875875531164062448), - MemorySize: uint64(9999), - MemoryRate: float64(0.13), - }) - _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ - Payload: testSegmentStats, - }) - time.Sleep(1 * time.Second) - defer producer.Close() - - if err != nil { - t.Error(err) - } - fmt.Println("Published message") - -} diff --git a/pkg/master/segment/segment.go b/pkg/master/segment/segment.go deleted file mode 100644 index ccc8e4d42d..0000000000 --- a/pkg/master/segment/segment.go +++ /dev/null @@ -1,54 +0,0 @@ -package segment - -import ( - "time" - - masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" - jsoniter "github.com/json-iterator/go" -) - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -type Segment struct { - SegmentID uint64 `json:"segment_id"` - CollectionID uint64 `json:"collection_id"` - PartitionTag string `json:"partition_tag"` - ChannelStart int `json:"channel_start"` - ChannelEnd int `json:"channel_end"` - OpenTimeStamp uint64 `json:"open_timestamp"` - CloseTimeStamp uint64 `json:"close_timestamp"` - CollectionName string `json:"collection_name"` - Status masterpb.SegmentStatus `json:"segment_status"` - Rows int64 `json:"rows"` -} - -func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { - return Segment{ - SegmentID: id, - CollectionID: collectioID, - CollectionName: cName, - PartitionTag: ptag, - ChannelStart: chStart, - ChannelEnd: chEnd, - OpenTimeStamp: uint64(openTime.Unix()), - CloseTimeStamp: uint64(closeTime.Unix()), - } -} - -func Segment2JSON(s Segment) (string, error) { - b, err := json.Marshal(&s) - if err != nil { - return "", err - } - - return string(b), nil -} - -func JSON2Segment(s string) (*Segment, error) { - var c Segment - err := json.Unmarshal([]byte(s), &c) - if err != nil { - return &Segment{}, err - } - return &c, nil -} diff --git a/pkg/master/segment/stats.go b/pkg/master/segment/stats.go deleted file mode 100644 index 3ec476c2ed..0000000000 --- a/pkg/master/segment/stats.go +++ /dev/null @@ -1,69 +0,0 @@ -package segment - -import ( - "bytes" - "context" - "encoding/gob" - "fmt" - "log" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/conf" - masterpb "github.com/czs007/suvlim/pkg/master/grpc/master" - "github.com/czs007/suvlim/pkg/master/informer" -) - -type SegmentStats struct { - SegementID uint64 - MemorySize uint64 - MemoryRate float64 - Status masterpb.SegmentStatus - Rows int64 -} - -func SegmentMarshal(s SegmentStats) ([]byte, error) { - var nb bytes.Buffer - enc := gob.NewEncoder(&nb) - err := enc.Encode(s) - if err != nil { - return []byte{}, err - } - return nb.Bytes(), nil -} - -func SegmentUnMarshal(data []byte) (SegmentStats, error) { - var ss SegmentStats - dec := gob.NewDecoder(bytes.NewBuffer(data)) - err := dec.Decode(&ss) - if err != nil { - return SegmentStats{}, err - } - return ss, nil -} - -func Listener(ssChan chan SegmentStats, pc informer.PulsarClient) error { - consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ - Topic: conf.Config.Master.PulsarTopic, - SubscriptionName: "my-sub", - Type: pulsar.Shared, - }) - if err != nil { - log.Fatal(err) - } - for { - msg, err := consumer.Receive(context.TODO()) - if err != nil { - log.Fatal(err) - } - m, _ := SegmentUnMarshal(msg.Payload()) - fmt.Printf("Received message msgId: %#v -- content: '%s'\n", - msg.ID(), m.SegementID) - ssChan <- m - consumer.Ack(msg) - } - - if err := consumer.Unsubscribe(); err != nil { - log.Fatal(err) - } - return nil -} diff --git a/pkg/master/server.go b/pkg/master/server.go index 7c348a179b..0910df24cd 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -1,38 +1,37 @@ package master import ( + "context" + "fmt" "log" + "net" "strconv" "time" "github.com/czs007/suvlim/conf" - "github.com/czs007/suvlim/pkg/master/controller" - milvusgrpc "github.com/czs007/suvlim/pkg/master/grpc" + pb "github.com/czs007/suvlim/pkg/master/grpc/master" + "github.com/czs007/suvlim/pkg/master/grpc/message" messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/id" + "github.com/czs007/suvlim/pkg/master/informer" "github.com/czs007/suvlim/pkg/master/kv" - + "github.com/czs007/suvlim/pkg/master/mock" "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" ) func Run() { - kvbase := newKvBase() + go mock.FakePulsarProducer() + go SegmentStatsController() collectionChan := make(chan *messagepb.Mapping) defer close(collectionChan) - - errorch := make(chan error) - defer close(errorch) - - go milvusgrpc.Server(collectionChan, errorch, kvbase) - go controller.SegmentStatsController(kvbase, errorch) - go controller.CollectionController(collectionChan, kvbase, errorch) + go GRPCServer(collectionChan) + go CollectionController(collectionChan) for { - for v := range errorch { - log.Fatal(v) - } } } -func newKvBase() kv.Base { +func SegmentStatsController() { etcdAddr := conf.Config.Etcd.Address etcdAddr += ":" etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) @@ -40,7 +39,291 @@ func newKvBase() kv.Base { Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) - // defer cli.Close() + defer cli.Close() kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - return kvbase + + ssChan := make(chan mock.SegmentStats, 10) + defer close(ssChan) + ssClient := informer.NewPulsarClient() + go ssClient.Listener(ssChan) + for { + select { + case ss := <-ssChan: + ComputeCloseTime(ss, kvbase) + UpdateSegmentStatus(ss, kvbase) + case <-time.After(5 * time.Second): + fmt.Println("timeout") + return + } + } +} + +func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { + if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { + currentTime := time.Now() + memRate := int(ss.MemoryRate) + if memRate == 0 { + memRate = 1 + } + sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate + data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) + if err != nil { + return err + } + seg, err := mock.JSON2Segment(data) + if err != nil { + return err + } + seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) + fmt.Println("memRate = ", memRate, ",sec = ", sec ,",Close time = ", seg.CloseTimeStamp) + updateData, err := mock.Segment2JSON(*seg) + if err != nil { + return err + } + kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) + //create new segment + newSegID := id.New().Uint64() + newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) + newSegData, err := mock.Segment2JSON(*&newSeg) + if err != nil { + return err + } + //save to kv store + kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData) + // update collection data + c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID))) + collection, err := mock.JSON2Collection(c) + if err != nil { + return err + } + segIDs := collection.SegmentIDs + segIDs = append(segIDs, newSegID) + collection.SegmentIDs = segIDs + cData, err := mock.Collection2JSON(*collection) + if err != nil { + return err + } + kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData) + } + return nil +} + +func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error { + segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) + if err != nil { + return err + } + seg, err := mock.JSON2Segment(segmentData) + if err != nil { + return err + } + var changed bool + changed = false + if seg.Status != ss.Status { + changed = true + seg.Status = ss.Status + } + if seg.Rows != ss.Rows { + changed = true + seg.Rows = ss.Rows + } + + if changed { + segData, err := mock.Segment2JSON(*seg) + if err != nil { + return err + } + err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData) + if err != nil { + return err + } + } + return nil +} + +func GRPCServer(ch chan *messagepb.Mapping) error { + defaultGRPCPort := ":" + defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) + lis, err := net.Listen("tcp", defaultGRPCPort) + if err != nil { + return err + } + s := grpc.NewServer() + pb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch}) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + return err + } + return nil +} + +type GRPCMasterServer struct { + CreateRequest chan *messagepb.Mapping +} + +func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { + // ms.CreateRequest <- in2 + fmt.Println("Handle a new create collection request") + err := WriteCollection2Datastore(in) + if err != nil { + return &messagepb.Status{ + ErrorCode: 100, + Reason: "", + }, err + } + return &messagepb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) { + fmt.Println("Handle a new create index request") + err := UpdateCollectionIndex(in) + if err != nil { + return &messagepb.Status{ + ErrorCode: 100, + Reason: "", + }, err + } + return &messagepb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +// func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *pb.CreateCollectionRequest) (*pb.CreateCollectionResponse, error) { +// return &pb.CreateCollectionResponse{ +// CollectionName: in.CollectionName, +// }, nil +// } + +func CollectionController(ch chan *messagepb.Mapping) { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + defer cli.Close() + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + for collection := range ch { + sID := id.New().Uint64() + cID := id.New().Uint64() + s2ID := id.New().Uint64() + fieldMetas := []*messagepb.FieldMeta{} + if collection.Schema != nil { + fieldMetas = collection.Schema.FieldMetas + } + c := mock.NewCollection(cID, collection.CollectionName, + time.Now(), fieldMetas, []uint64{sID, s2ID}, + []string{"default"}) + cm := mock.GrpcMarshal(&c) + s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) + s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) + collectionData, _ := mock.Collection2JSON(*cm) + segmentData, err := mock.Segment2JSON(s) + if err != nil { + log.Fatal(err) + } + s2Data, err := mock.Segment2JSON(s2) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data) + if err != nil { + log.Fatal(err) + } + } +} + +func WriteCollection2Datastore(collection *messagepb.Mapping) error { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + defer cli.Close() + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + sID := id.New().Uint64() + cID := id.New().Uint64() + fieldMetas := []*messagepb.FieldMeta{} + if collection.Schema != nil { + fieldMetas = collection.Schema.FieldMetas + } + c := mock.NewCollection(cID, collection.CollectionName, + time.Now(), fieldMetas, []uint64{sID}, + []string{"default"}) + cm := mock.GrpcMarshal(&c) + s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0)) + collectionData, err := mock.Collection2JSON(*cm) + if err != nil { + log.Fatal(err) + return err + } + segmentData, err := mock.Segment2JSON(s) + if err != nil { + log.Fatal(err) + return err + } + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) + if err != nil { + log.Fatal(err) + return err + } + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) + if err != nil { + log.Fatal(err) + return err + } + return nil + +} + +func UpdateCollectionIndex(index *messagepb.IndexParam) error { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + defer cli.Close() + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + collectionName := index.CollectionName + c, err := kvbase.Load("collection/" + collectionName) + if err != nil { + return err + } + collection, err := mock.JSON2Collection(c) + if err != nil { + return err + } + for k, v := range collection.IndexParam { + if v.IndexName == index.IndexName { + collection.IndexParam[k] = v + } + } + collection.IndexParam = append(collection.IndexParam, index) + cm := mock.GrpcMarshal(collection) + collectionData, err := mock.Collection2JSON(*cm) + if err != nil { + return err + } + err = kvbase.Save("collection/"+collectionName, collectionData) + if err != nil { + return err + } + return nil } diff --git a/proxy/src/meta/etcd_watcher/Watcher.cpp b/proxy/src/meta/etcd_watcher/Watcher.cpp index f292cdf211..eccfebf7bc 100644 --- a/proxy/src/meta/etcd_watcher/Watcher.cpp +++ b/proxy/src/meta/etcd_watcher/Watcher.cpp @@ -21,6 +21,9 @@ Watcher::Watcher(const std::string &address, } void Watcher::Cancel() { + if (call_ == nullptr){ + return; + } call_->CancelWatch(); } diff --git a/proxy/src/server/MetaWrapper.cpp b/proxy/src/server/MetaWrapper.cpp index ad7c07fec9..da71af0a35 100644 --- a/proxy/src/server/MetaWrapper.cpp +++ b/proxy/src/server/MetaWrapper.cpp @@ -14,17 +14,23 @@ namespace milvus { namespace server { namespace { -void ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) { - auto json = JSON::parse(json_str); - segment_info.set_segment_id(json["segment_id"].get()); - segment_info.set_partition_tag(json["partition_tag"].get()); - segment_info.set_channel_start(json["channel_start"].get()); - segment_info.set_channel_end(json["channel_end"].get()); - segment_info.set_open_timestamp(json["open_timestamp"].get()); - segment_info.set_close_timestamp(json["close_timestamp"].get()); - segment_info.set_collection_id(json["collection_id"].get()); - segment_info.set_collection_name(json["collection_name"].get()); - segment_info.set_rows(json["rows"].get()); +Status ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) { + try { + auto json = JSON::parse(json_str); + segment_info.set_segment_id(json["segment_id"].get()); + segment_info.set_partition_tag(json["partition_tag"].get()); + segment_info.set_channel_start(json["channel_start"].get()); + segment_info.set_channel_end(json["channel_end"].get()); + segment_info.set_open_timestamp(json["open_timestamp"].get()); + segment_info.set_close_timestamp(json["close_timestamp"].get()); + segment_info.set_collection_id(json["collection_id"].get()); + segment_info.set_collection_name(json["collection_name"].get()); + segment_info.set_rows(json["rows"].get()); + return Status::OK(); + } + catch (const std::exception &e) { + return Status(DB_ERROR, e.what()); + } } void ParseCollectionSchema(const std::string &json_str, Collection &collection) { @@ -64,7 +70,7 @@ Status MetaWrapper::Init() { // init etcd watcher auto f = [&](const etcdserverpb::WatchResponse &res) { - UpdateMeta(res); + UpdateMeta(res); }; watcher_ = std::make_shared(etcd_addr, segment_path_, f, true); return SyncMeta(); @@ -87,10 +93,14 @@ void MetaWrapper::UpdateMeta(const etcdserverpb::WatchResponse &res) { if (event_key.rfind(segment_path_, 0) == 0) { // segment info SegmentInfo segment_info; - ParseSegmentInfo(event_value, segment_info); - std::unique_lock lock(mutex_); - segment_infos_[segment_info.segment_id()] = segment_info; - lock.unlock(); + auto status = ParseSegmentInfo(event_value, segment_info); + if (status.ok()) { + std::unique_lock lock(mutex_); + segment_infos_[segment_info.segment_id()] = segment_info; + lock.unlock(); + } else { + return; + } } else { // table scheme Collection collection; @@ -152,10 +162,14 @@ Status MetaWrapper::SyncMeta() { } else { assert(IsSegmentMetaKey(kv.key())); SegmentInfo segment_info; - ParseSegmentInfo(kv.value(), segment_info); - std::unique_lock lock(mutex_); - segment_infos_[segment_info.segment_id()] = segment_info; - lock.unlock(); + status = ParseSegmentInfo(kv.value(), segment_info); + if (status.ok()) { + std::unique_lock lock(mutex_); + segment_infos_[segment_info.segment_id()] = segment_info; + lock.unlock(); + } else { + return status; + } } } } diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index 27eddba91d..74c9a78a54 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -242,6 +242,12 @@ Server::StartService() { goto FAIL; } + stat = MetaWrapper::GetInstance().Init(); + if (!stat.ok()) { + LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message(); + goto FAIL; + } + // Init pulsar message client stat = MessageWrapper::GetInstance().Init(); if (!stat.ok()) { @@ -249,12 +255,6 @@ Server::StartService() { goto FAIL; } - stat = MetaWrapper::GetInstance().Init(); - if (!stat.ok()) { - LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message(); - goto FAIL; - } - grpc::GrpcServer::GetInstance().Start(); return Status::OK();