From b74afd7a0e620e57bf12e798d430b271de7e978f Mon Sep 17 00:00:00 2001 From: godchen Date: Fri, 14 May 2021 10:05:18 +0800 Subject: [PATCH] Add service registration (#5189) Add service registration. Part of Issue #5174. Signed-off-by: godchen --- internal/datanode/data_node.go | 64 +++++++++++++++++++++ internal/dataservice/server.go | 57 +++++++++++++++++-- internal/indexnode/indexnode.go | 68 ++++++++++++++++++++++- internal/indexnode/paramtable.go | 25 +++++++++ internal/indexservice/indexservice.go | 61 ++++++++++++++++++-- internal/kv/etcd/etcd_kv.go | 25 +++++++++ internal/masterservice/master_service.go | 49 ++++++++++++++++ internal/proxynode/paramtable.go | 25 +++++++++ internal/proxynode/proxy_node.go | 71 +++++++++++++++++++++++- internal/querynode/meta_service.go | 2 +- internal/querynode/param_table.go | 10 ++-- internal/querynode/query_node.go | 66 ++++++++++++++++++++++ internal/queryservice/param_table.go | 28 ++++++++++ internal/queryservice/queryservice.go | 66 ++++++++++++++++++++++ 14 files changed, 598 insertions(+), 19 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index f681d03d3c..4735139c44 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -16,6 +16,7 @@ package datanode import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -25,10 +26,13 @@ import ( "go.uber.org/zap" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -73,6 +77,13 @@ type DataNode struct { flushChan chan<- *flushMsg replica Replica + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + closer io.Closer msFactory msgstream.Factory @@ -133,6 +144,30 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { // At last, data node initializes its `dataSyncService` and `metaService`. func (node *DataNode) Init() error { ctx := context.Background() + connectEtcdFn := func() error { + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath) + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + ch, err := node.registerService(fmt.Sprintf("datanode-%d", Params.NodeID), Params.IP) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() req := &datapb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ @@ -291,3 +326,32 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin Value: "", }, nil } + +func (node *DataNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := node.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + node.session.NodeName = nodeName + node.session.IP = ip + node.session.LeaseID = respID + + sessionJSON, err := json.Marshal(node.session) + if err != nil { + return nil, err + } + + err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := node.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index f25608f4d4..98db7f5d35 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -12,6 +12,7 @@ package dataservice import ( "context" + "encoding/json" "errors" "fmt" "math/rand" @@ -64,7 +65,12 @@ type Server struct { masterClient types.MasterService ttMsgStream msgstream.MsgStream k2sMsgStream msgstream.MsgStream - ddChannelMu struct { + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + ddChannelMu struct { sync.Mutex name string } @@ -104,6 +110,21 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) { } func (s *Server) Init() error { + if err := s.initMeta(); err != nil { + return err + } + + ch, err := s.registerService(fmt.Sprintf("dataservice-%d", Params.NodeID), "localhost:123456") + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() return nil } @@ -118,10 +139,6 @@ func (s *Server) Start() error { return err } - if err = s.initMeta(); err != nil { - return err - } - s.allocator = newAllocator(s.masterClient) s.statsHandler = newStatsHandler(s.meta) @@ -838,3 +855,33 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR resp.Infos = infos return resp, nil } + +func (s *Server) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := s.kvClient.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + s.session.NodeName = nodeName + s.session.IP = ip + s.session.LeaseID = respID + + sessionJSON, err := json.Marshal(s.session) + if err != nil { + return nil, err + } + + err = s.kvClient.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := s.kvClient.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil + +} diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 976d58958e..7894ce854b 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -13,7 +13,9 @@ package indexnode import ( "context" + "encoding/json" "errors" + "fmt" "io" "math/rand" "time" @@ -21,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -29,7 +32,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" ) const ( @@ -55,6 +60,13 @@ type IndexNode struct { startCallbacks []func() closeCallbacks []func() + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + closer io.Closer } @@ -76,8 +88,33 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) { func (i *IndexNode) Init() error { ctx := context.Background() - err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) + connectEtcdFn := func() error { + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + i.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath) + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + ch, err := i.registerService(fmt.Sprintf("indexnode-%d", Params.NodeID), Params.IP) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + + err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) if err != nil { return err } @@ -264,3 +301,32 @@ func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringR }, }, nil } + +func (i *IndexNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := i.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + i.session.NodeName = nodeName + i.session.IP = ip + i.session.LeaseID = respID + + sessionJSON, err := json.Marshal(i.session) + if err != nil { + return nil, err + } + + err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := i.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/indexnode/paramtable.go b/internal/indexnode/paramtable.go index a1bef65bd7..afe0631ed4 100644 --- a/internal/indexnode/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -42,6 +42,9 @@ type ParamTable struct { MasterAddress string + EtcdAddress string + MetaRootPath string + MinIOAddress string MinIOAccessKeyID string MinIOSecretAccessKey string @@ -68,6 +71,8 @@ func (pt *ParamTable) initParams() { pt.initMinIOSecretAccessKey() pt.initMinIOUseSSL() pt.initMinioBucketName() + pt.initEtcdAddress() + pt.initMetaRootPath() } func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error { @@ -154,6 +159,26 @@ func (pt *ParamTable) initMinIOUseSSL() { } } +func (pt *ParamTable) initEtcdAddress() { + addr, err := pt.Load("_EtcdAddress") + if err != nil { + panic(err) + } + pt.EtcdAddress = addr +} + +func (pt *ParamTable) initMetaRootPath() { + rootPath, err := pt.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := pt.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + pt.MetaRootPath = path.Join(rootPath, subPath) +} + func (pt *ParamTable) initMinioBucketName() { bucketName, err := pt.Load("minio.bucketName") if err != nil { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 0e8750abf8..d336c62dde 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -13,7 +13,9 @@ package indexservice import ( "context" + "encoding/json" "errors" + "fmt" "math/rand" "sync" "time" @@ -63,6 +65,13 @@ type IndexService struct { nodeLock sync.RWMutex + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() @@ -84,15 +93,14 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { } func (i *IndexService) Init() error { - etcdAddress := Params.EtcdAddress - log.Debug("indexservice", zap.String("etcd address", etcdAddress)) + log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress)) connectEtcdFn := func() error { - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) if err != nil { return err } - etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) - metakv, err := NewMetaTable(etcdKV) + i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + metakv, err := NewMetaTable(i.etcdKV) if err != nil { return err } @@ -104,9 +112,21 @@ func (i *IndexService) Init() error { return err } + ch, err := i.registerService("indexservice", Params.Address) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + //init idAllocator kvRootPath := Params.KvRootPath - i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid")) + i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, kvRootPath, "index_gid")) if err := i.idAllocator.Initialize(); err != nil { return err } @@ -416,3 +436,32 @@ func (i *IndexService) dropIndexLoop() { } } } + +func (i *IndexService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := i.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + i.session.NodeName = nodeName + i.session.IP = ip + i.session.LeaseID = respID + + sessionJSON, err := json.Marshal(i.session) + if err != nil { + return nil, err + } + + err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := i.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index bed5a4fe2e..9d8e579591 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -123,6 +123,15 @@ func (kv *EtcdKV) Save(key, value string) error { return err } +// SaveWithLease is a function to put value in etcd with etcd lease options. +func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + _, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id)) + return err +} + func (kv *EtcdKV) MultiSave(kvs map[string]string) error { ops := make([]clientv3.Op, 0, len(kvs)) for key, value := range kvs { @@ -228,3 +237,19 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() return err } + +// Grant creates a new lease implemented in etcd grant interface. +func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { + resp, err := kv.client.Grant(context.Background(), ttl) + return resp.ID, err +} + +// KeepAlive keeps the lease alive forever with leaseID. +// Implemented in etcd interface. +func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + ch, err := kv.client.KeepAlive(context.Background(), id) + if err != nil { + return nil, err + } + return ch, nil +} diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index b848172dfc..fc6bf595a0 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -13,6 +13,7 @@ package masterservice import ( "context" + "encoding/json" "fmt" "math/rand" "sync" @@ -138,6 +139,12 @@ type Core struct { //isInit atomic.Value msFactory ms.Factory + + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } } // --------------------- function -------------------------- @@ -789,6 +796,19 @@ func (c *Core) Init() error { return } + ch, err := c.registerService("masterservice", "localhost") + if err != nil { + return + } + + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) if initError = idAllocator.Initialize(); initError != nil { return @@ -1514,3 +1534,32 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste Count: in.Count, }, nil } + +func (c *Core) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := c.metaKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + c.session.NodeName = nodeName + c.session.IP = ip + c.session.LeaseID = respID + + sessionJSON, err := json.Marshal(c.session) + if err != nil { + return nil, err + } + + err = c.metaKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := c.metaKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index f201619b78..a5e9b497a5 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -42,6 +42,8 @@ type ParamTable struct { IP string NetworkAddress string + EtcdAddress string + MetaRootPath string MasterAddress string PulsarAddress string @@ -128,6 +130,9 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initLogCfg() + + pt.initEtcdAddress() + pt.initMetaRootPath() // err := pt.LoadYaml("advanced/proxy_node.yaml") // if err != nil { // panic(err) @@ -362,3 +367,23 @@ func (pt *ParamTable) initLogCfg() { func (pt *ParamTable) initRoleName() { pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID) } + +func (pt *ParamTable) initEtcdAddress() { + addr, err := pt.Load("_EtcdAddress") + if err != nil { + panic(err) + } + pt.EtcdAddress = addr +} + +func (pt *ParamTable) initMetaRootPath() { + rootPath, err := pt.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := pt.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + pt.MetaRootPath = path.Join(rootPath, subPath) +} diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 061982bccb..ec0225f1cd 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -13,7 +13,9 @@ package proxynode import ( "context" + "encoding/json" "errors" + "fmt" "math/rand" "sync" "sync/atomic" @@ -22,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -29,7 +32,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" ) type UniqueID = typeutil.UniqueID @@ -62,6 +67,13 @@ type ProxyNode struct { queryMsgStream msgstream.MsgStream msFactory msgstream.Factory + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() @@ -86,7 +98,35 @@ func (node *ProxyNode) Init() error { // todo wait for proxyservice state changed to Healthy ctx := context.Background() - err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200) + connectEtcdFn := func() error { + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + if err != nil { + return err + } + node.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + if err != nil { + return err + } + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + ch, err := node.registerService("proxynode", Params.NetworkAddress) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + + err = funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200) if err != nil { return err } @@ -296,3 +336,32 @@ func (node *ProxyNode) SetProxyServiceClient(cli types.ProxyService) { func (node *ProxyNode) SetQueryServiceClient(cli types.QueryService) { node.queryService = cli } + +func (node *ProxyNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := node.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + node.session.NodeName = nodeName + node.session.IP = ip + node.session.LeaseID = respID + + sessionJSON, err := json.Marshal(node.session) + if err != nil { + return nil, err + } + + err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := node.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index e8840a5bdf..e5e65b1461 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -42,7 +42,7 @@ type metaService struct { } func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService { - ETCDAddr := Params.ETCDAddress + ETCDAddr := Params.EtcdAddress MetaRootPath := Params.MetaRootPath var cli *clientv3.Client var err error diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index f142ced68f..38840f8fdf 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -26,7 +26,7 @@ type ParamTable struct { paramtable.BaseTable PulsarAddress string - ETCDAddress string + EtcdAddress string MetaRootPath string QueryNodeIP string @@ -100,7 +100,7 @@ func (p *ParamTable) Init() { p.initMinioBucketName() p.initPulsarAddress() - p.initETCDAddress() + p.initEtcdAddress() p.initMetaRootPath() p.initGracefulTime() @@ -227,12 +227,12 @@ func (p *ParamTable) initSearchResultReceiveBufSize() { p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize") } -func (p *ParamTable) initETCDAddress() { - ETCDAddress, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdAddress() { + EtcdAddress, err := p.Load("_EtcdAddress") if err != nil { panic(err) } - p.ETCDAddress = ETCDAddress + p.EtcdAddress = EtcdAddress } func (p *ParamTable) initMetaRootPath() { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 5005c667dc..638adfaadb 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -26,6 +26,7 @@ import "C" import ( "context" + "encoding/json" "errors" "fmt" "math/rand" @@ -35,12 +36,15 @@ import ( "go.uber.org/zap" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" + "go.etcd.io/etcd/clientv3" ) type QueryNode struct { @@ -66,6 +70,13 @@ type QueryNode struct { indexService types.IndexService dataService types.DataService + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + msFactory msgstream.Factory scheduler *taskScheduler } @@ -115,6 +126,32 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer func (node *QueryNode) Init() error { ctx := context.Background() + + connectEtcdFn := func() error { + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath) + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + ch, err := node.registerService(fmt.Sprintf("querynode-%d", Params.QueryNodeID), Params.QueryNodeIP) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + C.SegcoreInit() registerReq := &queryPb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ @@ -279,3 +316,32 @@ func (node *QueryNode) removeDataSyncService(collectionID UniqueID) { defer node.dsServicesMu.Unlock() delete(node.dataSyncServices, collectionID) } + +func (node *QueryNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := node.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + node.session.NodeName = nodeName + node.session.IP = ip + node.session.LeaseID = respID + + sessionJSON, err := json.Marshal(node.session) + if err != nil { + return nil, err + } + + err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := node.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +} diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index 7e8147b79d..71ff2f393b 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -44,6 +44,10 @@ type ParamTable struct { // search SearchChannelPrefix string SearchResultChannelPrefix string + + // --- ETCD --- + EtcdAddress string + MetaRootPath string } var Params ParamTable @@ -76,6 +80,10 @@ func (p *ParamTable) Init() { p.initRoleName() p.initSearchChannelPrefix() p.initSearchResultChannelPrefix() + + // --- ETCD --- + p.initEtcdAddress() + p.initMetaRootPath() }) } @@ -164,3 +172,23 @@ func (p *ParamTable) initSearchResultChannelPrefix() { p.SearchResultChannelPrefix = channelName } + +func (p *ParamTable) initEtcdAddress() { + addr, err := p.Load("_EtcdAddress") + if err != nil { + panic(err) + } + p.EtcdAddress = addr +} + +func (p *ParamTable) initMetaRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + p.MetaRootPath = path.Join(rootPath, subPath) +} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 37556dd9d0..f1a4d34d77 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -13,16 +13,21 @@ package queryservice import ( "context" + "encoding/json" + "fmt" "math/rand" "sync" "sync/atomic" "time" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" ) type Timestamp = typeutil.Timestamp @@ -50,10 +55,42 @@ type QueryService struct { isInit atomic.Value enableGrpc bool + etcdKV *etcdkv.EtcdKV + session struct { + NodeName string + IP string + LeaseID clientv3.LeaseID + } + msFactory msgstream.Factory } func (qs *QueryService) Init() error { + connectEtcdFn := func() error { + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + qs.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath) + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + ch, err := qs.registerService(fmt.Sprintf("queryservice-%d", Params.QueryServiceID), Params.Address) + if err != nil { + return err + } + go func() { + for { + for range ch { + //TODO process lesase response + } + } + }() + return nil } @@ -105,3 +142,32 @@ func (qs *QueryService) SetMasterService(masterService types.MasterService) { func (qs *QueryService) SetDataService(dataService types.DataService) { qs.dataServiceClient = dataService } + +func (qs *QueryService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + respID, err := qs.etcdKV.Grant(5) + if err != nil { + fmt.Printf("grant error %s\n", err) + return nil, err + } + qs.session.NodeName = nodeName + qs.session.IP = ip + qs.session.LeaseID = respID + + sessionJSON, err := json.Marshal(qs.session) + if err != nil { + return nil, err + } + + err = qs.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + if err != nil { + fmt.Printf("put lease error %s\n", err) + return nil, err + } + + ch, err := qs.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return nil, err + } + return ch, nil +}