Remove common.yaml and component.yaml (#11661)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2021-11-12 21:25:08 +08:00 committed by GitHub
parent b3a3f4fdcb
commit 71e814f796
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 184 additions and 444 deletions

View File

@ -34,6 +34,7 @@ type ParamTable struct {
IP string IP string
Port int Port int
Address string
// --- ETCD --- // --- ETCD ---
EtcdEndpoints []string EtcdEndpoints []string

View File

@ -208,7 +208,7 @@ func (s *Server) Register() error {
if s.session == nil { if s.session == nil {
return errors.New("failed to initialize session") return errors.New("failed to initialize session")
} }
s.session.Init(typeutil.DataCoordRole, Params.IP, true) s.session.Init(typeutil.DataCoordRole, Params.Address, true)
Params.NodeID = s.session.ServerID Params.NodeID = s.session.ServerID
Params.SetLogger(typeutil.UniqueID(-1)) Params.SetLogger(typeutil.UniqueID(-1))
return nil return nil

View File

@ -22,9 +22,9 @@ import (
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
) )
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by // ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
@ -34,7 +34,7 @@ type ParamTable struct {
IP string IP string
Port int Port int
RootCoordAddress string Address string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
@ -49,21 +49,26 @@ var once sync.Once
func (pt *ParamTable) Init() { func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initPort()
pt.initParams() pt.initParams()
pt.loadFromEnv() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}) })
} }
func (pt *ParamTable) initParams() { func (pt *ParamTable) initParams() {
pt.initRootCoordAddress() pt.loadFromEnv()
pt.initDataCoordAddress() pt.loadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
} }
func (pt *ParamTable) loadFromEnv() { func (pt *ParamTable) loadFromEnv() {
Params.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) loadFromArgs() {
} }
@ -71,22 +76,6 @@ func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("dataCoord.port") pt.Port = pt.ParseInt("dataCoord.port")
} }
func (pt *ParamTable) initRootCoordAddress() {
ret, err := pt.Load("_RootCoordAddress")
if err != nil {
panic(err)
}
pt.RootCoordAddress = ret
}
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
pt.IP = ret
}
func (pt *ParamTable) initServerMaxSendSize() { func (pt *ParamTable) initServerMaxSendSize() {
var err error var err error

View File

@ -32,9 +32,6 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.Port, 0) assert.NotEqual(t, Params.Port, 0)
t.Logf("DataCoord Port:%d", Params.Port) t.Logf("DataCoord Port:%d", Params.Port)
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))

View File

