From bb03b9bfde5dcb8708cdbc6395980cdf5e90cd97 Mon Sep 17 00:00:00 2001 From: rain Date: Tue, 22 Sep 2020 15:26:38 +0800 Subject: [PATCH] Read config from file in master Signed-off-by: rain --- conf/conf.go | 18 ++++++++++++------ conf/config.yaml | 9 +++++++-- pkg/master/README.md | 2 +- pkg/master/server.go | 22 +++++++++++----------- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/conf/conf.go b/conf/conf.go index f4f8eb179c..41f5cc17fe 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -1,18 +1,24 @@ package conf import ( - "github.com/czs007/suvlim/storage/pkg/types" - yaml "gopkg.in/yaml.v2" "io/ioutil" "path" "runtime" + + "github.com/czs007/suvlim/storage/pkg/types" + yaml "gopkg.in/yaml.v2" ) // yaml.MapSlice type MasterConfig struct { - Address string - Port int32 + PulsarURL string + PulsarMoniterInterval int32 + PulsarTopic string + EtcdRootPath string + SegmentThreshole float32 + DefaultGRPCPort string + EtcdEndPoints []string } type EtcdConfig struct { @@ -61,8 +67,8 @@ func init() { } func getCurrentFileDir() string { - _, fpath, _, _ := runtime.Caller(0) - return path.Dir(fpath) + _, fpath, _, _ := runtime.Caller(0) + return path.Dir(fpath) } func load_config() { diff --git a/conf/config.yaml b/conf/config.yaml index 505dbbb001..b7bd121951 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -10,8 +10,13 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. master: - address: localhost - port: 53100 + pulsarurl: "pulsar://localhost:6650" + pulsarmoniterinterval: 1 + pulsartopic: "monitor-topic" + etcdrootpath: "by-dev" + segmentthreshole: 10000 + defaultgrpcport: ":53100" + etcdendpoints: ["127.0.0.1:12379"] etcd: address: localhost diff --git a/pkg/master/README.md b/pkg/master/README.md index 30aca7743c..90493cef77 100644 --- a/pkg/master/README.md +++ b/pkg/master/README.md @@ -21,4 +21,4 @@ go run cmd/master.go ### example if master create a collection with uuid ```46e468ee-b34a-419d-85ed-80c56bfa4e90``` -the corresponding key in etcd is /$(ETCD_ROOT_PATH)/collection/46e468ee-b34a-419d-85ed-80c56bfa4e90 +the corresponding key in etcd is $(ETCD_ROOT_PATH)/collection/46e468ee-b34a-419d-85ed-80c56bfa4e90 diff --git a/pkg/master/server.go b/pkg/master/server.go index bd283d855a..820ddecd6c 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -8,7 +8,7 @@ import ( "strconv" "time" - "github.com/czs007/suvlim/pkg/master/common" + "github.com/czs007/suvlim/conf" pb "github.com/czs007/suvlim/pkg/master/grpc/master" "github.com/czs007/suvlim/pkg/master/grpc/message" messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" @@ -33,11 +33,11 @@ func Run() { func SegmentStatsController() { cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{"127.0.0.1:12379"}, + Endpoints: conf.Config.Master.EtcdEndPoints, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) ssChan := make(chan mock.SegmentStats, 10) defer close(ssChan) @@ -56,13 +56,13 @@ func SegmentStatsController() { } func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { - if int(ss.MemorySize) > common.SEGMENT_THRESHOLE*0.8 { + if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { currentTime := time.Now() memRate := int(ss.MemoryRate) if memRate == 0 { memRate = 1 } - sec := common.SEGMENT_THRESHOLE * 0.2 / memRate + sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) if err != nil { return err @@ -138,7 +138,7 @@ func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error { } func GRPCServer(ch chan *messagepb.Mapping) error { - lis, err := net.Listen("tcp", common.DEFAULT_GRPC_PORT) + lis, err := net.Listen("tcp", conf.Config.Master.DefaultGRPCPort) if err != nil { return err } @@ -194,11 +194,11 @@ func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexP func CollectionController(ch chan *messagepb.Mapping) { cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{"127.0.0.1:12379"}, + Endpoints: conf.Config.Master.EtcdEndPoints, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) for collection := range ch { sID := id.New().Uint64() cID := id.New().Uint64() @@ -243,7 +243,7 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error { DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) sID := id.New().Uint64() cID := id.New().Uint64() fieldMetas := []*messagepb.FieldMeta{} @@ -281,11 +281,11 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error { func UpdateCollectionIndex(index *messagepb.IndexParam) error { cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{"127.0.0.1:12379"}, + Endpoints: conf.Config.Master.EtcdEndPoints, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) collectionName := index.CollectionName c, err := kvbase.Load("collection/" + collectionName) if err != nil {