Add service registration (#5189)

Add service registration.
Part of Issue #5174.

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-05-14 10:05:18 +08:00 committed by GitHub
parent d35092cc09
commit b74afd7a0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 598 additions and 19 deletions

View File

@ -16,6 +16,7 @@ package datanode
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@ -25,10 +26,13 @@ import (
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -73,6 +77,13 @@ type DataNode struct {
flushChan chan<- *flushMsg
replica Replica
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
closer io.Closer
msFactory msgstream.Factory
@ -133,6 +144,30 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
// At last, data node initializes its `dataSyncService` and `metaService`.
func (node *DataNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService(fmt.Sprintf("datanode-%d", Params.NodeID), Params.IP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
@ -291,3 +326,32 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
Value: "",
}, nil
}
func (node *DataNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -12,6 +12,7 @@ package dataservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -64,7 +65,12 @@ type Server struct {
masterClient types.MasterService
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
ddChannelMu struct {
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
ddChannelMu struct {
sync.Mutex
name string
}
@ -104,6 +110,21 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) {
}
func (s *Server) Init() error {
if err := s.initMeta(); err != nil {
return err
}
ch, err := s.registerService(fmt.Sprintf("dataservice-%d", Params.NodeID), "localhost:123456")
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
return nil
}
@ -118,10 +139,6 @@ func (s *Server) Start() error {
return err
}
if err = s.initMeta(); err != nil {
return err
}
s.allocator = newAllocator(s.masterClient)
s.statsHandler = newStatsHandler(s.meta)
@ -838,3 +855,33 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
resp.Infos = infos
return resp, nil
}
func (s *Server) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := s.kvClient.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
s.session.NodeName = nodeName
s.session.IP = ip
s.session.LeaseID = respID
sessionJSON, err := json.Marshal(s.session)
if err != nil {
return nil, err
}
err = s.kvClient.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := s.kvClient.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -13,7 +13,9 @@ package indexnode
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"time"
@ -21,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -29,7 +32,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
const (
@ -55,6 +60,13 @@ type IndexNode struct {
startCallbacks []func()
closeCallbacks []func()
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
closer io.Closer
}
@ -76,8 +88,33 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
func (i *IndexNode) Init() error {
ctx := context.Background()
err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
i.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := i.registerService(fmt.Sprintf("indexnode-%d", Params.NodeID), Params.IP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
if err != nil {
return err
}
@ -264,3 +301,32 @@ func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringR
},
}, nil
}
func (i *IndexNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := i.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
i.session.NodeName = nodeName
i.session.IP = ip
i.session.LeaseID = respID
sessionJSON, err := json.Marshal(i.session)
if err != nil {
return nil, err
}
err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := i.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -42,6 +42,9 @@ type ParamTable struct {
MasterAddress string
EtcdAddress string
MetaRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
@ -68,6 +71,8 @@ func (pt *ParamTable) initParams() {
pt.initMinIOSecretAccessKey()
pt.initMinIOUseSSL()
pt.initMinioBucketName()
pt.initEtcdAddress()
pt.initMetaRootPath()
}
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error {
@ -154,6 +159,26 @@ func (pt *ParamTable) initMinIOUseSSL() {
}
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {
panic(err)
}
pt.EtcdAddress = addr
}
func (pt *ParamTable) initMetaRootPath() {
rootPath, err := pt.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := pt.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
pt.MetaRootPath = path.Join(rootPath, subPath)
}
func (pt *ParamTable) initMinioBucketName() {
bucketName, err := pt.Load("minio.bucketName")
if err != nil {

View File

@ -13,7 +13,9 @@ package indexservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
@ -63,6 +65,13 @@ type IndexService struct {
nodeLock sync.RWMutex
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -84,15 +93,14 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
}
func (i *IndexService) Init() error {
etcdAddress := Params.EtcdAddress
log.Debug("indexservice", zap.String("etcd address", etcdAddress))
log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress))
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metakv, err := NewMetaTable(etcdKV)
i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metakv, err := NewMetaTable(i.etcdKV)
if err != nil {
return err
}
@ -104,9 +112,21 @@ func (i *IndexService) Init() error {
return err
}
ch, err := i.registerService("indexservice", Params.Address)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
//init idAllocator
kvRootPath := Params.KvRootPath
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid"))
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, kvRootPath, "index_gid"))
if err := i.idAllocator.Initialize(); err != nil {
return err
}
@ -416,3 +436,32 @@ func (i *IndexService) dropIndexLoop() {
}
}
}
func (i *IndexService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := i.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
i.session.NodeName = nodeName
i.session.IP = ip
i.session.LeaseID = respID
sessionJSON, err := json.Marshal(i.session)
if err != nil {
return nil, err
}
err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := i.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -123,6 +123,15 @@ func (kv *EtcdKV) Save(key, value string) error {
return err
}
// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
return err
}
func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
ops := make([]clientv3.Op, 0, len(kvs))
for key, value := range kvs {
@ -228,3 +237,19 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}
// Grant creates a new lease implemented in etcd grant interface.
func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
resp, err := kv.client.Grant(context.Background(), ttl)
return resp.ID, err
}
// KeepAlive keeps the lease alive forever with leaseID.
// Implemented in etcd interface.
func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
ch, err := kv.client.KeepAlive(context.Background(), id)
if err != nil {
return nil, err
}
return ch, nil
}

View File

@ -13,6 +13,7 @@ package masterservice
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sync"
@ -138,6 +139,12 @@ type Core struct {
//isInit atomic.Value
msFactory ms.Factory
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
}
// --------------------- function --------------------------
@ -789,6 +796,19 @@ func (c *Core) Init() error {
return
}
ch, err := c.registerService("masterservice", "localhost")
if err != nil {
return
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = idAllocator.Initialize(); initError != nil {
return
@ -1514,3 +1534,32 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste
Count: in.Count,
}, nil
}
func (c *Core) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := c.metaKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
c.session.NodeName = nodeName
c.session.IP = ip
c.session.LeaseID = respID
sessionJSON, err := json.Marshal(c.session)
if err != nil {
return nil, err
}
err = c.metaKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := c.metaKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -42,6 +42,8 @@ type ParamTable struct {
IP string
NetworkAddress string
EtcdAddress string
MetaRootPath string
MasterAddress string
PulsarAddress string
@ -128,6 +130,9 @@ func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initLogCfg()
pt.initEtcdAddress()
pt.initMetaRootPath()
// err := pt.LoadYaml("advanced/proxy_node.yaml")
// if err != nil {
// panic(err)
@ -362,3 +367,23 @@ func (pt *ParamTable) initLogCfg() {
func (pt *ParamTable) initRoleName() {
pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID)
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {
panic(err)
}
pt.EtcdAddress = addr
}
func (pt *ParamTable) initMetaRootPath() {
rootPath, err := pt.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := pt.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
pt.MetaRootPath = path.Join(rootPath, subPath)
}

View File

@ -13,7 +13,9 @@ package proxynode
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
@ -22,6 +24,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -29,7 +32,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
type UniqueID = typeutil.UniqueID
@ -62,6 +67,13 @@ type ProxyNode struct {
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -86,7 +98,35 @@ func (node *ProxyNode) Init() error {
// todo wait for proxyservice state changed to Healthy
ctx := context.Background()
err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200)
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
if err != nil {
return err
}
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService("proxynode", Params.NetworkAddress)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
err = funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200)
if err != nil {
return err
}
@ -296,3 +336,32 @@ func (node *ProxyNode) SetProxyServiceClient(cli types.ProxyService) {
func (node *ProxyNode) SetQueryServiceClient(cli types.QueryService) {
node.queryService = cli
}
func (node *ProxyNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -42,7 +42,7 @@ type metaService struct {
}
func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService {
ETCDAddr := Params.ETCDAddress
ETCDAddr := Params.EtcdAddress
MetaRootPath := Params.MetaRootPath
var cli *clientv3.Client
var err error

View File

@ -26,7 +26,7 @@ type ParamTable struct {
paramtable.BaseTable
PulsarAddress string
ETCDAddress string
EtcdAddress string
MetaRootPath string
QueryNodeIP string
@ -100,7 +100,7 @@ func (p *ParamTable) Init() {
p.initMinioBucketName()
p.initPulsarAddress()
p.initETCDAddress()
p.initEtcdAddress()
p.initMetaRootPath()
p.initGracefulTime()
@ -227,12 +227,12 @@ func (p *ParamTable) initSearchResultReceiveBufSize() {
p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize")
}
func (p *ParamTable) initETCDAddress() {
ETCDAddress, err := p.Load("_EtcdAddress")
func (p *ParamTable) initEtcdAddress() {
EtcdAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.ETCDAddress = ETCDAddress
p.EtcdAddress = EtcdAddress
}
func (p *ParamTable) initMetaRootPath() {

View File

@ -26,6 +26,7 @@ import "C"
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -35,12 +36,15 @@ import (
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/clientv3"
)
type QueryNode struct {
@ -66,6 +70,13 @@ type QueryNode struct {
indexService types.IndexService
dataService types.DataService
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
msFactory msgstream.Factory
scheduler *taskScheduler
}
@ -115,6 +126,32 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
func (node *QueryNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService(fmt.Sprintf("querynode-%d", Params.QueryNodeID), Params.QueryNodeIP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
C.SegcoreInit()
registerReq := &queryPb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
@ -279,3 +316,32 @@ func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
defer node.dsServicesMu.Unlock()
delete(node.dataSyncServices, collectionID)
}
func (node *QueryNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -44,6 +44,10 @@ type ParamTable struct {
// search
SearchChannelPrefix string
SearchResultChannelPrefix string
// --- ETCD ---
EtcdAddress string
MetaRootPath string
}
var Params ParamTable
@ -76,6 +80,10 @@ func (p *ParamTable) Init() {
p.initRoleName()
p.initSearchChannelPrefix()
p.initSearchResultChannelPrefix()
// --- ETCD ---
p.initEtcdAddress()
p.initMetaRootPath()
})
}
@ -164,3 +172,23 @@ func (p *ParamTable) initSearchResultChannelPrefix() {
p.SearchResultChannelPrefix = channelName
}
func (p *ParamTable) initEtcdAddress() {
addr, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.EtcdAddress = addr
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = path.Join(rootPath, subPath)
}

View File

@ -13,16 +13,21 @@ package queryservice
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
type Timestamp = typeutil.Timestamp
@ -50,10 +55,42 @@ type QueryService struct {
isInit atomic.Value
enableGrpc bool
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
msFactory msgstream.Factory
}
func (qs *QueryService) Init() error {
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
qs.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := qs.registerService(fmt.Sprintf("queryservice-%d", Params.QueryServiceID), Params.Address)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
return nil
}
@ -105,3 +142,32 @@ func (qs *QueryService) SetMasterService(masterService types.MasterService) {
func (qs *QueryService) SetDataService(dataService types.DataService) {
qs.dataServiceClient = dataService
}
func (qs *QueryService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := qs.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
qs.session.NodeName = nodeName
qs.session.IP = ip
qs.session.LeaseID = respID
sessionJSON, err := json.Marshal(qs.session)
if err != nil {
return nil, err
}
err = qs.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := qs.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}