@ -84,6 +84,7 @@ func (s *Server) init() error {
datacoord.Params.InitOnce() datacoord.Params.InitOnce()
datacoord.Params.IP = Params.IP datacoord.Params.IP = Params.IP
datacoord.Params.Port = Params.Port datacoord.Params.Port = Params.Port
datacoord.Params.Address = Params.Address
err := s.dataCoord.Register() err := s.dataCoord.Register()
if err != nil { if err != nil {

View File

@ -78,6 +78,7 @@ func (c *Client) getGrpcClientFunc() (datapb.DataNodeClient, error) {
// if we return nil here, then we should check if client is nil outside, // if we return nil here, then we should check if client is nil outside,
err := c.connect(retry.Attempts(20)) err := c.connect(retry.Attempts(20))
if err != nil { if err != nil {
log.Debug("DatanodeClient try reconnect failed", zap.Error(err))
return nil, err return nil, err
} }

View File

@ -40,11 +40,9 @@ type ParamTable struct {
IP string IP string
Port int Port int
Address string
listener net.Listener listener net.Listener
RootCoordAddress string
DataCoordAddress string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
} }
@ -54,16 +52,25 @@ type ParamTable struct {
func (pt *ParamTable) Init() { func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initRootCoordAddress() pt.initParams()
pt.initDataCoordAddress() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initPort()
listener, err := net.Listen("tcp", pt.Address)
if err != nil {
panic(err)
}
pt.listener = listener
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.loadFromEnv() pt.loadFromEnv()
pt.loadFromArgs() pt.loadFromArgs()
pt.initPort()
pt.initServerMaxSendSize() pt.initServerMaxSendSize()
pt.initServerMaxRecvSize() pt.initServerMaxRecvSize()
})
} }
func (pt *ParamTable) loadFromArgs() { func (pt *ParamTable) loadFromArgs() {
@ -75,31 +82,12 @@ func (pt *ParamTable) loadFromEnv() {
} }
func (pt *ParamTable) initPort() { func (pt *ParamTable) initPort() {
port := pt.ParseInt("dataNode.port")
listener, err := net.Listen("tcp", ":0") pt.Port = port
if err != nil { if !funcutil.CheckPortAvailable(pt.Port) {
panic(err) pt.Port = funcutil.GetAvailablePort()
log.Warn("DataNode init", zap.Any("Port", pt.Port))
} }
pt.Port = listener.Addr().(*net.TCPAddr).Port
pt.listener = listener
log.Info("DataNode", zap.Int("port", pt.Port))
}
func (pt *ParamTable) initRootCoordAddress() {
ret, err := pt.Load("_RootCoordAddress")
if err != nil {
panic(err)
}
pt.RootCoordAddress = ret
}
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
pt.DataCoordAddress = ret
} }
func (pt *ParamTable) initServerMaxSendSize() { func (pt *ParamTable) initServerMaxSendSize() {

View File

@ -39,12 +39,6 @@ func TestParamTable(t *testing.T) {
assert.NotNil(t, Params.listener) assert.NotNil(t, Params.listener)
t.Logf("DataNode listener:%d", Params.listener) t.Logf("DataNode listener:%d", Params.listener)
assert.NotEqual(t, Params.DataCoordAddress, "")
t.Logf("DataCoordAddress:%s", Params.DataCoordAddress)
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))

View File

@ -191,7 +191,6 @@ func (s *Server) init() error {
// --- RootCoord Client --- // --- RootCoord Client ---
if s.newRootCoordClient != nil { if s.newRootCoordClient != nil {
log.Debug("RootCoord address", zap.String("address", Params.RootCoordAddress))
log.Debug("Init root coord client ...") log.Debug("Init root coord client ...")
rootCoordClient, err := s.newRootCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints) rootCoordClient, err := s.newRootCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -219,7 +218,6 @@ func (s *Server) init() error {
// --- Data Server Client --- // --- Data Server Client ---
if s.newDataCoordClient != nil { if s.newDataCoordClient != nil {
log.Debug("Data service address", zap.String("address", Params.DataCoordAddress))
log.Debug("DataNode Init data service client ...") log.Debug("DataNode Init data service client ...")
dataCoordClient, err := s.newDataCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints) dataCoordClient, err := s.newDataCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints)
if err != nil { if err != nil {

View File

@ -72,6 +72,7 @@ func (c *Client) getGrpcClient() (indexpb.IndexCoordClient, error) {
// if we return nil here, then we should check if client is nil outside, // if we return nil here, then we should check if client is nil outside,
err := c.connect(retry.Attempts(20)) err := c.connect(retry.Attempts(20))
if err != nil { if err != nil {
log.Debug("IndexcoordClient try reconnect failed", zap.Error(err))
return nil, err return nil, err
} }

View File

@ -22,17 +22,18 @@ import (
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
) )
// ParamTable is used to record configuration items. // ParamTable is used to record configuration items.
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
ServiceAddress string IP string
ServicePort int Port int
Address string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
@ -47,30 +48,38 @@ func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initParams() pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
}) })
} }
// initParams initializes params of the configuration items. // initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() { func (pt *ParamTable) initParams() {
pt.initServicePort() pt.LoadFromEnv()
pt.initServiceAddress() pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize() pt.initServerMaxSendSize()
pt.initServerMaxRecvSize() pt.initServerMaxRecvSize()
} }
// initServicePort initializes the port of IndexCoord service. // initServicePort initializes the port of IndexCoord service.
func (pt *ParamTable) initServicePort() { func (pt *ParamTable) initPort() {
pt.ServicePort = pt.ParseInt("indexCoord.port") pt.Port = pt.ParseInt("indexCoord.port")
} }
// initServiceAddress initializes the address of IndexCoord service. // initServiceAddress initializes the address of IndexCoord service.
func (pt *ParamTable) initServiceAddress() { func (pt *ParamTable) LoadFromEnv() {
ret, err := pt.Load("_IndexCoordAddress") Params.IP = funcutil.GetLocalIP()
if err != nil { }
panic(err)
} // LoadFromArgs is used to initialize configuration items from args.
pt.ServiceAddress = ret func (pt *ParamTable) loadFromArgs() {
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
} }
// initServerMaxSendSize initializes the max send size of IndexCoord service. // initServerMaxSendSize initializes the max send size of IndexCoord service.

View File

