From cda47a925223d7e108486ba971e84a55811abe97 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Fri, 20 Nov 2020 09:13:29 +0800 Subject: [PATCH] Remove unused code Signed-off-by: neza2017 --- cmd/master/main.go | 41 ++++---- internal/master/collection/collection.go | 98 ------------------- internal/master/collection/collection_test.go | 38 ------- internal/master/collection_task.go | 5 +- internal/master/collection_task_test.go | 3 +- internal/master/controller/collection.go | 96 ------------------ internal/master/controller/segment.go | 67 ------------- internal/master/controller/segment_test.go | 43 -------- internal/master/{tso => }/global_allocator.go | 26 +---- internal/master/global_allocator_test.go | 86 ++++++++++++++++ internal/master/grpc_service.go | 68 +++++++------ internal/master/grpc_service_test.go | 3 +- internal/master/{id => }/id.go | 34 +------ internal/master/id/id_test.go | 46 --------- internal/master/informer/informer.go | 5 - internal/master/informer/pulsar.go | 30 ------ internal/master/master.go | 73 +++++++------- internal/master/meta_table.go | 4 - internal/master/meta_table_test.go | 9 +- .../master/{paramtable => }/paramtable.go | 2 +- internal/master/partition_task_test.go | 3 +- internal/master/scheduler.go | 14 +-- internal/master/segment/segment.go | 57 ----------- internal/master/segment_manager.go | 18 ++-- internal/master/segment_manager_test.go | 41 ++++---- internal/master/task.go | 7 +- .../{timesync => }/time_snyc_producer_test.go | 2 +- .../{timesync => }/time_sync_producer.go | 2 +- internal/master/{timesync => }/timesync.go | 20 ++-- internal/master/timesync/timetick.go | 26 ----- .../master/{timesync => }/timesync_test.go | 34 +++---- internal/master/{tso => }/tso.go | 2 +- internal/master/tso/global_allocator_test.go | 67 ------------- internal/proxy/proxy_test.go | 7 +- 34 files changed, 271 insertions(+), 806 deletions(-) delete mode 100644 internal/master/collection/collection.go delete mode 100644 internal/master/collection/collection_test.go delete mode 100644 internal/master/controller/collection.go delete mode 100644 internal/master/controller/segment.go delete mode 100644 internal/master/controller/segment_test.go rename internal/master/{tso => }/global_allocator.go (86%) create mode 100644 internal/master/global_allocator_test.go rename internal/master/{id => }/id.go (57%) delete mode 100644 internal/master/id/id_test.go delete mode 100644 internal/master/informer/informer.go delete mode 100644 internal/master/informer/pulsar.go rename internal/master/{paramtable => }/paramtable.go (99%) delete mode 100644 internal/master/segment/segment.go rename internal/master/{timesync => }/time_snyc_producer_test.go (99%) rename internal/master/{timesync => }/time_sync_producer.go (99%) rename internal/master/{timesync => }/timesync.go (92%) delete mode 100644 internal/master/timesync/timetick.go rename internal/master/{timesync => }/timesync_test.go (89%) rename internal/master/{tso => }/tso.go (99%) delete mode 100644 internal/master/tso/global_allocator_test.go diff --git a/cmd/master/main.go b/cmd/master/main.go index 6ae67b0d51..3bfbca3422 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -8,7 +8,6 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/internal/master" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "go.uber.org/zap" ) @@ -18,32 +17,32 @@ func main() { // Creates server. ctx, cancel := context.WithCancel(context.Background()) - etcdAddress, _ := masterParams.Params.EtcdAddress() - etcdRootPath, _ := masterParams.Params.EtcdRootPath() - pulsarAddr, _ := masterParams.Params.PulsarAddress() + etcdAddress, _ := master.Params.EtcdAddress() + etcdRootPath, _ := master.Params.EtcdRootPath() + pulsarAddr, _ := master.Params.PulsarAddress() pulsarAddr = "pulsar://" + pulsarAddr - defaultRecordSize := masterParams.Params.DefaultRecordSize() - minimumAssignSize := masterParams.Params.MinimumAssignSize() - segmentThreshold := masterParams.Params.SegmentThreshold() - segmentExpireDuration := masterParams.Params.SegmentExpireDuration() - numOfChannel, _ := masterParams.Params.TopicNum() - nodeNum, _ := masterParams.Params.QueryNodeNum() - statsChannel := masterParams.Params.StatsChannels() + defaultRecordSize := master.Params.DefaultRecordSize() + minimumAssignSize := master.Params.MinimumAssignSize() + segmentThreshold := master.Params.SegmentThreshold() + segmentExpireDuration := master.Params.SegmentExpireDuration() + numOfChannel, _ := master.Params.TopicNum() + nodeNum, _ := master.Params.QueryNodeNum() + statsChannel := master.Params.StatsChannels() opt := master.Option{ KVRootPath: etcdRootPath, MetaRootPath: etcdRootPath, EtcdAddr: []string{etcdAddress}, PulsarAddr: pulsarAddr, - ProxyIDs: masterParams.Params.ProxyIDList(), - PulsarProxyChannels: masterParams.Params.ProxyTimeSyncChannels(), - PulsarProxySubName: masterParams.Params.ProxyTimeSyncSubName(), - SoftTTBInterval: masterParams.Params.SoftTimeTickBarrierInterval(), - WriteIDs: masterParams.Params.WriteIDList(), - PulsarWriteChannels: masterParams.Params.WriteTimeSyncChannels(), - PulsarWriteSubName: masterParams.Params.WriteTimeSyncSubName(), - PulsarDMChannels: masterParams.Params.DMTimeSyncChannels(), - PulsarK2SChannels: masterParams.Params.K2STimeSyncChannels(), + ProxyIDs: master.Params.ProxyIDList(), + PulsarProxyChannels: master.Params.ProxyTimeSyncChannels(), + PulsarProxySubName: master.Params.ProxyTimeSyncSubName(), + SoftTTBInterval: master.Params.SoftTimeTickBarrierInterval(), + WriteIDs: master.Params.WriteIDList(), + PulsarWriteChannels: master.Params.WriteTimeSyncChannels(), + PulsarWriteSubName: master.Params.WriteTimeSyncSubName(), + PulsarDMChannels: master.Params.DMTimeSyncChannels(), + PulsarK2SChannels: master.Params.K2STimeSyncChannels(), DefaultRecordSize: defaultRecordSize, MinimumAssignSize: minimumAssignSize, SegmentThreshold: segmentThreshold, @@ -71,7 +70,7 @@ func main() { cancel() }() - if err := svr.Run(int64(masterParams.Params.Port())); err != nil { + if err := svr.Run(int64(master.Params.Port())); err != nil { log.Fatal("run server failed", zap.Error(err)) } diff --git a/internal/master/collection/collection.go b/internal/master/collection/collection.go deleted file mode 100644 index b727ce39fc..0000000000 --- a/internal/master/collection/collection.go +++ /dev/null @@ -1,98 +0,0 @@ -package collection - -import ( - "time" - - "github.com/golang/protobuf/proto" - jsoniter "github.com/json-iterator/go" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -type UniqueID = typeutil.UniqueID -type Timestamp = typeutil.Timestamp - -type Collection struct { - ID UniqueID `json:"id"` - Name string `json:"name"` - CreateTime Timestamp `json:"creat_time"` - Schema []FieldMeta `json:"schema"` - // ExtraSchema []FieldMeta `json:"extra_schema"` - SegmentIDs []UniqueID `json:"segment_ids"` - PartitionTags []string `json:"partition_tags"` - GrpcMarshalString string `json:"grpc_marshal_string"` -} - -type FieldMeta struct { - FieldName string `json:"field_name"` - Type schemapb.DataType `json:"type"` - DIM int64 `json:"dimension"` -} - -func GrpcMarshal(c *Collection) *Collection { - if c.GrpcMarshalString != "" { - c.GrpcMarshalString = "" - } - pbSchema := &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{}, - } - schemaSlice := []*schemapb.FieldSchema{} - for _, v := range c.Schema { - newpbMeta := &schemapb.FieldSchema{ - Name: v.FieldName, - DataType: schemapb.DataType(v.Type), //czs_tag - } - schemaSlice = append(schemaSlice, newpbMeta) - } - pbSchema.Fields = schemaSlice - grpcCollection := &etcdpb.CollectionMeta{ - ID: c.ID, - Schema: pbSchema, - CreateTime: c.CreateTime, - SegmentIDs: c.SegmentIDs, - PartitionTags: c.PartitionTags, - } - out := proto.MarshalTextString(grpcCollection) - c.GrpcMarshalString = out - return c -} - -func NewCollection(id UniqueID, name string, createTime time.Time, - schema []*schemapb.FieldSchema, sIDs []UniqueID, ptags []string) Collection { - - segementIDs := []UniqueID{} - newSchema := []FieldMeta{} - for _, v := range schema { - newSchema = append(newSchema, FieldMeta{FieldName: v.Name, Type: v.DataType, DIM: 16}) - } - segementIDs = append(segementIDs, sIDs...) - return Collection{ - ID: id, - Name: name, - CreateTime: Timestamp(createTime.Unix()), - Schema: newSchema, - SegmentIDs: segementIDs, - PartitionTags: ptags, - } -} - -func Collection2JSON(c Collection) (string, error) { - b, err := json.Marshal(&c) - if err != nil { - return "", err - } - - return string(b), nil -} - -func JSON2Collection(s string) (*Collection, error) { - var c Collection - err := json.Unmarshal([]byte(s), &c) - if err != nil { - return &Collection{}, err - } - return &c, nil -} diff --git a/internal/master/collection/collection_test.go b/internal/master/collection/collection_test.go deleted file mode 100644 index 22c034a468..0000000000 --- a/internal/master/collection/collection_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package collection - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -var ( - cid = UniqueID(10011111234) - name = "test-segment" - createTime = time.Now() - schema = []*schemapb.FieldSchema{} - sIDs = []UniqueID{111111, 222222} - ptags = []string{"default", "test"} -) - -func TestNewCollection(t *testing.T) { - assert := assert.New(t) - c := NewCollection(cid, name, createTime, schema, sIDs, ptags) - assert.Equal(cid, c.ID) - assert.Equal(name, c.Name) - for k, v := range schema { - assert.Equal(v.Name, c.Schema[k].FieldName) - assert.Equal(v.DataType, c.Schema[k].Type) - } - assert.Equal(sIDs, c.SegmentIDs) - assert.Equal(ptags, c.PartitionTags) -} - -func TestGrpcMarshal(t *testing.T) { - assert := assert.New(t) - c := NewCollection(cid, name, createTime, schema, sIDs, ptags) - newc := GrpcMarshal(&c) - assert.NotEqual("", newc.GrpcMarshalString) -} diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go index 54cf7c7ab8..1fb9549886 100644 --- a/internal/master/collection_task.go +++ b/internal/master/collection_task.go @@ -9,11 +9,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) -type Timestamp = typeutil.Timestamp - type createCollectionTask struct { baseTask req *internalpb.CreateCollectionRequest @@ -69,7 +66,7 @@ func (t *createCollectionTask) Execute() error { return err } - collectionID, err := allocGlobalID() + collectionID, err := t.sch.globalIDAllocator() if err != nil { return err } diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index 2de216bba1..7e1864c617 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -7,7 +7,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -24,7 +23,7 @@ func TestMaster_CollectionTask(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdAddr, _ := masterParams.Params.EtcdAddress() + etcdAddr, _ := Params.EtcdAddress() etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) diff --git a/internal/master/controller/collection.go b/internal/master/controller/collection.go deleted file mode 100644 index 2d52cf8ea5..0000000000 --- a/internal/master/controller/collection.go +++ /dev/null @@ -1,96 +0,0 @@ -package controller - -import ( - "log" - "strconv" - "time" - - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master/collection" - "github.com/zilliztech/milvus-distributed/internal/master/id" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/master/segment" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type UniqueID = typeutil.UniqueID - -func CollectionController(ch chan *schemapb.CollectionSchema, kvbase *kv.EtcdKV, errch chan error) { - for collectionMeta := range ch { - sID, _ := id.AllocOne() - cID, _ := id.AllocOne() - s2ID, _ := id.AllocOne() - fieldMetas := []*schemapb.FieldSchema{} - if collectionMeta.Fields != nil { - fieldMetas = collectionMeta.Fields - } - c := collection.NewCollection(cID, collectionMeta.Name, - time.Now(), fieldMetas, []UniqueID{sID, s2ID}, - []string{"default"}) - cm := collection.GrpcMarshal(&c) - s := segment.NewSegment(sID, cID, collectionMeta.Name, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0)) - s2 := segment.NewSegment(s2ID, cID, collectionMeta.Name, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0)) - collectionData, _ := collection.Collection2JSON(*cm) - segmentData, err := segment.Segment2JSON(s) - if err != nil { - log.Fatal(err) - } - s2Data, err := segment.Segment2JSON(s2) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("collection/"+strconv.FormatInt(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatInt(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - } - err = kvbase.Save("segment/"+strconv.FormatInt(s2ID, 10), s2Data) - if err != nil { - log.Fatal(err) - } - } -} - -func WriteCollection2Datastore(collectionMeta *schemapb.CollectionSchema, kvbase *kv.EtcdKV) error { - sID, _ := id.AllocOne() - cID, _ := id.AllocOne() - fieldMetas := []*schemapb.FieldSchema{} - if collectionMeta.Fields != nil { - fieldMetas = collectionMeta.Fields - } - c := collection.NewCollection(cID, collectionMeta.Name, - time.Now(), fieldMetas, []UniqueID{sID}, - []string{"default"}) - cm := collection.GrpcMarshal(&c) - topicNum, err := masterParams.Params.TopicNum() - if err != nil { - panic(err) - } - s := segment.NewSegment(sID, cID, collectionMeta.Name, "default", 0, topicNum, time.Now(), time.Unix(1<<46-1, 0)) - collectionData, err := collection.Collection2JSON(*cm) - if err != nil { - log.Fatal(err) - return err - } - segmentData, err := segment.Segment2JSON(s) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("collection/"+strconv.FormatInt(cID, 10), collectionData) - if err != nil { - log.Fatal(err) - return err - } - err = kvbase.Save("segment/"+strconv.FormatInt(sID, 10), segmentData) - if err != nil { - log.Fatal(err) - return err - } - return nil - -} diff --git a/internal/master/controller/segment.go b/internal/master/controller/segment.go deleted file mode 100644 index be8c686e33..0000000000 --- a/internal/master/controller/segment.go +++ /dev/null @@ -1,67 +0,0 @@ -package controller - -import ( - "fmt" - "strconv" - "time" - - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master/collection" - "github.com/zilliztech/milvus-distributed/internal/master/id" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/master/segment" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -func ComputeCloseTime(ss internalpb.SegmentStats, kvbase *kv.EtcdKV) error { - masterParams.Params.InitParamTable() - segmentThreshold := masterParams.Params.SegmentThreshold() - if int(ss.MemorySize) > int(segmentThreshold*0.8) { - currentTime := time.Now() - //memRate := int(ss.MemoryRate) - memRate := 1 // to do - if memRate == 0 { - memRate = 1 - } - sec := int(segmentThreshold*0.2) / memRate - data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegmentID))) - if err != nil { - return err - } - seg, err := segment.JSON2Segment(data) - if err != nil { - return err - } - seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) - fmt.Println(seg) - updateData, err := segment.Segment2JSON(*seg) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(ss.SegmentID)), updateData) - //create new segment - newSegID, _ := id.AllocOne() - newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) - newSegData, err := segment.Segment2JSON(*&newSeg) - if err != nil { - return err - } - //save to kv store - kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData) - // update collection data - c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID))) - collectionMeta, err := collection.JSON2Collection(c) - if err != nil { - return err - } - segIDs := collectionMeta.SegmentIDs - segIDs = append(segIDs, newSegID) - collectionMeta.SegmentIDs = segIDs - cData, err := collection.Collection2JSON(*collectionMeta) - if err != nil { - return err - } - kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData) - } - return nil -} diff --git a/internal/master/controller/segment_test.go b/internal/master/controller/segment_test.go deleted file mode 100644 index 0431c7f111..0000000000 --- a/internal/master/controller/segment_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package controller - -import ( - "testing" - "time" - - "github.com/zilliztech/milvus-distributed/internal/kv" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "go.etcd.io/etcd/clientv3" -) - -func newKvBase() *kv.EtcdKV { - masterParams.Params.Init() - - etcdAddr, err := masterParams.Params.EtcdAddress() - if err != nil { - panic(err) - } - - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - etcdRootPath, err := masterParams.Params.EtcdRootPath() - if err != nil { - panic(err) - } - kvbase := kv.NewEtcdKV(cli, etcdRootPath) - return kvbase -} - -func TestComputeClosetTime(t *testing.T) { - kvbase := newKvBase() - var news internalpb.SegmentStats - for i := 0; i < 10; i++ { - news = internalpb.SegmentStats{ - SegmentID: UniqueID(6875940398055133887), - MemorySize: int64(i * 1000), - } - ComputeCloseTime(news, kvbase) - } -} diff --git a/internal/master/tso/global_allocator.go b/internal/master/global_allocator.go similarity index 86% rename from internal/master/tso/global_allocator.go rename to internal/master/global_allocator.go index 980af454f3..d58e60b5d7 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/global_allocator.go @@ -1,4 +1,4 @@ -package tso +package master import ( "log" @@ -35,17 +35,6 @@ type GlobalTSOAllocator struct { tso *timestampOracle } -var allocator *GlobalTSOAllocator - -func Init(etcdAddr []string, rootPath string) { - InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso")) -} - -func InitGlobalTsoAllocator(key string, base kv.Base) { - allocator = NewGlobalTSOAllocator(key, base) - allocator.Initialize() -} - // NewGlobalTSOAllocator creates a new global TSO allocator. func NewGlobalTSOAllocator(key string, kvBase kv.Base) *GlobalTSOAllocator { var saveInterval = 3 * time.Second @@ -127,16 +116,3 @@ func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { func (gta *GlobalTSOAllocator) Reset() { gta.tso.ResetTimestamp() } - -func AllocOne() (typeutil.Timestamp, error) { - return allocator.AllocOne() -} - -// Reset is used to reset the TSO allocator. -func Alloc(count uint32) (typeutil.Timestamp, error) { - return allocator.Alloc(count) -} - -func UpdateTSO() error { - return allocator.UpdateTSO() -} diff --git a/internal/master/global_allocator_test.go b/internal/master/global_allocator_test.go new file mode 100644 index 0000000000..840174a1e7 --- /dev/null +++ b/internal/master/global_allocator_test.go @@ -0,0 +1,86 @@ +package master + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" +) + +var gTestTsoAllocator Allocator +var gTestIDAllocator *GlobalIDAllocator + +func TestMain(m *testing.M) { + Params.Init() + + etcdAddr, err := Params.EtcdAddress() + if err != nil { + panic(err) + } + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) + gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) + exitCode := m.Run() + os.Exit(exitCode) +} + +func TestGlobalTSOAllocator_Initialize(t *testing.T) { + err := gTestTsoAllocator.Initialize() + assert.Nil(t, err) +} + +func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { + count := 1000 + perCount := uint32(100) + startTs, err := gTestTsoAllocator.GenerateTSO(perCount) + assert.Nil(t, err) + lastPhysical, lastLogical := tsoutil.ParseTS(startTs) + for i := 0; i < count; i++ { + ts, _ := gTestTsoAllocator.GenerateTSO(perCount) + physical, logical := tsoutil.ParseTS(ts) + if lastPhysical.Equal(physical) { + diff := logical - lastLogical + assert.Equal(t, uint64(perCount), diff) + } + lastPhysical, lastLogical = physical, logical + } +} + +func TestGlobalTSOAllocator_SetTSO(t *testing.T) { + curTime := time.Now() + nextTime := curTime.Add(2 * time.Second) + physical := nextTime.UnixNano() / int64(time.Millisecond) + logical := int64(0) + err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) + assert.Nil(t, err) +} + +func TestGlobalTSOAllocator_UpdateTSO(t *testing.T) { + err := gTestTsoAllocator.UpdateTSO() + assert.Nil(t, err) +} + +func TestGlobalTSOAllocator_Reset(t *testing.T) { + gTestTsoAllocator.Reset() +} + +func TestGlobalIdAllocator_Initialize(t *testing.T) { + err := gTestIDAllocator.Initialize() + assert.Nil(t, err) +} + +func TestGlobalIdAllocator_AllocOne(t *testing.T) { + one, err := gTestIDAllocator.AllocOne() + assert.Nil(t, err) + ano, err := gTestIDAllocator.AllocOne() + assert.Nil(t, err) + assert.NotEqual(t, one, ano) +} + +func TestGlobalIdAllocator_Alloc(t *testing.T) { + count := uint32(2 << 10) + idStart, idEnd, err := gTestIDAllocator.Alloc(count) + assert.Nil(t, err) + assert.Equal(t, count, uint32(idEnd-idStart)) +} diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index db552a4536..54b261d301 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" @@ -17,9 +15,9 @@ func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl var t task = &createCollectionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, } @@ -47,9 +45,9 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti var t task = &dropCollectionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, } @@ -77,9 +75,9 @@ func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollection var t task = &hasCollectionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, hasCollection: false, } @@ -114,9 +112,9 @@ func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.Describe var t task = &describeCollectionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, description: nil, } @@ -150,9 +148,9 @@ func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollect var t task = &showCollectionsTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, stringListResponse: nil, } @@ -188,9 +186,9 @@ func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreateParti var t task = &createPartitionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, } @@ -219,9 +217,9 @@ func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartition var t task = &dropPartitionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, } @@ -250,9 +248,9 @@ func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRe var t task = &hasPartitionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, hasPartition: false, } @@ -291,9 +289,9 @@ func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribeP var t task = &describePartitionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, description: nil, } @@ -329,9 +327,9 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio var t task = &showPartitionTask{ req: in, baseTask: baseTask{ - kvBase: s.kvBase, - mt: s.mt, - cv: make(chan error), + sch: s.scheduler, + mt: s.mt, + cv: make(chan error), }, stringListResponse: nil, } @@ -365,7 +363,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) { count := request.GetCount() - ts, err := tso.Alloc(count) + ts, err := s.tsoAllocator.Alloc(count) if err != nil { return &internalpb.TsoResponse{ @@ -384,7 +382,7 @@ func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*internalpb.IDResponse, error) { count := request.GetCount() - ts, err := id.AllocOne() + ts, err := s.idAllocator.AllocOne() if err != nil { return &internalpb.IDResponse{ @@ -408,7 +406,7 @@ func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.Assign Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, }, nil } - ts, err := tso.AllocOne() + ts, err := s.tsoAllocator.AllocOne() if err != nil { return &internalpb.AssignSegIDResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index a43b46ac88..215830c86c 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -6,7 +6,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -22,7 +21,7 @@ func TestMaster_CreateCollection(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } diff --git a/internal/master/id/id.go b/internal/master/id.go similarity index 57% rename from internal/master/id/id.go rename to internal/master/id.go index a0d7bda9ed..9eb306b6ac 100644 --- a/internal/master/id/id.go +++ b/internal/master/id.go @@ -1,33 +1,17 @@ -package id +package master import ( "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master/tso" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) -type UniqueID = typeutil.UniqueID - // GlobalTSOAllocator is the global single point TSO allocator. type GlobalIDAllocator struct { - allocator tso.Allocator -} - -var allocator *GlobalIDAllocator - -func Init(etcdAddr []string, rootPath string) { - InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid")) -} - -func InitGlobalIDAllocator(key string, base kv.Base) { - allocator = NewGlobalIDAllocator(key, base) - allocator.Initialize() + allocator Allocator } func NewGlobalIDAllocator(key string, base kv.Base) *GlobalIDAllocator { return &GlobalIDAllocator{ - allocator: tso.NewGlobalTSOAllocator(key, base), + allocator: NewGlobalTSOAllocator(key, base), } } @@ -60,15 +44,3 @@ func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { func (gia *GlobalIDAllocator) UpdateID() error { return gia.allocator.UpdateTSO() } - -func AllocOne() (UniqueID, error) { - return allocator.AllocOne() -} - -func Alloc(count uint32) (UniqueID, UniqueID, error) { - return allocator.Alloc(count) -} - -func UpdateID() error { - return allocator.UpdateID() -} diff --git a/internal/master/id/id_test.go b/internal/master/id/id_test.go deleted file mode 100644 index 4c2a2f8c33..0000000000 --- a/internal/master/id/id_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package id - -import ( - "os" - "testing" - - "github.com/stretchr/testify/assert" - - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" -) - -var GIdAllocator *GlobalIDAllocator - -func TestMain(m *testing.M) { - masterParams.Params.Init() - - etcdAddr, err := masterParams.Params.EtcdAddress() - if err != nil { - panic(err) - } - GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) - - exitCode := m.Run() - os.Exit(exitCode) -} - -func TestGlobalIdAllocator_Initialize(t *testing.T) { - err := GIdAllocator.Initialize() - assert.Nil(t, err) -} - -func TestGlobalIdAllocator_AllocOne(t *testing.T) { - one, err := GIdAllocator.AllocOne() - assert.Nil(t, err) - ano, err := GIdAllocator.AllocOne() - assert.Nil(t, err) - assert.NotEqual(t, one, ano) -} - -func TestGlobalIdAllocator_Alloc(t *testing.T) { - count := uint32(2 << 10) - idStart, idEnd, err := GIdAllocator.Alloc(count) - assert.Nil(t, err) - assert.Equal(t, count, uint32(idEnd-idStart)) -} diff --git a/internal/master/informer/informer.go b/internal/master/informer/informer.go deleted file mode 100644 index 496a57b3cd..0000000000 --- a/internal/master/informer/informer.go +++ /dev/null @@ -1,5 +0,0 @@ -package informer - -type Informer interface { - Listener(key interface{}) (interface{}, error) -} diff --git a/internal/master/informer/pulsar.go b/internal/master/informer/pulsar.go deleted file mode 100644 index 8c039569b0..0000000000 --- a/internal/master/informer/pulsar.go +++ /dev/null @@ -1,30 +0,0 @@ -package informer - -import ( - "log" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" -) - -func NewPulsarClient() *PulsarClient { - pulsarAddress, _ := masterParams.Params.PulsarAddress() - pulsarAddress = "pulsar://" + pulsarAddress - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarAddress, - OperationTimeout: 30 * time.Second, - ConnectionTimeout: 30 * time.Second, - }) - if err != nil { - log.Fatalf("Could not instantiate Pulsar client: %v", err) - } - - return &PulsarClient{ - Client: client, - } -} - -type PulsarClient struct { - Client pulsar.Client -} diff --git a/internal/master/master.go b/internal/master/master.go index 5a02affbe0..9fdb164c4c 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -10,24 +10,24 @@ import ( "sync/atomic" "time" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/informer" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/master/timesync" - "github.com/zilliztech/milvus-distributed/internal/master/tso" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" ) // Server is the pd server. +type ( + UniqueID = typeutil.UniqueID + Timestamp = typeutil.Timestamp +) + type Option struct { KVRootPath string MetaRootPath string @@ -73,9 +73,6 @@ type Master struct { //grpc server grpcServer *grpc.Server - // pulsar client - pc *informer.PulsarClient - // chans ssChan chan internalpb.SegmentStats @@ -84,7 +81,7 @@ type Master struct { kvBase *kv.EtcdKV scheduler *ddRequestScheduler mt *metaTable - tsmp timesync.MsgProducer + tsmp *timeSyncMsgProducer // tso ticker tsTicker *time.Ticker @@ -95,6 +92,11 @@ type Master struct { segmentMgr *SegmentManager statsMs ms.MsgStream + + //id allocator + idAllocator *GlobalIDAllocator + //tso allocator + tsoAllocator *GlobalTSOAllocator } func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { @@ -108,18 +110,7 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { func Init() { rand.Seed(time.Now().UnixNano()) - masterParams.Params.InitParamTable() - etcdAddr, err := masterParams.Params.EtcdAddress() - if err != nil { - panic(err) - } - rootPath, err := masterParams.Params.EtcdRootPath() - if err != nil { - panic(err) - } - id.Init([]string{etcdAddr}, rootPath) - tso.Init([]string{etcdAddr}, rootPath) - + Params.InitParamTable() } // CreateServer creates the UNINITIALIZED pd server with given configuration. @@ -137,7 +128,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { } //timeSyncMsgProducer - tsmp, err := timesync.NewTimeSyncMsgProducer(ctx) + tsmp, err := NewTimeSyncMsgProducer(ctx) if err != nil { return nil, err } @@ -146,7 +137,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024) pulsarProxyStream.Start() var proxyStream ms.MsgStream = pulsarProxyStream - proxyTimeTickBarrier := timesync.NewSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval) + proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval) tsmp.SetProxyTtBarrier(proxyTimeTickBarrier) pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream @@ -154,7 +145,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024) pulsarWriteStream.Start() var writeStream ms.MsgStream = pulsarWriteStream - writeTimeTickBarrier := timesync.NewHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs) + writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs) tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier) pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream @@ -177,15 +168,31 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { ctx: ctx, startTimestamp: time.Now().Unix(), kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr), - scheduler: NewDDRequestScheduler(), mt: metakv, tsmp: tsmp, ssChan: make(chan internalpb.SegmentStats, 10), grpcErr: make(chan error), - pc: informer.NewPulsarClient(), - segmentMgr: NewSegmentManager(metakv, opt), statsMs: statsMs, } + + //init idAllocator + m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "gid")) + if err := m.idAllocator.Initialize(); err != nil { + return nil, err + } + + //init tsoAllocator + m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "tso")) + if err := m.tsoAllocator.Initialize(); err != nil { + return nil, err + } + + m.scheduler = NewDDRequestScheduler(func() (UniqueID, error) { return m.idAllocator.AllocOne() }) + m.segmentMgr = NewSegmentManager(metakv, opt, + func() (UniqueID, error) { return m.idAllocator.AllocOne() }, + func() (Timestamp, error) { return m.tsoAllocator.AllocOne() }, + ) + m.grpcServer = grpc.NewServer() masterpb.RegisterMasterServer(m.grpcServer, m) return m, nil @@ -333,18 +340,18 @@ func (s *Master) grpcLoop(grpcPort int64) { func (s *Master) tsLoop() { defer s.serverLoopWg.Done() - s.tsTicker = time.NewTicker(tso.UpdateTimestampStep) + s.tsTicker = time.NewTicker(UpdateTimestampStep) defer s.tsTicker.Stop() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() for { select { case <-s.tsTicker.C: - if err := tso.UpdateTSO(); err != nil { + if err := s.tsoAllocator.UpdateTSO(); err != nil { log.Println("failed to update timestamp", err) return } - if err := id.UpdateID(); err != nil { + if err := s.idAllocator.UpdateID(); err != nil { log.Println("failed to update id", err) return } diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 4e4cd06aae..16aa026fca 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -4,16 +4,12 @@ import ( "strconv" "sync" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" ) -type UniqueID = typeutil.UniqueID - type metaTable struct { client *kv.EtcdKV // client of a reliable kv service, i.e. etcd client tenantID2Meta map[UniqueID]pb.TenantMeta // tenant id to tenant meta diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 94eb77fe0b..0f977c20dc 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/kv" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "go.etcd.io/etcd/clientv3" @@ -16,7 +15,7 @@ import ( func TestMetaTable_Collection(t *testing.T) { Init() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } @@ -156,7 +155,7 @@ func TestMetaTable_Collection(t *testing.T) { func TestMetaTable_DeletePartition(t *testing.T) { Init() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } @@ -250,7 +249,7 @@ func TestMetaTable_DeletePartition(t *testing.T) { func TestMetaTable_Segment(t *testing.T) { Init() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } @@ -334,7 +333,7 @@ func TestMetaTable_Segment(t *testing.T) { func TestMetaTable_UpdateSegment(t *testing.T) { Init() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } diff --git a/internal/master/paramtable/paramtable.go b/internal/master/paramtable.go similarity index 99% rename from internal/master/paramtable/paramtable.go rename to internal/master/paramtable.go index 1d48a8fb4b..abc177abf9 100644 --- a/internal/master/paramtable/paramtable.go +++ b/internal/master/paramtable.go @@ -1,4 +1,4 @@ -package paramtable +package master import ( "log" diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index e0fbb1be55..4c30e90610 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -8,7 +8,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -25,7 +24,7 @@ func TestMaster_Partition(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdAddr, err := masterParams.Params.EtcdAddress() + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } diff --git a/internal/master/scheduler.go b/internal/master/scheduler.go index a71fb14bcb..9d2611b5a6 100644 --- a/internal/master/scheduler.go +++ b/internal/master/scheduler.go @@ -1,19 +1,17 @@ package master -import ( - "github.com/zilliztech/milvus-distributed/internal/master/id" -) - type ddRequestScheduler struct { + globalIDAllocator func() (UniqueID, error) reqQueue chan task scheduleTimeStamp Timestamp } -func NewDDRequestScheduler() *ddRequestScheduler { +func NewDDRequestScheduler(allocGlobalID func() (UniqueID, error)) *ddRequestScheduler { const channelSize = 1024 rs := ddRequestScheduler{ - reqQueue: make(chan task, channelSize), + globalIDAllocator: allocGlobalID, + reqQueue: make(chan task, channelSize), } return &rs } @@ -22,7 +20,3 @@ func (rs *ddRequestScheduler) Enqueue(task task) error { rs.reqQueue <- task return nil } - -func allocGlobalID() (UniqueID, error) { - return id.AllocOne() -} diff --git a/internal/master/segment/segment.go b/internal/master/segment/segment.go deleted file mode 100644 index 0c0c641113..0000000000 --- a/internal/master/segment/segment.go +++ /dev/null @@ -1,57 +0,0 @@ -package segment - -import ( - "time" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - - jsoniter "github.com/json-iterator/go" -) - -type UniqueID = typeutil.UniqueID -type Timestamp = typeutil.Timestamp - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -type Segment struct { - SegmentID UniqueID `json:"segment_id"` - CollectionID UniqueID `json:"collection_id"` - PartitionTag string `json:"partition_tag"` - ChannelStart int `json:"channel_start"` - ChannelEnd int `json:"channel_end"` - OpenTimeStamp Timestamp `json:"open_timestamp"` - CloseTimeStamp Timestamp `json:"close_timestamp"` - CollectionName string `json:"collection_name"` - Rows int64 `json:"rows"` -} - -func NewSegment(id UniqueID, collectioID UniqueID, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { - return Segment{ - SegmentID: id, - CollectionID: collectioID, - CollectionName: cName, - PartitionTag: ptag, - ChannelStart: chStart, - ChannelEnd: chEnd, - OpenTimeStamp: Timestamp(openTime.Unix()), - CloseTimeStamp: Timestamp(closeTime.Unix()), - } -} - -func Segment2JSON(s Segment) (string, error) { - b, err := json.Marshal(&s) - if err != nil { - return "", err - } - - return string(b), nil -} - -func JSON2Segment(s string) (*Segment, error) { - var c Segment - err := json.Unmarshal([]byte(s), &c) - if err != nil { - return &Segment{}, err - } - return &c, nil -} diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index f9d57bb93d..4faf507cf0 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -5,8 +5,6 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" @@ -42,6 +40,8 @@ type SegmentManager struct { segmentExpireDuration int64 numOfChannels int numOfQueryNodes int + globalIDAllocator func() (UniqueID, error) + globalTSOAllocator func() (Timestamp, error) mu sync.RWMutex } @@ -97,7 +97,7 @@ func (segMgr *SegmentManager) closeSegment(segMeta *etcdpb.SegmentMeta) error { return nil } } - ts, err := tso.AllocOne() + ts, err := segMgr.globalTSOAllocator() if err != nil { return err } @@ -251,11 +251,11 @@ func (segMgr *SegmentManager) openNewSegment(channelID int32, collID UniqueID, p return nil, errors.Errorf("can't find the channel range which contains channel %d", channelID) } - newID, err := id.AllocOne() + newID, err := segMgr.globalIDAllocator() if err != nil { return nil, err } - openTime, err := tso.AllocOne() + openTime, err := segMgr.globalTSOAllocator() if err != nil { return nil, err } @@ -322,7 +322,11 @@ func (segMgr *SegmentManager) createChannelRanges() error { return nil } -func NewSegmentManager(meta *metaTable, opt *Option) *SegmentManager { +func NewSegmentManager(meta *metaTable, + opt *Option, + globalIDAllocator func() (UniqueID, error), + globalTSOAllocator func() (Timestamp, error), +) *SegmentManager { segMgr := &SegmentManager{ metaTable: meta, channelRanges: make([]*channelRange, 0), @@ -334,6 +338,8 @@ func NewSegmentManager(meta *metaTable, opt *Option) *SegmentManager { defaultSizePerRecord: opt.DefaultRecordSize, numOfChannels: opt.NumOfChannel, numOfQueryNodes: opt.NumOfQueryNode, + globalIDAllocator: globalIDAllocator, + globalTSOAllocator: globalTSOAllocator, } segMgr.createChannelRanges() return segMgr diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index e6a4f84769..546d6e98fa 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -2,19 +2,18 @@ package master import ( "log" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/master/id" - masterParam "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/msgstream" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "go.etcd.io/etcd/clientv3" ) @@ -26,17 +25,11 @@ var partitionTag = "test" var kvBase *kv.EtcdKV func setup() { - masterParam.Params.Init() - etcdAddress, err := masterParam.Params.EtcdAddress() + Params.Init() + etcdAddress, err := Params.EtcdAddress() if err != nil { panic(err) } - rootPath, err := masterParam.Params.EtcdRootPath() - if err != nil { - panic(err) - } - id.Init([]string{etcdAddress}, rootPath) - tso.Init([]string{etcdAddress}, rootPath) cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) if err != nil { @@ -76,7 +69,21 @@ func setup() { NumOfQueryNode: 3, NumOfChannel: 5, } - segMgr = NewSegmentManager(mt, opt) + + var cnt int64 + + segMgr = NewSegmentManager(mt, opt, + func() (UniqueID, error) { + val := atomic.AddInt64(&cnt, 1) + return val, nil + }, + func() (Timestamp, error) { + val := atomic.AddInt64(&cnt, 1) + phy := time.Now().UnixNano() / int64(time.Millisecond) + ts := tsoutil.ComposeTS(phy, val) + return ts, nil + }, + ) } func teardown() { @@ -137,13 +144,13 @@ func TestSegmentManager_AssignSegmentID(t *testing.T) { newReqs[0].Count = 1000000 _, err = segMgr.AssignSegmentID(newReqs) assert.Error(t, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold", - 1000000, masterParam.Params.DefaultRecordSize()*1000000), err) + 1000000, Params.DefaultRecordSize()*1000000), err) } func TestSegmentManager_SegmentStats(t *testing.T) { setup() defer teardown() - ts, err := tso.AllocOne() + ts, err := segMgr.globalTSOAllocator() assert.Nil(t, err) err = mt.AddSegment(&pb.SegmentMeta{ SegmentID: 100, @@ -158,7 +165,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) { MsgType: internalpb.MsgType_kQueryNodeSegStats, PeerID: 1, SegStats: []*internalpb.SegmentStats{ - {SegmentID: 100, MemorySize: 25000 * masterParam.Params.DefaultRecordSize(), NumRows: 25000, RecentlyModified: true}, + {SegmentID: 100, MemorySize: 25000 * Params.DefaultRecordSize(), NumRows: 25000, RecentlyModified: true}, }, } baseMsg := msgstream.BaseMsg{ @@ -182,12 +189,12 @@ func TestSegmentManager_SegmentStats(t *testing.T) { time.Sleep(1 * time.Second) segMeta, _ := mt.GetSegmentByID(100) assert.Equal(t, int64(100), segMeta.SegmentID) - assert.Equal(t, 25000*masterParam.Params.DefaultRecordSize(), segMeta.MemSize) + assert.Equal(t, 25000*Params.DefaultRecordSize(), segMeta.MemSize) assert.Equal(t, int64(25000), segMeta.NumRows) // close segment stats.SegStats[0].NumRows = 520000 - stats.SegStats[0].MemorySize = 520000 * masterParam.Params.DefaultRecordSize() + stats.SegStats[0].MemorySize = 520000 * Params.DefaultRecordSize() err = segMgr.HandleQueryNodeMsgPack(&msgPack) assert.Nil(t, err) time.Sleep(1 * time.Second) diff --git a/internal/master/task.go b/internal/master/task.go index 0c6bab5f4c..80eabe6ca1 100644 --- a/internal/master/task.go +++ b/internal/master/task.go @@ -4,16 +4,15 @@ import ( "context" "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) // TODO: get timestamp from timestampOracle type baseTask struct { - kvBase *kv.EtcdKV - mt *metaTable - cv chan error + sch *ddRequestScheduler + mt *metaTable + cv chan error } type task interface { diff --git a/internal/master/timesync/time_snyc_producer_test.go b/internal/master/time_snyc_producer_test.go similarity index 99% rename from internal/master/timesync/time_snyc_producer_test.go rename to internal/master/time_snyc_producer_test.go index 2017e9f0ad..798a968e9f 100644 --- a/internal/master/timesync/time_snyc_producer_test.go +++ b/internal/master/time_snyc_producer_test.go @@ -1,4 +1,4 @@ -package timesync +package master import ( "context" diff --git a/internal/master/timesync/time_sync_producer.go b/internal/master/time_sync_producer.go similarity index 99% rename from internal/master/timesync/time_sync_producer.go rename to internal/master/time_sync_producer.go index 165a9740c9..fc58198858 100644 --- a/internal/master/timesync/time_sync_producer.go +++ b/internal/master/time_sync_producer.go @@ -1,4 +1,4 @@ -package timesync +package master import ( "context" diff --git a/internal/master/timesync/timesync.go b/internal/master/timesync.go similarity index 92% rename from internal/master/timesync/timesync.go rename to internal/master/timesync.go index b26d01ced3..3a883b32a1 100644 --- a/internal/master/timesync/timesync.go +++ b/internal/master/timesync.go @@ -1,4 +1,4 @@ -package timesync +package master import ( "context" @@ -10,6 +10,12 @@ import ( ) type ( + TimeTickBarrier interface { + GetTimeTick() (Timestamp, error) + Start() error + Close() + } + softTimeTickBarrier struct { peer2LastTt map[UniqueID]Timestamp minTtInterval Timestamp @@ -102,13 +108,13 @@ func (ttBarrier *softTimeTickBarrier) Start() error { return nil } -func NewSoftTimeTickBarrier(ctx context.Context, +func newSoftTimeTickBarrier(ctx context.Context, ttStream *ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier { if len(peerIds) <= 0 { - log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is empty!\n") + log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n") return nil } @@ -124,7 +130,7 @@ func NewSoftTimeTickBarrier(ctx context.Context, sttbarrier.peer2LastTt[id] = Timestamp(0) } if len(peerIds) != len(sttbarrier.peer2LastTt) { - log.Printf("[NewSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n") + log.Printf("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n") } return &sttbarrier @@ -228,12 +234,12 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp { return tempMin } -func NewHardTimeTickBarrier(ctx context.Context, +func newHardTimeTickBarrier(ctx context.Context, ttStream *ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier { if len(peerIds) <= 0 { - log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is empty!") + log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!") return nil } @@ -248,7 +254,7 @@ func NewHardTimeTickBarrier(ctx context.Context, sttbarrier.peer2Tt[id] = Timestamp(0) } if len(peerIds) != len(sttbarrier.peer2Tt) { - log.Printf("[NewSoftTimeTickBarrier] Warning: there are duplicate peerIds!") + log.Printf("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!") } return &sttbarrier diff --git a/internal/master/timesync/timetick.go b/internal/master/timesync/timetick.go deleted file mode 100644 index 61e3db24ef..0000000000 --- a/internal/master/timesync/timetick.go +++ /dev/null @@ -1,26 +0,0 @@ -package timesync - -import ( - ms "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type ( - UniqueID = typeutil.UniqueID - Timestamp = typeutil.Timestamp -) - -type MsgProducer interface { - SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) - SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) - SetDMSyncStream(dmSync ms.MsgStream) - SetK2sSyncStream(k2sSync ms.MsgStream) - Start() error - Close() -} - -type TimeTickBarrier interface { - GetTimeTick() (Timestamp, error) - Start() error - Close() -} diff --git a/internal/master/timesync/timesync_test.go b/internal/master/timesync_test.go similarity index 89% rename from internal/master/timesync/timesync_test.go rename to internal/master/timesync_test.go index 7c595430ad..0795ed30d2 100644 --- a/internal/master/timesync/timesync_test.go +++ b/internal/master/timesync_test.go @@ -1,4 +1,4 @@ -package timesync +package master import ( "context" @@ -100,22 +100,22 @@ func TestTt_NewSoftTtBarrier(t *testing.T) { validPeerIds := []UniqueID{1, 2, 3} - sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) + sttbarrier := newSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) assert.NotNil(t, sttbarrier) sttbarrier.Close() validPeerIds2 := []UniqueID{1, 1, 1} - sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds2, minTtInterval) + sttbarrier = newSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds2, minTtInterval) assert.NotNil(t, sttbarrier) sttbarrier.Close() // invalid peerIds invalidPeerIds1 := make([]UniqueID, 0, 3) - sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1, minTtInterval) + sttbarrier = newSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1, minTtInterval) assert.Nil(t, sttbarrier) invalidPeerIds2 := []UniqueID{} - sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2, minTtInterval) + sttbarrier = newSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2, minTtInterval) assert.Nil(t, sttbarrier) } @@ -137,22 +137,22 @@ func TestTt_NewHardTtBarrier(t *testing.T) { validPeerIds := []UniqueID{1, 2, 3} - sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds) + sttbarrier := newHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds) assert.NotNil(t, sttbarrier) sttbarrier.Close() validPeerIds2 := []UniqueID{1, 1, 1} - sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds2) + sttbarrier = newHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds2) assert.NotNil(t, sttbarrier) sttbarrier.Close() // invalid peerIds invalidPeerIds1 := make([]UniqueID, 0, 3) - sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1) + sttbarrier = newHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1) assert.Nil(t, sttbarrier) invalidPeerIds2 := []UniqueID{} - sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2) + sttbarrier = newHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2) assert.Nil(t, sttbarrier) } @@ -175,7 +175,7 @@ func TestTt_SoftTtBarrierStart(t *testing.T) { minTtInterval := Timestamp(10) peerIds := []UniqueID{1, 2, 3} - sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, peerIds, minTtInterval) + sttbarrier := newSoftTimeTickBarrier(context.TODO(), ttStream, peerIds, minTtInterval) require.NotNil(t, sttbarrier) sttbarrier.Start() @@ -208,7 +208,7 @@ func TestTt_SoftTtBarrierGetTimeTickClose(t *testing.T) { minTtInterval := Timestamp(10) validPeerIds := []UniqueID{1, 2, 3} - sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) + sttbarrier := newSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) require.NotNil(t, sttbarrier) sttbarrier.Start() @@ -238,7 +238,7 @@ func TestTt_SoftTtBarrierGetTimeTickClose(t *testing.T) { minTtInterval = Timestamp(10) validPeerIds = []UniqueID{1, 2, 3} - sttbarrier01 := NewSoftTimeTickBarrier(context.TODO(), ttStream01, validPeerIds, minTtInterval) + sttbarrier01 := newSoftTimeTickBarrier(context.TODO(), ttStream01, validPeerIds, minTtInterval) require.NotNil(t, sttbarrier01) sttbarrier01.Start() @@ -276,7 +276,7 @@ func TestTt_SoftTtBarrierGetTimeTickCancel(t *testing.T) { validPeerIds := []UniqueID{1, 2, 3} ctx, cancel := context.WithCancel(context.Background()) - sttbarrier := NewSoftTimeTickBarrier(ctx, ttStream, validPeerIds, minTtInterval) + sttbarrier := newSoftTimeTickBarrier(ctx, ttStream, validPeerIds, minTtInterval) require.NotNil(t, sttbarrier) sttbarrier.Start() @@ -313,7 +313,7 @@ func TestTt_HardTtBarrierStart(t *testing.T) { }() peerIds := []UniqueID{1, 2, 3} - sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, peerIds) + sttbarrier := newHardTimeTickBarrier(context.TODO(), ttStream, peerIds) require.NotNil(t, sttbarrier) sttbarrier.Start() @@ -348,7 +348,7 @@ func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { }() peerIds := []UniqueID{1, 2, 3} - sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, peerIds) + sttbarrier := newHardTimeTickBarrier(context.TODO(), ttStream, peerIds) require.NotNil(t, sttbarrier) sttbarrier.Start() @@ -380,7 +380,7 @@ func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { }() peerIdsStuck := []UniqueID{1, 2, 3} - sttbarrierStuck := NewHardTimeTickBarrier(context.TODO(), ttStreamStuck, peerIdsStuck) + sttbarrierStuck := newHardTimeTickBarrier(context.TODO(), ttStreamStuck, peerIdsStuck) require.NotNil(t, sttbarrierStuck) sttbarrierStuck.Start() @@ -413,7 +413,7 @@ func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { peerIdsCancel := []UniqueID{1, 2, 3} ctx, cancel := context.WithCancel(context.Background()) - sttbarrierCancel := NewHardTimeTickBarrier(ctx, ttStreamCancel, peerIdsCancel) + sttbarrierCancel := newHardTimeTickBarrier(ctx, ttStreamCancel, peerIdsCancel) require.NotNil(t, sttbarrierCancel) sttbarrierCancel.Start() diff --git a/internal/master/tso/tso.go b/internal/master/tso.go similarity index 99% rename from internal/master/tso/tso.go rename to internal/master/tso.go index 02a99931bf..24c43ea01e 100644 --- a/internal/master/tso/tso.go +++ b/internal/master/tso.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tso +package master import ( "log" diff --git a/internal/master/tso/global_allocator_test.go b/internal/master/tso/global_allocator_test.go deleted file mode 100644 index 4516456517..0000000000 --- a/internal/master/tso/global_allocator_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package tso - -import ( - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" -) - -var GTsoAllocator Allocator - -func TestMain(m *testing.M) { - masterParams.Params.Init() - - etcdAddr, err := masterParams.Params.EtcdAddress() - if err != nil { - panic(err) - } - GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) - - exitCode := m.Run() - os.Exit(exitCode) -} - -func TestGlobalTSOAllocator_Initialize(t *testing.T) { - err := GTsoAllocator.Initialize() - assert.Nil(t, err) -} - -func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { - count := 1000 - perCount := uint32(100) - startTs, err := GTsoAllocator.GenerateTSO(perCount) - assert.Nil(t, err) - lastPhysical, lastLogical := tsoutil.ParseTS(startTs) - for i := 0; i < count; i++ { - ts, _ := GTsoAllocator.GenerateTSO(perCount) - physical, logical := tsoutil.ParseTS(ts) - if lastPhysical.Equal(physical) { - diff := logical - lastLogical - assert.Equal(t, uint64(perCount), diff) - } - lastPhysical, lastLogical = physical, logical - } -} - -func TestGlobalTSOAllocator_SetTSO(t *testing.T) { - curTime := time.Now() - nextTime := curTime.Add(2 * time.Second) - physical := nextTime.UnixNano() / int64(time.Millisecond) - logical := int64(0) - err := GTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) - assert.Nil(t, err) -} - -func TestGlobalTSOAllocator_UpdateTSO(t *testing.T) { - err := GTsoAllocator.UpdateTSO() - assert.Nil(t, err) -} - -func TestGlobalTSOAllocator_Reset(t *testing.T) { - GTsoAllocator.Reset() -} diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 8acd44c284..ffc7255357 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -15,7 +15,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/master" - masterParam "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" @@ -39,11 +38,11 @@ var testNum = 10 func startMaster(ctx context.Context) { master.Init() - etcdAddr, err := masterParam.Params.EtcdAddress() + etcdAddr, err := master.Params.EtcdAddress() if err != nil { panic(err) } - rootPath, err := masterParam.Params.EtcdRootPath() + rootPath, err := master.Params.EtcdRootPath() if err != nil { panic(err) } @@ -87,7 +86,7 @@ func startMaster(ctx context.Context) { if err != nil { log.Print("create server failed", zap.Error(err)) } - if err := svr.Run(int64(masterParam.Params.Port())); err != nil { + if err := svr.Run(int64(master.Params.Port())); err != nil { log.Fatal("run server failed", zap.Error(err)) }