From 1decc1a4ca93790ab663ba956d4e544979abf3e2 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 27 Nov 2020 09:59:40 +0800 Subject: [PATCH] Add metaRootPath and kvRootPath Signed-off-by: bigsheeper --- configs/milvus.yaml | 2 ++ internal/master/collection_task_test.go | 3 ++- internal/master/grpc_service_test.go | 3 ++- internal/master/master.go | 4 ++-- internal/master/param_table.go | 28 ++++++++++++++++++++----- internal/master/param_table_test.go | 12 ++++++++--- internal/master/partition_task_test.go | 3 ++- internal/master/segment_manager_test.go | 3 ++- internal/proxy/proxy_test.go | 4 ++-- internal/querynode/meta_service.go | 12 +++++------ internal/querynode/meta_service_test.go | 20 +++++++++--------- internal/querynode/param_table.go | 10 ++++++--- internal/querynode/param_table_test.go | 6 ++++++ 13 files changed, 75 insertions(+), 35 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c76cbd8ce7..b0eca24ad6 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -19,6 +19,8 @@ etcd: address: localhost port: 2379 rootPath: by-dev + metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath + kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath segThreshold: 10000 pulsar: diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index d1fa330aed..eb4920b00e 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -33,7 +33,8 @@ func TestMaster_CollectionTask(t *testing.T) { Port: Params.Port, EtcdAddress: Params.EtcdAddress, - EtcdRootPath: "/test/root", + MetaRootPath: "/test/root/meta", + KvRootPath: "/test/root/kv", PulsarAddress: Params.PulsarAddress, ProxyIDList: []typeutil.UniqueID{1, 2}, diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index a32e23efc8..644a84999c 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -32,7 +32,8 @@ func TestMaster_CreateCollection(t *testing.T) { Port: Params.Port, EtcdAddress: Params.EtcdAddress, - EtcdRootPath: "/test/root", + MetaRootPath: "/test/root/meta", + KvRootPath: "/test/root/kv", PulsarAddress: Params.PulsarAddress, ProxyIDList: []typeutil.UniqueID{1, 2}, diff --git a/internal/master/master.go b/internal/master/master.go index d88aba2327..8ea4586be6 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -82,8 +82,8 @@ func Init() { func CreateServer(ctx context.Context) (*Master, error) { //Init(etcdAddr, kvRootPath) etcdAddress := Params.EtcdAddress - metaRootPath := Params.EtcdRootPath - kvRootPath := Params.EtcdRootPath + metaRootPath := Params.MetaRootPath + kvRootPath := Params.KvRootPath pulsarAddr := Params.PulsarAddress etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) diff --git a/internal/master/param_table.go b/internal/master/param_table.go index 59eb732b0d..873c7b6af2 100644 --- a/internal/master/param_table.go +++ b/internal/master/param_table.go @@ -17,7 +17,8 @@ type ParamTable struct { Port int EtcdAddress string - EtcdRootPath string + MetaRootPath string + KvRootPath string PulsarAddress string // nodeID @@ -75,7 +76,8 @@ func (p *ParamTable) Init() { p.initPort() p.initEtcdAddress() - p.initEtcdRootPath() + p.initMetaRootPath() + p.initKvRootPath() p.initPulsarAddress() p.initProxyIDList() @@ -138,12 +140,28 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } -func (p *ParamTable) initEtcdRootPath() { - path, err := p.Load("etcd.rootpath") +func (p *ParamTable) initMetaRootPath() { + rootPath, err := p.Load("etcd.rootPath") if err != nil { panic(err) } - p.EtcdRootPath = path + subPath, err := p.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + p.MetaRootPath = rootPath + "/" + subPath +} + +func (p *ParamTable) initKvRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.kvSubPath") + if err != nil { + panic(err) + } + p.KvRootPath = rootPath + "/" + subPath } func (p *ParamTable) initTopicNum() { diff --git a/internal/master/param_table_test.go b/internal/master/param_table_test.go index 339ecfb6d4..ed83c3af02 100644 --- a/internal/master/param_table_test.go +++ b/internal/master/param_table_test.go @@ -22,10 +22,16 @@ func TestParamTable_Port(t *testing.T) { assert.Equal(t, port, 53100) } -func TestParamTable_EtcdRootPath(t *testing.T) { +func TestParamTable_MetaRootPath(t *testing.T) { Params.Init() - addr := Params.EtcdRootPath - assert.Equal(t, addr, "by-dev") + path := Params.MetaRootPath + assert.Equal(t, path, "by-dev/meta") +} + +func TestParamTable_KVRootPath(t *testing.T) { + Params.Init() + path := Params.KvRootPath + assert.Equal(t, path, "by-dev/kv") } func TestParamTable_TopicNum(t *testing.T) { diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 53b698e64a..c6642cad27 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -35,7 +35,8 @@ func TestMaster_Partition(t *testing.T) { Port: Params.Port, EtcdAddress: Params.EtcdAddress, - EtcdRootPath: "/test/root", + MetaRootPath: "/test/root/meta", + KvRootPath: "/test/root/kv", PulsarAddress: Params.PulsarAddress, ProxyIDList: []typeutil.UniqueID{1, 2}, diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 85942bdd62..0287ea955c 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -236,7 +236,8 @@ func startupMaster() { Port: Params.Port, EtcdAddress: Params.EtcdAddress, - EtcdRootPath: rootPath, + MetaRootPath: "/test/root/meta", + KvRootPath: "/test/root/kv", PulsarAddress: Params.PulsarAddress, ProxyIDList: []typeutil.UniqueID{1, 2}, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 4ffcaf18e0..9553cdff48 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -37,13 +37,13 @@ var testNum = 10 func startMaster(ctx context.Context) { master.Init() etcdAddr := master.Params.EtcdAddress - rootPath := master.Params.EtcdRootPath + metaRootPath := master.Params.MetaRootPath etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) if err != nil { panic(err) } - _, err = etcdCli.Delete(context.TODO(), rootPath, clientv3.WithPrefix()) + _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) if err != nil { panic(err) } diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index f95ec6776d..1a51a31891 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -31,7 +31,7 @@ type metaService struct { func newMetaService(ctx context.Context, replica *collectionReplica) *metaService { ETCDAddr := Params.etcdAddress() - ETCDRootPath := Params.etcdRootPath() + MetaRootPath := Params.metaRootPath() cli, _ := clientv3.New(clientv3.Config{ Endpoints: []string{ETCDAddr}, @@ -40,7 +40,7 @@ func newMetaService(ctx context.Context, replica *collectionReplica) *metaServic return &metaService{ ctx: ctx, - kvBase: kv.NewEtcdKV(cli, ETCDRootPath), + kvBase: kv.NewEtcdKV(cli, MetaRootPath), replica: replica, } } @@ -71,21 +71,21 @@ func (mService *metaService) start() { } func GetCollectionObjID(key string) string { - ETCDRootPath := Params.etcdRootPath() + ETCDRootPath := Params.metaRootPath() prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" return strings.TrimPrefix(key, prefix) } func GetSegmentObjID(key string) string { - ETCDRootPath := Params.etcdRootPath() + ETCDRootPath := Params.metaRootPath() prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" return strings.TrimPrefix(key, prefix) } func isCollectionObj(key string) bool { - ETCDRootPath := Params.etcdRootPath() + ETCDRootPath := Params.metaRootPath() prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" prefix = strings.TrimSpace(prefix) @@ -95,7 +95,7 @@ func isCollectionObj(key string) bool { } func isSegmentObj(key string) bool { - ETCDRootPath := Params.etcdRootPath() + ETCDRootPath := Params.metaRootPath() prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" prefix = strings.TrimSpace(prefix) diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go index 66c0c8e535..f85da1c953 100644 --- a/internal/querynode/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -64,24 +64,24 @@ func TestMetaService_getSegmentObjId(t *testing.T) { } func TestMetaService_isCollectionObj(t *testing.T) { - var key = "by-dev/collection/collection0" + var key = "by-dev/meta/collection/collection0" var b1 = isCollectionObj(key) assert.Equal(t, b1, true) - key = "by-dev/segment/segment0" + key = "by-dev/meta/segment/segment0" var b2 = isCollectionObj(key) assert.Equal(t, b2, false) } func TestMetaService_isSegmentObj(t *testing.T) { - var key = "by-dev/segment/segment0" + var key = "by-dev/meta/segment/segment0" var b1 = isSegmentObj(key) assert.Equal(t, b1, true) - key = "by-dev/collection/collection0" + key = "by-dev/meta/collection/collection0" var b2 = isSegmentObj(key) assert.Equal(t, b2, false) @@ -295,7 +295,7 @@ func TestMetaService_processCreate(t *testing.T) { node := NewQueryNode(ctx, 0) node.metaService = newMetaService(ctx, node.replica) - key1 := "by-dev/collection/0" + key1 := "by-dev/meta/collection/0" msg1 := `schema: < name: "test" fields: < @@ -327,7 +327,7 @@ func TestMetaService_processCreate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) - key2 := "by-dev/segment/0" + key2 := "by-dev/meta/segment/0" msg2 := `partition_tag: "default" channel_start: 0 channel_end: 1 @@ -529,7 +529,7 @@ func TestMetaService_processModify(t *testing.T) { node := NewQueryNode(ctx, 0) node.metaService = newMetaService(ctx, node.replica) - key1 := "by-dev/collection/0" + key1 := "by-dev/meta/collection/0" msg1 := `schema: < name: "test" fields: < @@ -576,7 +576,7 @@ func TestMetaService_processModify(t *testing.T) { hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") assert.Equal(t, hasPartition, false) - key2 := "by-dev/segment/0" + key2 := "by-dev/meta/segment/0" msg2 := `partition_tag: "p1" channel_start: 0 channel_end: 1 @@ -772,7 +772,7 @@ func TestMetaService_processDelete(t *testing.T) { node := NewQueryNode(ctx, 0) node.metaService = newMetaService(ctx, node.replica) - key1 := "by-dev/collection/0" + key1 := "by-dev/meta/collection/0" msg1 := `schema: < name: "test" fields: < @@ -804,7 +804,7 @@ func TestMetaService_processDelete(t *testing.T) { assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) - key2 := "by-dev/segment/0" + key2 := "by-dev/meta/segment/0" msg2 := `partition_tag: "default" channel_start: 0 channel_end: 1 diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index be8bef8b65..026d116149 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -199,12 +199,16 @@ func (p *ParamTable) etcdAddress() string { return etcdAddress } -func (p *ParamTable) etcdRootPath() string { - etcdRootPath, err := p.Load("etcd.rootpath") +func (p *ParamTable) metaRootPath() string { + rootPath, err := p.Load("etcd.rootPath") if err != nil { panic(err) } - return etcdRootPath + subPath, err := p.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + return rootPath + "/" + subPath } func (p *ParamTable) gracefulTime() int64 { diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index e9405f8a84..544659134d 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -120,3 +120,9 @@ func TestParamTable_statsChannelName(t *testing.T) { name := Params.statsChannelName() assert.Equal(t, name, "query-node-stats") } + +func TestParamTable_metaRootPath(t *testing.T) { + Params.Init() + path := Params.metaRootPath() + assert.Equal(t, path, "by-dev/meta") +}