@ -76,8 +76,8 @@ func (s *Server) init() error {
Params.Init() Params.Init()
indexcoord.Params.InitOnce() indexcoord.Params.InitOnce()
indexcoord.Params.Address = Params.ServiceAddress indexcoord.Params.Address = Params.Address
indexcoord.Params.Port = Params.ServicePort indexcoord.Params.Port = Params.Port
closer := trace.InitTracing("IndexCoord") closer := trace.InitTracing("IndexCoord")
s.closer = closer s.closer = closer
@ -88,7 +88,7 @@ func (s *Server) init() error {
} }
s.loopWg.Add(1) s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort) go s.startGrpcLoop(indexcoord.Params.Port)
// wait for grpc IndexCoord loop start // wait for grpc IndexCoord loop start
if err := <-s.grpcErrChan; err != nil { if err := <-s.grpcErrChan; err != nil {
log.Error("IndexCoord", zap.Any("init error", err)) log.Error("IndexCoord", zap.Any("init error", err))
@ -181,7 +181,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done() defer s.loopWg.Done()
log.Debug("IndexCoord", zap.String("network address", Params.ServiceAddress), zap.Int("network port", grpcPort)) log.Debug("IndexCoord", zap.String("network address", Params.IP), zap.Int("network port", grpcPort))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil { if err != nil {
log.Warn("IndexCoord", zap.String("GrpcServer:failed to listen", err.Error())) log.Warn("IndexCoord", zap.String("GrpcServer:failed to listen", err.Error()))

View File

@ -75,6 +75,7 @@ func (c *Client) getGrpcClientFunc() (indexpb.IndexNodeClient, error) {
// if we return nil here, then we should check if client is nil outside, // if we return nil here, then we should check if client is nil outside,
err := c.connect(retry.Attempts(20)) err := c.connect(retry.Attempts(20))
if err != nil { if err != nil {
log.Debug("IndexNodeClient try reconnect failed", zap.Error(err))
return nil, err return nil, err
} }

View File

@ -32,8 +32,6 @@ import (
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
IndexCoordAddress string
IP string IP string
Port int Port int
Address string Address string
@ -51,16 +49,6 @@ func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initParams() pt.initParams()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode init", zap.Any("Port", pt.Port))
}
pt.LoadFromEnv()
pt.LoadFromArgs()
}) })
} }
@ -75,22 +63,20 @@ func (pt *ParamTable) LoadFromEnv() {
} }
func (pt *ParamTable) initParams() { func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort() pt.initPort()
pt.initIndexCoordAddress() pt.initServerMaxSendSize()
} pt.initServerMaxRecvSize()
// todo remove and use load from env
func (pt *ParamTable) initIndexCoordAddress() {
ret, err := pt.Load("_IndexCoordAddress")
if err != nil {
panic(err)
}
pt.IndexCoordAddress = ret
} }
func (pt *ParamTable) initPort() { func (pt *ParamTable) initPort() {
port := pt.ParseInt("indexNode.port") port := pt.ParseInt("indexNode.port")
pt.Port = port pt.Port = port
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode init", zap.Any("Port", pt.Port))
}
} }
func (pt *ParamTable) initServerMaxSendSize() { func (pt *ParamTable) initServerMaxSendSize() {

View File

@ -33,11 +33,6 @@ import (
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
RootCoordAddress string
IndexCoordAddress string
DataCoordAddress string
QueryCoordAddress string
IP string IP string
Port int Port int
Address string Address string
@ -56,13 +51,7 @@ func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initParams() pt.initParams()
pt.loadFromEnv()
pt.loadFromArgs()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}) })
} }
@ -71,51 +60,15 @@ func (pt *ParamTable) loadFromArgs() {
} }
func (pt *ParamTable) loadFromEnv() { func (pt *ParamTable) loadFromEnv() {
Params.IP = funcutil.GetLocalIP() pt.IP = funcutil.GetLocalIP()
} }
func (pt *ParamTable) initParams() { func (pt *ParamTable) initParams() {
pt.loadFromEnv()
pt.loadFromArgs()
pt.initPort() pt.initPort()
pt.initRootCoordAddress() pt.initServerMaxSendSize()
pt.initIndexCoordAddress() pt.initServerMaxRecvSize()
pt.initDataCoordAddress()
pt.initQueryCoordAddress()
}
// todo remove and use load from env
func (pt *ParamTable) initIndexCoordAddress() {
ret, err := pt.Load("_IndexCoordAddress")
if err != nil {
panic(err)
}
pt.IndexCoordAddress = ret
}
// todo remove and use load from env
func (pt *ParamTable) initRootCoordAddress() {
ret, err := pt.Load("_RootCoordAddress")
if err != nil {
panic(err)
}
pt.RootCoordAddress = ret
}
// todo remove and use load from env
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
pt.DataCoordAddress = ret
}
// todo remove and use load from env
func (pt *ParamTable) initQueryCoordAddress() {
ret, err := pt.Load("_QueryCoordAddress")
if err != nil {
panic(err)
}
pt.QueryCoordAddress = ret
} }
func (pt *ParamTable) initPort() { func (pt *ParamTable) initPort() {

View File

@ -148,8 +148,6 @@ func (s *Server) init() error {
proxy.Params.IP = Params.IP proxy.Params.IP = Params.IP
proxy.Params.NetworkAddress = Params.Address proxy.Params.NetworkAddress = Params.Address
// for purpose of ID Allocator
proxy.Params.RootCoordAddress = Params.RootCoordAddress
closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port)) closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer s.closer = closer
@ -173,9 +171,6 @@ func (s *Server) init() error {
return err return err
} }
rootCoordAddr := Params.RootCoordAddress
log.Debug("Proxy", zap.String("RootCoord address", rootCoordAddr))
if s.rootCoordClient == nil { if s.rootCoordClient == nil {
s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -196,9 +191,6 @@ func (s *Server) init() error {
s.proxy.SetRootCoordClient(s.rootCoordClient) s.proxy.SetRootCoordClient(s.rootCoordClient)
log.Debug("set rootcoord client ...") log.Debug("set rootcoord client ...")
dataCoordAddr := Params.DataCoordAddress
log.Debug("Proxy", zap.String("data coordinator address", dataCoordAddr))
if s.dataCoordClient == nil { if s.dataCoordClient == nil {
s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -215,9 +207,6 @@ func (s *Server) init() error {
s.proxy.SetDataCoordClient(s.dataCoordClient) s.proxy.SetDataCoordClient(s.dataCoordClient)
log.Debug("set data coordinator address ...") log.Debug("set data coordinator address ...")
indexCoordAddr := Params.IndexCoordAddress
log.Debug("Proxy", zap.String("index coordinator address", indexCoordAddr))
if s.indexCoordClient == nil { if s.indexCoordClient == nil {
s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -234,9 +223,6 @@ func (s *Server) init() error {
s.proxy.SetIndexCoordClient(s.indexCoordClient) s.proxy.SetIndexCoordClient(s.indexCoordClient)
log.Debug("set index coordinator client ...") log.Debug("set index coordinator client ...")
queryCoordAddr := Params.QueryCoordAddress
log.Debug("Proxy", zap.String("query coordinator address", queryCoordAddr))
if s.queryCooedClient == nil { if s.queryCooedClient == nil {
s.queryCooedClient, err = grpcquerycoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints) s.queryCooedClient, err = grpcquerycoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints)
if err != nil { if err != nil {

View File

@ -78,6 +78,7 @@ func (c *Client) getGrpcClientFunc() (querypb.QueryCoordClient, error) {
// if we return nil here, then we should check if client is nil outside, // if we return nil here, then we should check if client is nil outside,
err := c.connect(retry.Attempts(20)) err := c.connect(retry.Attempts(20))
if err != nil { if err != nil {
log.Warn("QueryCoordClient try connect fail", zap.Error(err))
return nil, err return nil, err
} }

View File

@ -22,9 +22,9 @@ import (
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
) )
// Params is a package scoped variable of type ParamTable. // Params is a package scoped variable of type ParamTable.
@ -35,12 +35,10 @@ var once sync.Once
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. // embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
IP string
Port int Port int
Address string
RootCoordAddress string
DataCoordAddress string
IndexCoordAddress string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
} }
@ -50,44 +48,33 @@ type ParamTable struct {
func (pt *ParamTable) Init() { func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initPort() pt.initParams()
pt.initRootCoordAddress() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initDataCoordAddress()
pt.initIndexCoordAddress()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}) })
} }
func (pt *ParamTable) initRootCoordAddress() { // initParams initializes params of the configuration items.
ret, err := pt.Load("_RootCoordAddress") func (pt *ParamTable) initParams() {
if err != nil { pt.LoadFromEnv()
panic(err) pt.LoadFromArgs()
} pt.initPort()
pt.RootCoordAddress = ret pt.initServerMaxSendSize()
} pt.initServerMaxRecvSize()
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
pt.DataCoordAddress = ret
}
func (pt *ParamTable) initIndexCoordAddress() {
ret, err := pt.Load("_IndexCoordAddress")
if err != nil {
panic(err)
}
pt.IndexCoordAddress = ret
} }
func (pt *ParamTable) initPort() { func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("queryCoord.port") pt.Port = pt.ParseInt("queryCoord.port")
} }
func (pt *ParamTable) LoadFromEnv() {
pt.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
func (pt *ParamTable) initServerMaxSendSize() { func (pt *ParamTable) initServerMaxSendSize() {
var err error var err error

View File

@ -29,12 +29,6 @@ import (
func TestParamTable(t *testing.T) { func TestParamTable(t *testing.T) {
Params.Init() Params.Init()
assert.NotEqual(t, Params.DataCoordAddress, "")
t.Logf("DataCoordAddress:%s", Params.DataCoordAddress)
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))

View File

@ -101,6 +101,7 @@ func (s *Server) init() error {
Params.Init() Params.Init()
qc.Params.InitOnce() qc.Params.InitOnce()
qc.Params.Address = Params.Address
qc.Params.Port = Params.Port qc.Params.Port = Params.Port
closer := trace.InitTracing("querycoord") closer := trace.InitTracing("querycoord")
@ -119,8 +120,6 @@ func (s *Server) init() error {
} }
// --- Master Server Client --- // --- Master Server Client ---
log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress))
if s.rootCoord == nil { if s.rootCoord == nil {
s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -152,8 +151,6 @@ func (s *Server) init() error {
log.Debug("QueryCoord report RootCoord ready") log.Debug("QueryCoord report RootCoord ready")
// --- Data service client --- // --- Data service client ---
log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress))
if s.dataCoord == nil { if s.dataCoord == nil {
s.dataCoord, err = dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) s.dataCoord, err = dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -182,7 +179,6 @@ func (s *Server) init() error {
log.Debug("QueryCoord report DataCoord ready") log.Debug("QueryCoord report DataCoord ready")
// --- IndexCoord --- // --- IndexCoord ---
log.Debug("QueryCoord try to new IndexCoord client", zap.Any("IndexCoordAddress", Params.IndexCoordAddress))
if s.indexCoord == nil { if s.indexCoord == nil {
s.indexCoord, err = isc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints) s.indexCoord, err = isc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
if err != nil { if err != nil {

View File

@ -37,15 +37,11 @@ var once sync.Once
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
QueryNodeIP string IP string
QueryNodePort int Port int
Address string
QueryNodeID UniqueID QueryNodeID UniqueID
RootCoordAddress string
IndexCoordAddress string
DataCoordAddress string
QueryCoordAddress string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
} }
@ -54,18 +50,18 @@ type ParamTable struct {
func (pt *ParamTable) Init() { func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
pt.BaseTable.Init() pt.BaseTable.Init()
pt.initPort() pt.initParams()
pt.initRootCoordAddress() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
pt.initIndexCoordAddress() })
pt.initDataCoordAddress() }
pt.initQueryCoordAddress()
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv() pt.LoadFromEnv()
pt.LoadFromArgs() pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize() pt.initServerMaxSendSize()
pt.initServerMaxRecvSize() pt.initServerMaxRecvSize()
})
} }
// LoadFromArgs is used to initialize configuration items from args. // LoadFromArgs is used to initialize configuration items from args.
@ -75,44 +71,16 @@ func (pt *ParamTable) LoadFromArgs() {
// LoadFromEnv is used to initialize configuration items from env. // LoadFromEnv is used to initialize configuration items from env.
func (pt *ParamTable) LoadFromEnv() { func (pt *ParamTable) LoadFromEnv() {
Params.QueryNodeIP = funcutil.GetLocalIP() pt.IP = funcutil.GetLocalIP()
}
func (pt *ParamTable) initRootCoordAddress() {
ret, err := pt.Load("_RootCoordAddress")
if err != nil {
panic(err)
}
pt.RootCoordAddress = ret
}
func (pt *ParamTable) initIndexCoordAddress() {
ret, err := pt.Load("_IndexCoordAddress")
if err != nil {
panic(err)
}
pt.IndexCoordAddress = ret
}
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
pt.DataCoordAddress = ret
}
func (pt *ParamTable) initQueryCoordAddress() {
ret, err := pt.Load("_QueryCoordAddress")
if err != nil {
panic(err)
}
pt.QueryCoordAddress = ret
} }
func (pt *ParamTable) initPort() { func (pt *ParamTable) initPort() {
port := pt.ParseInt("queryNode.port") port := pt.ParseInt("queryNode.port")
pt.QueryNodePort = port pt.Port = port
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("QueryNode init", zap.Any("Port", pt.Port))
}
} }
func (pt *ParamTable) initServerMaxSendSize() { func (pt *ParamTable) initServerMaxSendSize() {

View File

@ -30,18 +30,6 @@ import (
func TestParamTable(t *testing.T) { func TestParamTable(t *testing.T) {
Params.Init() Params.Init()
assert.NotEqual(t, Params.IndexCoordAddress, "")
t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress)
assert.NotEqual(t, Params.DataCoordAddress, "")
t.Logf("DataCoordAddress:%s", Params.DataCoordAddress)
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
assert.NotEqual(t, Params.QueryCoordAddress, "")
t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
@ -54,5 +42,5 @@ func TestParamTable(t *testing.T) {
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
Params.LoadFromEnv() Params.LoadFromEnv()
assert.Equal(t, Params.QueryNodeIP, funcutil.GetLocalIP()) assert.Equal(t, Params.IP, funcutil.GetLocalIP())
} }

View File

@ -84,16 +84,16 @@ func (s *Server) init() error {
Params.Init() Params.Init()
qn.Params.InitOnce() qn.Params.InitOnce()
qn.Params.QueryNodeIP = Params.QueryNodeIP qn.Params.QueryNodeIP = Params.IP
qn.Params.QueryNodePort = int64(Params.QueryNodePort) qn.Params.QueryNodePort = int64(Params.Port)
qn.Params.QueryNodeID = Params.QueryNodeID qn.Params.QueryNodeID = Params.QueryNodeID
closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer s.closer = closer
log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort)) log.Debug("QueryNode", zap.Int("port", Params.Port))
s.wg.Add(1) s.wg.Add(1)
go s.startGrpcLoop(Params.QueryNodePort) go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start // wait for grpc server loop start
err := <-s.grpcErrChan err := <-s.grpcErrChan
if err != nil { if err != nil {
@ -101,10 +101,6 @@ func (s *Server) init() error {
} }
// --- RootCoord Client --- // --- RootCoord Client ---
//ms.Params.Init()
addr := Params.RootCoordAddress
log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr))
if s.rootCoord == nil { if s.rootCoord == nil {
s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
if err != nil { if err != nil {
@ -135,7 +131,6 @@ func (s *Server) init() error {
} }
// --- IndexCoord --- // --- IndexCoord ---
log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress))
if s.indexCoord == nil { if s.indexCoord == nil {
s.indexCoord, err = isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) s.indexCoord, err = isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
if err != nil { if err != nil {

View File

@ -180,6 +180,7 @@ func (c *GrpcClient) getGrpcClientFunc() (rootcoordpb.RootCoordClient, error) {
// if we return nil here, then we should check if client is nil outside, // if we return nil here, then we should check if client is nil outside,
err := c.connect(retry.Attempts(20)) err := c.connect(retry.Attempts(20))
if err != nil { if err != nil {
log.Debug("RoodCoordClient try connect failed", zap.Error(err))
return nil, err return nil, err
} }

View File

@ -22,9 +22,9 @@ import (
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs" "github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
) )
// Params is a package scoped variable of type ParamTable. // Params is a package scoped variable of type ParamTable.
@ -36,12 +36,9 @@ var once sync.Once
type ParamTable struct { type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
Address string // ip:port IP string
Port int Port int
Address string
IndexCoordAddress string
QueryCoordAddress string
DataCoordAddress string
ServerMaxSendSize int ServerMaxSendSize int
ServerMaxRecvSize int ServerMaxRecvSize int
@ -49,62 +46,43 @@ type ParamTable struct {
// Init is an override method of BaseTable's Init. It mainly calls the // Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization. // Init of BaseTable and do some other initialization.
func (p *ParamTable) Init() { func (pt *ParamTable) Init() {
once.Do(func() { once.Do(func() {
p.BaseTable.Init() pt.BaseTable.Init()
p.initAddress() pt.initParams()
p.initPort() pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
p.initIndexCoordAddress()
p.initQueryCoordAddress()
p.initDataCoordAddress()
p.initServerMaxSendSize()
p.initServerMaxRecvSize()
}) })
} }
func (p *ParamTable) initAddress() { // initParams initializes params of the configuration items.
ret, err := p.Load("_RootCoordAddress") func (pt *ParamTable) initParams() {
if err != nil { pt.LoadFromEnv()
panic(err) pt.LoadFromArgs()
} pt.initPort()
p.Address = ret pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
} }
func (p *ParamTable) initPort() { // LoadFromEnv is used to initialize configuration items from env.
p.Port = p.ParseInt("rootCoord.port") func (pt *ParamTable) LoadFromEnv() {
pt.IP = funcutil.GetLocalIP()
} }
func (p *ParamTable) initIndexCoordAddress() { // LoadFromArgs is used to initialize configuration items from args.
ret, err := p.Load("_IndexCoordAddress") func (pt *ParamTable) LoadFromArgs() {
if err != nil {
panic(err)
}
p.IndexCoordAddress = ret
} }
func (p *ParamTable) initQueryCoordAddress() { func (pt *ParamTable) initPort() {
ret, err := p.Load("_QueryCoordAddress") pt.Port = pt.ParseInt("rootCoord.port")
if err != nil {
panic(err)
}
p.QueryCoordAddress = ret
} }
func (p *ParamTable) initDataCoordAddress() { func (pt *ParamTable) initServerMaxSendSize() {
ret, err := p.Load("_DataCoordAddress")
if err != nil {
panic(err)
}
p.DataCoordAddress = ret
}
func (p *ParamTable) initServerMaxSendSize() {
var err error var err error
valueStr, err := p.Load("rootCoord.grpc.serverMaxSendSize") valueStr, err := pt.Load("rootCoord.grpc.serverMaxSendSize")
if err != nil { // not set if err != nil { // not set
p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} }
value, err := strconv.Atoi(valueStr) value, err := strconv.Atoi(valueStr)
@ -113,21 +91,21 @@ func (p *ParamTable) initServerMaxSendSize() {
zap.String("rootCoord.grpc.serverMaxSendSize", valueStr), zap.String("rootCoord.grpc.serverMaxSendSize", valueStr),
zap.Error(err)) zap.Error(err))
p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else { } else {
p.ServerMaxSendSize = value pt.ServerMaxSendSize = value
} }
log.Debug("initServerMaxSendSize", log.Debug("initServerMaxSendSize",
zap.Int("rootCoord.grpc.serverMaxSendSize", p.ServerMaxSendSize)) zap.Int("rootCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
} }
func (p *ParamTable) initServerMaxRecvSize() { func (pt *ParamTable) initServerMaxRecvSize() {
var err error var err error
valueStr, err := p.Load("rootCoord.grpc.serverMaxRecvSize") valueStr, err := pt.Load("rootCoord.grpc.serverMaxRecvSize")
if err != nil { // not set if err != nil { // not set
p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} }
value, err := strconv.Atoi(valueStr) value, err := strconv.Atoi(valueStr)
@ -136,11 +114,11 @@ func (p *ParamTable) initServerMaxRecvSize() {
zap.String("rootCoord.grpc.serverMaxRecvSize", valueStr), zap.String("rootCoord.grpc.serverMaxRecvSize", valueStr),
zap.Error(err)) zap.Error(err))
p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else { } else {
p.ServerMaxRecvSize = value pt.ServerMaxRecvSize = value
} }
log.Debug("initServerMaxRecvSize", log.Debug("initServerMaxRecvSize",
zap.Int("rootCoord.grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) zap.Int("rootCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
} }

View File

@ -35,15 +35,6 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.Port, 0) assert.NotEqual(t, Params.Port, 0)
t.Logf("master port = %d", Params.Port) t.Logf("master port = %d", Params.Port)
assert.NotEqual(t, Params.IndexCoordAddress, "")
t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress)
assert.NotEqual(t, Params.DataCoordAddress, "")
t.Logf("DataCoordAddress:%s", Params.DataCoordAddress)
assert.NotEqual(t, Params.QueryCoordAddress, "")
t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))

View File

@ -46,7 +46,6 @@ type ParamTable struct {
EtcdEndpoints []string EtcdEndpoints []string
MetaRootPath string MetaRootPath string
RootCoordAddress string
PulsarAddress string PulsarAddress string
RocksmqPath string // not used in Proxy RocksmqPath string // not used in Proxy

View File

@ -93,7 +93,6 @@ func (p *ParamTable) InitOnce() {
func (p *ParamTable) Init() { func (p *ParamTable) Init() {
p.BaseTable.Init() p.BaseTable.Init()
p.initQueryCoordAddress()
p.initRoleName() p.initRoleName()
// --- Channels --- // --- Channels ---
@ -131,14 +130,6 @@ func (p *ParamTable) Init() {
p.initMemoryUsageMaxDifferencePercentage() p.initMemoryUsageMaxDifferencePercentage()
} }
func (p *ParamTable) initQueryCoordAddress() {
url, err := p.Load("_QueryCoordAddress")
if err != nil {
panic(err)
}
p.Address = url
}
func (p *ParamTable) initClusterMsgChannelPrefix() { func (p *ParamTable) initClusterMsgChannelPrefix() {
config, err := p.Load("msgChannel.chanNamePrefix.cluster") config, err := p.Load("msgChannel.chanNamePrefix.cluster")
if err != nil { if err != nil {

View File

@ -159,7 +159,7 @@ func (qc *QueryCoord) Init() error {
qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager() qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
}) })
log.Debug("query coordinator init success")
return initError return initError
} }

View File

@ -179,62 +179,6 @@ func (gp *BaseTable) tryloadFromEnv() {
} }
gp.Save("_RocksmqPath", rocksmqPath) gp.Save("_RocksmqPath", rocksmqPath)
rootCoordAddress := os.Getenv("ROOT_COORD_ADDRESS")
if rootCoordAddress == "" {
rootCoordHost, err := gp.Load("rootCoord.address")
if err != nil {
panic(err)
}
port, err := gp.Load("rootCoord.port")
if err != nil {
panic(err)
}
rootCoordAddress = rootCoordHost + ":" + port
}
gp.Save("_RootCoordAddress", rootCoordAddress)
indexCoordAddress := os.Getenv("INDEX_COORD_ADDRESS")
if indexCoordAddress == "" {
indexCoordHost, err := gp.Load("indexCoord.address")
if err != nil {
panic(err)
}
port, err := gp.Load("indexCoord.port")
if err != nil {
panic(err)
}
indexCoordAddress = indexCoordHost + ":" + port
}
gp.Save("_IndexCoordAddress", indexCoordAddress)
queryCoordAddress := os.Getenv("QUERY_COORD_ADDRESS")
if queryCoordAddress == "" {
serviceHost, err := gp.Load("queryCoord.address")
if err != nil {
panic(err)
}
port, err := gp.Load("queryCoord.port")
if err != nil {
panic(err)
}
queryCoordAddress = serviceHost + ":" + port
}
gp.Save("_QueryCoordAddress", queryCoordAddress)
dataCoordAddress := os.Getenv("DATA_COORD_ADDRESS")
if dataCoordAddress == "" {
serviceHost, err := gp.Load("dataCoord.address")
if err != nil {
panic(err)
}
port, err := gp.Load("dataCoord.port")
if err != nil {
panic(err)
}
dataCoordAddress = serviceHost + ":" + port
}
gp.Save("_DataCoordAddress", dataCoordAddress)
insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE") insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE")
if insertBufferFlushSize == "" { if insertBufferFlushSize == "" {
insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", "16777216") insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", "16777216")

View File

@ -182,7 +182,7 @@ func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error)
// it is false. Otherwise, set it to true. // it is false. Otherwise, set it to true.
func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) {
var ch <-chan *clientv3.LeaseKeepAliveResponse var ch <-chan *clientv3.LeaseKeepAliveResponse
log.Debug("Session Register Begin") log.Debug("Session Register Begin " + s.ServerName)
registerFn := func() error { registerFn := func() error {
resp, err := s.etcdCli.Grant(s.ctx, DefaultTTL) resp, err := s.etcdCli.Grant(s.ctx, DefaultTTL)
if err != nil { if err != nil {
@ -274,7 +274,6 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
log.Debug("SessionUtil GetSessions", zap.Any("prefix", prefix), zap.Any("resp", resp))
for _, kv := range resp.Kvs { for _, kv := range resp.Kvs {
session := &Session{} session := &Session{}
err = json.Unmarshal(kv.Value, session) err = json.Unmarshal(kv.Value, session)
@ -282,6 +281,9 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error)
return nil, 0, err return nil, 0, err
} }
_, mapKey := path.Split(string(kv.Key)) _, mapKey := path.Split(string(kv.Key))
log.Debug("SessionUtil GetSessions ", zap.Any("prefix", prefix),
zap.String("key", mapKey),
zap.Any("address", session.Address))
res[mapKey] = session res[mapKey] = session
} }
return res, resp.Header.Revision, nil return res, resp.Header.Revision, nil