From 6bc7e6d372d96e967c4add7df516a5c20b1dba85 Mon Sep 17 00:00:00 2001 From: rain Date: Fri, 18 Sep 2020 15:55:33 +0800 Subject: [PATCH] Update collection and segment id type and Refactor channel controller Signed-off-by: rain --- core/release-build.sh | 156 ----------------------------- go.mod | 1 + pkg/master/id/id.go | 43 ++++++++ pkg/master/mock/collection.go | 35 ++++--- pkg/master/mock/collection_test.go | 7 ++ pkg/master/mock/segment.go | 8 +- pkg/master/server.go | 69 ++++++++++--- 7 files changed, 128 insertions(+), 191 deletions(-) delete mode 100755 core/release-build.sh create mode 100644 pkg/master/id/id.go diff --git a/core/release-build.sh b/core/release-build.sh deleted file mode 100755 index f7fd8861ac..0000000000 --- a/core/release-build.sh +++ /dev/null @@ -1,156 +0,0 @@ -#!/bin/bash - -# Compile jobs variable; Usage: $ jobs=12 ./build.sh ... -if [[ ! ${jobs+1} ]]; then - jobs=$(nproc) -fi - -BUILD_OUTPUT_DIR="cmake-build-release" -BUILD_TYPE="Release" -BUILD_UNITTEST="OFF" -INSTALL_PREFIX=$(pwd)/milvus -MAKE_CLEAN="OFF" -BUILD_COVERAGE="OFF" -DB_PATH="/tmp/milvus" -PROFILING="OFF" -RUN_CPPLINT="OFF" -CUDA_COMPILER=/usr/local/cuda/bin/nvcc -GPU_VERSION="OFF" #defaults to CPU version -WITH_PROMETHEUS="ON" -CUDA_ARCH="DEFAULT" -CUSTOM_THIRDPARTY_PATH="" - -while getopts "p:d:t:s:f:ulrcghzme" arg; do - case $arg in - f) - CUSTOM_THIRDPARTY_PATH=$OPTARG - ;; - p) - INSTALL_PREFIX=$OPTARG - ;; - d) - DB_PATH=$OPTARG - ;; - t) - BUILD_TYPE=$OPTARG # BUILD_TYPE - ;; - u) - echo "Build and run unittest cases" - BUILD_UNITTEST="ON" - ;; - l) - RUN_CPPLINT="ON" - ;; - r) - if [[ -d ${BUILD_OUTPUT_DIR} ]]; then - MAKE_CLEAN="ON" - fi - ;; - c) - BUILD_COVERAGE="ON" - ;; - z) - PROFILING="ON" - ;; - g) - GPU_VERSION="ON" - ;; - e) - WITH_PROMETHEUS="OFF" - ;; - s) - CUDA_ARCH=$OPTARG - ;; - h) # help - echo " - -parameter: --f: custom paths of thirdparty downloaded files(default: NULL) --p: install prefix(default: $(pwd)/milvus) --d: db data path(default: /tmp/milvus) --t: build type(default: Debug) --u: building unit test options(default: OFF) --l: run cpplint, clang-format and clang-tidy(default: OFF) --r: remove previous build directory(default: OFF) --c: code coverage(default: OFF) --z: profiling(default: OFF) --g: build GPU version(default: OFF) --e: build without prometheus(default: OFF) --s: build with CUDA arch(default:DEFAULT), for example '-gencode=compute_61,code=sm_61;-gencode=compute_75,code=sm_75' --h: help - -usage: -./build.sh -p \${INSTALL_PREFIX} -t \${BUILD_TYPE} -s \${CUDA_ARCH} -f\${CUSTOM_THIRDPARTY_PATH} [-u] [-l] [-r] [-c] [-z] [-g] [-m] [-e] [-h] - " - exit 0 - ;; - ?) - echo "ERROR! unknown argument" - exit 1 - ;; - esac -done - -if [[ ! -d ${BUILD_OUTPUT_DIR} ]]; then - mkdir ${BUILD_OUTPUT_DIR} -fi - -cd ${BUILD_OUTPUT_DIR} - -# remove make cache since build.sh -l use default variables -# force update the variables each time -make rebuild_cache >/dev/null 2>&1 - - -if [[ ${MAKE_CLEAN} == "ON" ]]; then - echo "Runing make clean in ${BUILD_OUTPUT_DIR} ..." - make clean - exit 0 -fi - -CMAKE_CMD="cmake \ --DBUILD_UNIT_TEST=${BUILD_UNITTEST} \ --DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} --DCMAKE_BUILD_TYPE=${BUILD_TYPE} \ --DOpenBLAS_SOURCE=AUTO \ --DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \ --DBUILD_COVERAGE=${BUILD_COVERAGE} \ --DMILVUS_DB_PATH=${DB_PATH} \ --DENABLE_CPU_PROFILING=${PROFILING} \ --DMILVUS_GPU_VERSION=${GPU_VERSION} \ --DMILVUS_WITH_PROMETHEUS=${WITH_PROMETHEUS} \ --DMILVUS_CUDA_ARCH=${CUDA_ARCH} \ --DCUSTOM_THIRDPARTY_DOWNLOAD_PATH=${CUSTOM_THIRDPARTY_PATH} \ -../" -echo ${CMAKE_CMD} -${CMAKE_CMD} - - -if [[ ${RUN_CPPLINT} == "ON" ]]; then - # cpplint check - make lint - if [ $? -ne 0 ]; then - echo "ERROR! cpplint check failed" - exit 1 - fi - echo "cpplint check passed!" - - # clang-format check - make check-clang-format - if [ $? -ne 0 ]; then - echo "ERROR! clang-format check failed" - exit 1 - fi - echo "clang-format check passed!" - - # clang-tidy check - make check-clang-tidy - if [ $? -ne 0 ]; then - echo "ERROR! clang-tidy check failed" - exit 1 - fi - echo "clang-tidy check passed!" -else - # compile and build - make -j ${jobs} install || exit 1 -fi diff --git a/go.mod b/go.mod index ac8aa8d127..253dded8b8 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/prometheus/client_golang v1.5.1 // indirect github.com/prometheus/common v0.10.0 // indirect github.com/prometheus/procfs v0.1.3 // indirect + github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.1 github.com/tikv/client-go v0.0.0-20200824032810-95774393107b diff --git a/pkg/master/id/id.go b/pkg/master/id/id.go new file mode 100644 index 0000000000..dfa88a73c2 --- /dev/null +++ b/pkg/master/id/id.go @@ -0,0 +1,43 @@ +package id + +import ( + "encoding/binary" + + "github.com/rs/xid" + + "github.com/czs007/suvlim/errors" +) + +type ID struct { + xid.ID +} + +func BytesToUint64(b []byte) (uint64, error) { + if len(b) != 12 { + return 0, errors.Errorf("invalid data, must 12 bytes, but %d", len(b)) + } + + return binary.BigEndian.Uint64(b), nil +} + +// Uint64ToBytes converts uint64 to a byte slice. +func Uint64ToBytes(v uint64) []byte { + b := make([]byte, 12) + binary.BigEndian.PutUint64(b, v) + return b +} + +func New() ID { + return ID{ + xid.New(), + } +} + +func (id ID) Uint64() uint64 { + b := id.Bytes() + if len(b) != 12 { + return 0 + } + return binary.BigEndian.Uint64(b) + +} diff --git a/pkg/master/mock/collection.go b/pkg/master/mock/collection.go index ea1a1e28c5..8f5b71dfbc 100644 --- a/pkg/master/mock/collection.go +++ b/pkg/master/mock/collection.go @@ -1,13 +1,11 @@ package mock import ( - "fmt" "time" pb "github.com/czs007/suvlim/pkg/master/grpc/master" messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" "github.com/golang/protobuf/proto" - "github.com/google/uuid" jsoniter "github.com/json-iterator/go" ) @@ -25,9 +23,9 @@ type Collection struct { } type FieldMeta struct { - FieldName string `json:"field_name"` - Type string `json:"type"` - DIM int64 `json:"dimension"` + FieldName string `json:"field_name"` + Type messagepb.DataType `json:"type"` + DIM int64 `json:"dimension"` } func GrpcMarshal(c *Collection) *Collection { @@ -37,6 +35,16 @@ func GrpcMarshal(c *Collection) *Collection { pbSchema := &messagepb.Schema{ FieldMetas: []*messagepb.FieldMeta{}, } + schemaSlice := []*messagepb.FieldMeta{} + for _, v := range c.Schema { + newpbMeta := &messagepb.FieldMeta{ + FieldName: v.FieldName, + Type: v.Type, + Dim: v.DIM, + } + schemaSlice = append(schemaSlice, newpbMeta) + } + pbSchema.FieldMetas = schemaSlice grpcCollection := &pb.Collection{ Id: c.ID, Name: c.Name, @@ -45,27 +53,24 @@ func GrpcMarshal(c *Collection) *Collection { SegmentIds: c.SegmentIDs, PartitionTags: c.PartitionTags, } - out, err := proto.Marshal(grpcCollection) - if err != nil { - fmt.Println(err) - } - c.GrpcMarshalString = string(out) + out := proto.MarshalTextString(grpcCollection) + c.GrpcMarshalString = out return c } -func NewCollection(id uuid.UUID, name string, createTime time.Time, - schema []*messagepb.FieldMeta, sIds []uuid.UUID, ptags []string) Collection { +func NewCollection(id uint64, name string, createTime time.Time, + schema []*messagepb.FieldMeta, sIds []uint64, ptags []string) Collection { segementIDs := []uint64{} newSchema := []FieldMeta{} for _, v := range schema { - newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type.String(), DIM: v.Dim}) + newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type, DIM: v.Dim}) } for _, sid := range sIds { - segementIDs = append(segementIDs, uint64(sid.ID())) + segementIDs = append(segementIDs, sid) } return Collection{ - ID: uint64(id.ID()), + ID: id, Name: name, CreateTime: uint64(createTime.Unix()), Schema: newSchema, diff --git a/pkg/master/mock/collection_test.go b/pkg/master/mock/collection_test.go index f40dc605ac..48177c1d25 100644 --- a/pkg/master/mock/collection_test.go +++ b/pkg/master/mock/collection_test.go @@ -6,11 +6,18 @@ import ( "time" ) +var s = FieldMeta{ + FieldName: "test-schema-1", + Type: 1, + DIM: int64(512), +} + var C = Collection{ ID: uint64(11111), Name: "test-collection", CreateTime: uint64(time.Now().Unix()), SegmentIDs: []uint64{uint64(10111)}, + Schema: []FieldMeta{s}, PartitionTags: []string{"default"}, } diff --git a/pkg/master/mock/segment.go b/pkg/master/mock/segment.go index 37efd08f17..4d011cfbc0 100644 --- a/pkg/master/mock/segment.go +++ b/pkg/master/mock/segment.go @@ -4,8 +4,6 @@ import ( "bytes" "encoding/gob" "time" - - "github.com/google/uuid" ) type SegmentStats struct { @@ -45,10 +43,10 @@ type Segment struct { CollectionName string `json:"collection_name"` } -func NewSegment(id uuid.UUID, collectioID uuid.UUID, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { +func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { return Segment{ - SegmentID: uint64(id.ID()), - CollectionID: uint64(id.ID()), + SegmentID: id, + CollectionID: collectioID, CollectionName: cName, PartitionTag: ptag, ChannelStart: chStart, diff --git a/pkg/master/server.go b/pkg/master/server.go index b482efb6c0..c563de14ba 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -11,10 +11,10 @@ import ( "github.com/czs007/suvlim/pkg/master/common" pb "github.com/czs007/suvlim/pkg/master/grpc/master" messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/id" "github.com/czs007/suvlim/pkg/master/informer" "github.com/czs007/suvlim/pkg/master/kv" "github.com/czs007/suvlim/pkg/master/mock" - "github.com/google/uuid" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) @@ -55,6 +55,7 @@ func SegmentStatsController() { func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { if int(ss.MemorySize) > common.SEGMENT_THRESHOLE*0.8 { + currentTime := time.Now() memRate := int(ss.MemoryRate) if memRate == 0 { memRate = 1 @@ -68,12 +69,35 @@ func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { if err != nil { return err } - seg.CloseTimeStamp = uint64(time.Now().Add(time.Duration(sec) * time.Second).Unix()) + seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) updateData, err := mock.Segment2JSON(*seg) if err != nil { return err } kvbase.Save(strconv.Itoa(int(ss.SegementID)), updateData) + //create new segment + newSegID := id.New().Uint64() + newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) + newSegData, err := mock.Segment2JSON(*&newSeg) + if err != nil { + return err + } + //save to kv store + kvbase.Save(strconv.Itoa(int(newSegID)), newSegData) + // update collection data + c, _ := kvbase.Load(strconv.Itoa(int(seg.CollectionID))) + collection, err := mock.JSON2Collection(c) + if err != nil { + return err + } + segIDs := collection.SegmentIDs + segIDs = append(segIDs, newSegID) + collection.SegmentIDs = segIDs + cData, err := mock.Collection2JSON(*collection) + if err != nil { + return err + } + kvbase.Save(strconv.Itoa(int(seg.CollectionID)), cData) } return nil } @@ -97,7 +121,7 @@ type GRPCMasterServer struct { } func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { - // ms.CreateRequest <- in + // ms.CreateRequest <- in2 fmt.Println("Handle a new create collection request") err := WriteCollection2Datastore(in) if err != nil { @@ -126,30 +150,41 @@ func CollectionController(ch chan *messagepb.Mapping) { defer cli.Close() kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) for collection := range ch { - sID := uuid.New() - cID := uuid.New() + sID := id.New().Uint64() + cID := id.New().Uint64() + s2ID := id.New().Uint64() fieldMetas := []*messagepb.FieldMeta{} if collection.Schema != nil { fieldMetas = collection.Schema.FieldMetas } c := mock.NewCollection(cID, collection.CollectionName, - time.Now(), fieldMetas, []uuid.UUID{sID}, + time.Now(), fieldMetas, []uint64{sID, s2ID}, []string{"default"}) cm := mock.GrpcMarshal(&c) - s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0)) + s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) + s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) collectionData, _ := mock.Collection2JSON(*cm) segmentData, err := mock.Segment2JSON(s) if err != nil { log.Fatal(err) } - err = kvbase.Save("collection/"+cID.String(), collectionData) + s2Data, err := mock.Segment2JSON(s2) if err != nil { log.Fatal(err) } - err = kvbase.Save("segment/"+sID.String(), segmentData) + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) if err != nil { log.Fatal(err) } + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) + if err != nil { + log.Fatal(err) + } + err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data) + if err != nil { + log.Fatal(err) + } + } } @@ -160,29 +195,33 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error { }) defer cli.Close() kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) - sID := uuid.New() - cID := uuid.New() + sID := id.New().Uint64() + cID := id.New().Uint64() fieldMetas := []*messagepb.FieldMeta{} if collection.Schema != nil { fieldMetas = collection.Schema.FieldMetas } c := mock.NewCollection(cID, collection.CollectionName, - time.Now(), fieldMetas, []uuid.UUID{sID}, + time.Now(), fieldMetas, []uint64{sID}, []string{"default"}) cm := mock.GrpcMarshal(&c) s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0)) - collectionData, _ := mock.Collection2JSON(*cm) + collectionData, err := mock.Collection2JSON(*cm) + if err != nil { + log.Fatal(err) + return err + } segmentData, err := mock.Segment2JSON(s) if err != nil { log.Fatal(err) return err } - err = kvbase.Save("collection/"+cID.String(), collectionData) + err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData) if err != nil { log.Fatal(err) return err } - err = kvbase.Save("segment/"+sID.String(), segmentData) + err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData) if err != nil { log.Fatal(err) return err