Load grpc port of proxy from config

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2020-11-24 11:50:21 +08:00 committed by yefu.chen
parent 3ead70ac6e
commit ad0078c07a
10 changed files with 63 additions and 12 deletions

3
go.mod
View File

@ -12,7 +12,6 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.10.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/minio-go/v7 v7.0.5
@ -26,7 +25,7 @@ require (
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect
github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/common v0.10.0
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spaolacci/murmur3 v1.1.0

View File

@ -2,6 +2,7 @@ package proxy
import (
"log"
"net"
"os"
"strconv"
"strings"
@ -38,6 +39,25 @@ func (pt *ParamTable) Init() {
pt.Save("_proxyID", proxyIDStr)
}
func (pt *ParamTable) NetWorkAddress() string {
addr, err := pt.Load("proxy.network.address")
if err != nil {
panic(err)
}
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip proxy.network.address")
}
port, err := pt.Load("proxy.network.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
return addr + ":" + port
}
func (pt *ParamTable) MasterAddress() string {
ret, err := pt.Load("_MasterAddress")
if err != nil {

View File

@ -135,8 +135,7 @@ func (p *Proxy) AddCloseCallback(callbacks ...func()) {
func (p *Proxy) grpcLoop() {
defer p.proxyLoopWg.Done()
// TODO: use address in config instead
lis, err := net.Listen("tcp", ":5053")
lis, err := net.Listen("tcp", Params.NetWorkAddress())
if err != nil {
log.Fatalf("Proxy grpc server fatal error=%v", err)
}

View File

@ -6,6 +6,7 @@ import (
"log"
"os"
"strconv"
"strings"
"sync"
"testing"
@ -24,7 +25,6 @@ import (
var ctx context.Context
var cancel func()
var proxyAddress = "127.0.0.1:5053"
var proxyConn *grpc.ClientConn
var proxyClient servicepb.MilvusServiceClient
@ -81,8 +81,13 @@ func setup() {
startMaster(ctx)
startProxy(ctx)
proxyAddr := Params.NetWorkAddress()
addr := strings.Split(proxyAddr, ":")
if addr[0] == "0.0.0.0" {
proxyAddr = "127.0.0.1:" + addr[1]
}
conn, err := grpc.DialContext(ctx, proxyAddress, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.DialContext(ctx, proxyAddr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("Connect to proxy failed, error= %v", err)
}

View File

@ -52,6 +52,8 @@ type collectionReplica interface {
removeSegment(segmentID UniqueID) error
getSegmentByID(segmentID UniqueID) (*Segment, error)
hasSegment(segmentID UniqueID) bool
freeAll()
}
type collectionReplicaImpl struct {
@ -301,3 +303,13 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
return ok
}
//-----------------------------------------------------------------------------------------------------
func (colReplica *collectionReplicaImpl) freeAll() {
for _, seg := range colReplica.segments {
deleteSegment(seg)
}
for _, col := range colReplica.collections {
deleteCollection(col)
}
}

View File

@ -30,8 +30,10 @@ func (dsService *dataSyncService) start() {
}
func (dsService *dataSyncService) close() {
if dsService.fg != nil {
dsService.fg.Close()
}
}
func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support

View File

@ -69,5 +69,18 @@ func (node *QueryNode) Start() {
}
func (node *QueryNode) Close() {
// TODO: close services
<-node.ctx.Done()
// free collectionReplica
(*node.replica).freeAll()
// close services
if node.dataSyncService != nil {
(*node.dataSyncService).close()
}
if node.searchService != nil {
(*node.searchService).close()
}
if node.statsService != nil {
(*node.statsService).close()
}
}

View File

@ -23,7 +23,6 @@ import (
func TestSearch_Search(t *testing.T) {
Params.Init()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// init query node
pulsarURL, _ := Params.pulsarAddress()
@ -240,6 +239,6 @@ func TestSearch_Search(t *testing.T) {
time.Sleep(2 * time.Second)
node.searchService.close()
cancel()
node.Close()
}

View File

@ -59,6 +59,10 @@ func (sService *statsService) start() {
}
}
func (sService *statsService) close() {
(*sService.statsStream).Close()
}
func (sService *statsService) sendSegmentStatistic() {
statisticData := (*sService.replica).getSegmentStatistics()

View File

@ -77,8 +77,6 @@ func (fg *TimeTickedFlowGraph) Close() {
}
(*inStream.inStream).Close()
}
// close input channels
v.Close()
}
}