From b445587e11b5802daf3b4075753ea5d3ed4e1188 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 26 Jan 2021 14:46:54 +0800 Subject: [PATCH] Add datanode main Signed-off-by: XuanYang-cn --- Makefile | 6 + cmd/datanode/main.go | 136 +++++++++++++++ configs/milvus.yaml | 4 +- internal/datanode/data_node.go | 78 +++++++-- internal/datanode/data_node_test.go | 1 + internal/datanode/factory.go | 11 ++ internal/datanode/flow_graph_dd_node_test.go | 2 +- .../flow_graph_insert_buffer_node_test.go | 2 +- internal/datanode/flow_graph_message.go | 2 +- .../flow_graph_msg_stream_input_node.go | 7 +- internal/datanode/meta_table_test.go | 4 +- internal/datanode/param_table.go | 164 +----------------- internal/datanode/param_table_test.go | 15 -- internal/distributed/datanode/client.go | 6 + internal/distributed/datanode/service.go | 23 ++- 15 files changed, 247 insertions(+), 214 deletions(-) create mode 100644 cmd/datanode/main.go diff --git a/Makefile b/Makefile index 7472a89a0c..26fc8de496 100644 --- a/Makefile +++ b/Makefile @@ -101,6 +101,12 @@ writenode: build-cpp @echo "Building write node ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/writenode $(PWD)/cmd/writenode/writenode.go 1>/dev/null +# Builds various components locally. +datanode: build-cpp + @echo "Building each component's binary to './bin'" + @echo "Building data node ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/datanode $(PWD)/cmd/datanode/main.go 1>/dev/null + # Builds various components locally. indexnode: build-cpp @echo "Building each component's binary to './bin'" diff --git a/cmd/datanode/main.go b/cmd/datanode/main.go new file mode 100644 index 0000000000..9801966ad0 --- /dev/null +++ b/cmd/datanode/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + dn "github.com/zilliztech/milvus-distributed/internal/datanode" + dnc "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +const retry = 10 + +func main() { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svr, err := dnc.New(ctx) + if err != nil { + panic(err) + } + + log.Println("Datanode is", dn.Params.NodeID) + + // --- Master Service Client --- + log.Println("Master service address:", dn.Params.MasterAddress) + masterClient, err := msc.NewGrpcClient(dn.Params.MasterAddress, 20*time.Second) + if err != nil { + panic(err) + } + + if err = masterClient.Init(); err != nil { + panic(err) + } + + if err = masterClient.Start(); err != nil { + panic(err) + } + + var cnt int + for cnt = 0; cnt < retry; cnt++ { + msStates, err := masterClient.GetComponentStates() + if err != nil { + continue + } + if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if msStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Connect to master service failed") + } + + if err := svr.SetMasterServiceInterface(masterClient); err != nil { + panic(err) + } + + // --- Data Service Client --- + log.Println("Data service address: ", dn.Params.ServiceAddress) + dataService := dsc.NewClient(dn.Params.ServiceAddress) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + + for cnt = 0; cnt < retry; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Connect to data service failed") + } + + if err := svr.SetDataServiceInterface(dataService); err != nil { + panic(err) + } + + if err := svr.Init(); err != nil { + panic(err) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Start(); err != nil { + panic(err) + } + + <-ctx.Done() + log.Println("Got signal to exit signal:", sig.String()) + + svr.Stop() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + os.Exit(code) +} diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 299fb21f46..e01759a0a6 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -22,8 +22,8 @@ etcd: rootPath: by-dev metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath - segFlushMetaSubPath: writer/segment - ddlFlushMetaSubPath: writer/ddl + segFlushMetaSubPath: datanode/segment # Full Path = rootPath/metaSubPath/segFlushMetaSubPath + ddlFlushMetaSubPath: datanode/ddl # Full Path = rootPath/metaSubPaht/ddlFlushMetaSubPath writeNodeSegKvSubPath: writer/segment # GOOSE TODO: remove this writeNodeDDLKvSubPath: writer/ddl # GOOSE TODO: remove this segThreshold: 10000 diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ced35ae9dc..500e0456f5 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -16,7 +16,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) const ( @@ -25,18 +24,31 @@ const ( type ( Inteface interface { - typeutil.Service - typeutil.Component + // Service + Init() error + Start() error + Stop() error + + // Component + GetComponentStates() (*internalpb2.ComponentStates, error) + GetTimeTickChannel() (string, error) // This function has no effect + GetStatisticsChannel() (string, error) // This function has no effect WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) + + SetMasterServiceInterface(ms MasterServiceInterface) error + + SetDataServiceInterface(ds DataServiceInterface) error } DataServiceInterface interface { + GetComponentStates() (*internalpb2.ComponentStates, error) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) } MasterServiceInterface interface { + GetComponentStates() (*internalpb2.ComponentStates, error) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) @@ -55,32 +67,42 @@ type ( masterService MasterServiceInterface dataService DataServiceInterface - replica collectionReplica + flushChan chan *flushMsg + replica collectionReplica tracer opentracing.Tracer closer io.Closer } ) -func NewDataNode(ctx context.Context, nodeID UniqueID, masterService MasterServiceInterface, - dataService DataServiceInterface) *DataNode { +func NewDataNode(ctx context.Context) *DataNode { Params.Init() node := &DataNode{ ctx: ctx, - NodeID: nodeID, // GOOSE TODO - Role: "DataNode", // GOOSE TODO + NodeID: Params.NodeID, // GOOSE TODO + Role: "DataNode", // GOOSE TODO State: internalpb2.StateCode_INITIALIZING, dataSyncService: nil, metaService: nil, - masterService: masterService, - dataService: dataService, + masterService: nil, + dataService: nil, replica: nil, } return node } +func (node *DataNode) SetMasterServiceInterface(ms MasterServiceInterface) error { + node.masterService = ms + return nil +} + +func (node *DataNode) SetDataServiceInterface(ds DataServiceInterface) error { + node.dataService = ds + return nil +} + func (node *DataNode) Init() error { req := &datapb.RegisterNodeRequest{ @@ -123,8 +145,8 @@ func (node *DataNode) Init() error { var alloc allocator = newAllocatorImpl(node.masterService) chanSize := 100 - flushChan := make(chan *flushMsg, chanSize) - node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc) + node.flushChan = make(chan *flushMsg, chanSize) + node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc) node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica @@ -148,7 +170,6 @@ func (node *DataNode) Init() error { opentracing.SetGlobalTracer(node.tracer) - node.State = internalpb2.StateCode_HEALTHY return nil } @@ -156,13 +177,15 @@ func (node *DataNode) Start() error { go node.dataSyncService.start() node.metaService.init() + node.State = internalpb2.StateCode_HEALTHY return nil } -func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) error { - // GOOSE TODO: Implement me - return nil +func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { + Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...) + + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil } func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { @@ -173,13 +196,23 @@ func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) StateCode: node.State, }, SubcomponentStates: make([]*internalpb2.ComponentInfo, 0), - Status: &commonpb.Status{}, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, } return states, nil } func (node *DataNode) FlushSegments(in *datapb.FlushSegRequest) error { - // GOOSE TODO: Implement me + ids := make([]UniqueID, 0) + ids = append(ids, in.SegmentIDs...) + + flushmsg := &flushMsg{ + msgID: in.Base.MsgID, + timestamp: in.Base.Timestamp, + segmentIDs: ids, + collectionID: in.CollectionID, + } + + node.flushChan <- flushmsg return nil } @@ -195,5 +228,12 @@ func (node *DataNode) Stop() error { node.closer.Close() } return nil - +} + +func (node *DataNode) GetTimeTickChannel() (string, error) { + return "Nothing happened", nil +} + +func (node *DataNode) GetStatisticsChannel() (string, error) { + return "Nothing happened", nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index da9f49684f..1cbdf38d3c 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -29,6 +29,7 @@ func refreshChannelNames() { Params.DDChannelNames = []string{"datanode-test"} Params.SegmentStatisticsChannelName = "segtment-statistics" Params.CompleteFlushChannelName = "flush-completed" + Params.InsertChannelNames = []string{"intsert-a-1", "insert-b-1"} Params.TimeTickChannelName = "hard-timetick" suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10) Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix) diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go index bb87ca008b..9152f37acf 100644 --- a/internal/datanode/factory.go +++ b/internal/datanode/factory.go @@ -3,6 +3,7 @@ package datanode import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -203,3 +204,13 @@ func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectio } return resp, nil } + +func (m *MasterServiceFactory) GetComponentStates() (*internalpb2.ComponentStates, error) { + return &internalpb2.ComponentStates{ + State: &internalpb2.ComponentInfo{}, + SubcomponentStates: make([]*internalpb2.ComponentInfo, 0), + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil +} diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 39db14419c..35386b3dad 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -130,7 +130,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { inFlushCh <- &flushMsg{ msgID: 1, - Timestamp: 6, + timestamp: 6, segmentIDs: []UniqueID{1}, collectionID: UniqueID(1), } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 82f0ef69e9..28295a2df3 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -195,7 +195,7 @@ func genInsertMsg() insertMsg { fmsg := &flushMsg{ msgID: 1, - Timestamp: 2000, + timestamp: 2000, segmentIDs: []UniqueID{1}, collectionID: UniqueID(1), } diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go index e53fda460d..983d621de1 100644 --- a/internal/datanode/flow_graph_message.go +++ b/internal/datanode/flow_graph_message.go @@ -56,7 +56,7 @@ type ( flushMsg struct { msgID UniqueID - Timestamp Timestamp + timestamp Timestamp segmentIDs []UniqueID collectionID UniqueID } diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go index 56e711f0e9..163ac7fc6c 100644 --- a/internal/datanode/flow_graph_msg_stream_input_node.go +++ b/internal/datanode/flow_graph_msg_stream_input_node.go @@ -10,20 +10,17 @@ import ( ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - receiveBufSize := Params.InsertReceiveBufSize - pulsarBufSize := Params.InsertPulsarBufSize - msgStreamURL := Params.PulsarAddress consumeChannels := Params.InsertChannelNames consumeSubName := Params.MsgChannelSubName - insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize) + insertStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024) insertStream.SetPulsarClient(msgStreamURL) unmarshalDispatcher := util.NewUnmarshalDispatcher() - insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024) var stream msgstream.MsgStream = insertStream diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go index 1844edfe26..dd5a4251dc 100644 --- a/internal/datanode/meta_table_test.go +++ b/internal/datanode/meta_table_test.go @@ -15,9 +15,9 @@ func TestMetaTable_all(t *testing.T) { etcdAddr := Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) require.NoError(t, err) - etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root/writer") + etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root") - _, err = cli.Delete(context.TODO(), "/etcd/test/root/writer", clientv3.WithPrefix()) + _, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix()) require.NoError(t, err) meta, err := NewMetaTable(etcdKV) diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index a8e8526ccc..d092260c3e 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -35,32 +35,23 @@ type ParamTable struct { PulsarAddress string // - insert channel - - InsertChannelNames []string - InsertChannelRange []int - InsertReceiveBufSize int64 - InsertPulsarBufSize int64 + InsertChannelNames []string // - dd channel - - DDChannelNames []string // GOOSE TODO, set after Init - // DDReceiveBufSize int64 - // DDPulsarBufSize int64 + DDChannelNames []string // - seg statistics channel - - SegmentStatisticsChannelName string // GOOSE TODO, set after init - // SegmentStatisticsBufSize int64 - // SegmentStatisticsUpdateInterval int // GOOSE TODO remove + SegmentStatisticsChannelName string // - timetick channel - - TimeTickChannelName string // GOOSE TODO: set after init + TimeTickChannelName string // - complete flush channel - - CompleteFlushChannelName string // GOOSE TODO: set after init + CompleteFlushChannelName string // - channel subname - MsgChannelSubName string - DefaultPartitionName string - // --- ETCD --- EtcdAddress string MetaRootPath string @@ -104,25 +95,9 @@ func (p *ParamTable) Init() { // - insert channel - p.initInsertChannelNames() - p.initInsertChannelRange() - p.initInsertReceiveBufSize() - p.initInsertPulsarBufSize() // - dd channel - - // p.initDDChannelNames() - // p.initDDReceiveBufSize() - // p.initDDPulsarBufSize() - - // - seg statistics channel - - // p.initSegmentStatisticsChannelName() - // p.initSegmentStatisticsBufSize() - // p.initSegmentStatisticsUpdateInterval() - - // - timetick channel - - // p.initTimeTickChannelName() - - // - flush completed channel - - // p.initCompleteFlushChannelName() + p.initDDChannelNames() // - channel subname - p.initMsgChannelSubName() @@ -139,10 +114,6 @@ func (p *ParamTable) Init() { p.initMinioSecretAccessKey() p.initMinioUseSSL() p.initMinioBucketName() - - p.initDefaultPartitionName() - // p.initSliceIndex() - } // ==== DataNode internal components configs ==== @@ -234,121 +205,11 @@ func (p *ParamTable) initPulsarAddress() { // - insert channel - func (p *ParamTable) initInsertChannelNames() { - - prefix, err := p.Load("msgChannel.chanNamePrefix.insert") - if err != nil { - log.Fatal(err) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - sep := len(channelIDs) / len(p.dataNodeIDList) - index := p.sliceIndex() - if index == -1 { - panic("dataNodeID not Match with Config") - } - start := index * sep - p.InsertChannelNames = ret[start : start+sep] + p.InsertChannelNames = make([]string, 0) } -func (p *ParamTable) initInsertChannelRange() { - insertChannelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",") -} - -func (p *ParamTable) initInsertReceiveBufSize() { - p.InsertReceiveBufSize = p.ParseInt64("dataNode.msgStream.insert.recvBufSize") -} - -func (p *ParamTable) initInsertPulsarBufSize() { - p.InsertPulsarBufSize = p.ParseInt64("dataNode.msgStream.insert.pulsarBufSize") -} - -// - dd channel - GOOSE TODO: remove func (p *ParamTable) initDDChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") - if err != nil { - panic(err) - } - prefix += "-" - iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.DDChannelNames = ret -} - -// func (p *ParamTable) initDDReceiveBufSize() { -// revBufSize, err := p.Load("dataNode.msgStream.dataDefinition.recvBufSize") -// if err != nil { -// panic(err) -// } -// bufSize, err := strconv.Atoi(revBufSize) -// if err != nil { -// panic(err) -// } -// p.DDReceiveBufSize = int64(bufSize) -// } - -// func (p *ParamTable) initDDPulsarBufSize() { -// pulsarBufSize, err := p.Load("dataNode.msgStream.dataDefinition.pulsarBufSize") -// if err != nil { -// panic(err) -// } -// bufSize, err := strconv.Atoi(pulsarBufSize) -// if err != nil { -// panic(err) -// } -// p.DDPulsarBufSize = int64(bufSize) -// } - -// - seg statistics channel - GOOSE TODO: remove -func (p *ParamTable) initSegmentStatisticsChannelName() { - - channelName, err := p.Load("msgChannel.chanNamePrefix.dataNodeSegStatistics") - if err != nil { - panic(err) - } - - p.SegmentStatisticsChannelName = channelName -} - -// func (p *ParamTable) initSegmentStatisticsBufSize() { -// p.SegmentStatisticsBufSize = p.ParseInt64("dataNode.msgStream.segStatistics.recvBufSize") -// } -// -// func (p *ParamTable) initSegmentStatisticsUpdateInterval() { -// p.SegmentStatisticsUpdateInterval = p.ParseInt("dataNode.msgStream.segStatistics.updateInterval") -// } - -// - flush completed channel - GOOSE TODO: remove -func (p *ParamTable) initCompleteFlushChannelName() { - p.CompleteFlushChannelName = "flush-completed" -} - -// - Timetick channel - GOOSE TODO: remove -func (p *ParamTable) initTimeTickChannelName() { - channels, err := p.Load("msgChannel.chanNamePrefix.dataNodeTimeTick") - if err != nil { - panic(err) - } - p.TimeTickChannelName = channels + "-" + strconv.FormatInt(p.NodeID, 10) + p.DDChannelNames = make([]string, 0) } // - msg channel subname - @@ -360,15 +221,6 @@ func (p *ParamTable) initMsgChannelSubName() { p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10) } -func (p *ParamTable) initDefaultPartitionName() { - defaultTag, err := p.Load("common.defaultPartitionTag") - if err != nil { - panic(err) - } - - p.DefaultPartitionName = defaultTag -} - // --- ETCD --- func (p *ParamTable) initEtcdAddress() { addr, err := p.Load("_EtcdAddress") diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index 6fa47e3370..5f34635e09 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -59,21 +59,6 @@ func TestParamTable_DataNode(t *testing.T) { log.Println("InsertChannelNames:", names) }) - t.Run("Test insertChannelRange", func(t *testing.T) { - channelRange := Params.InsertChannelRange - log.Println("InsertChannelRange:", channelRange) - }) - - t.Run("Test insertMsgStreamReceiveBufSize", func(t *testing.T) { - bufSize := Params.InsertReceiveBufSize - log.Println("InsertReceiveBufSize:", bufSize) - }) - - t.Run("Test insertPulsarBufSize", func(t *testing.T) { - bufSize := Params.InsertPulsarBufSize - log.Println("InsertPulsarBufSize:", bufSize) - }) - t.Run("Test ddChannelNames", func(t *testing.T) { names := Params.DDChannelNames log.Println("DDChannelNames:", names) diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client.go index 4957d6dbfb..ebbd730f0b 100644 --- a/internal/distributed/datanode/client.go +++ b/internal/distributed/datanode/client.go @@ -2,6 +2,7 @@ package datanode import ( "context" + "time" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -10,6 +11,11 @@ import ( "google.golang.org/grpc" ) +const ( + RPCConnectionTimeout = 30 * time.Second + Retry = 3 +) + type Client struct { ctx context.Context grpc datapb.DataNodeClient diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index e39a4b8fc8..73dcd0755e 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -5,7 +5,6 @@ import ( "net" "strconv" "sync" - "time" dn "github.com/zilliztech/milvus-distributed/internal/datanode" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -16,11 +15,6 @@ import ( "google.golang.org/grpc" ) -const ( - RPCConnectionTimeout = 30 * time.Second - Retry = 3 -) - type Server struct { core *dn.DataNode @@ -32,11 +26,10 @@ type Server struct { cancel context.CancelFunc } -func New(masterService dn.MasterServiceInterface, dataService dn.DataServiceInterface) (*Server, error) { +func New(ctx context.Context) (*Server, error) { var s = &Server{} - s.ctx, s.cancel = context.WithCancel(context.Background()) - s.core = dn.NewDataNode(s.ctx, 0, masterService, dataService) + s.core = dn.NewDataNode(s.ctx) s.grpcServer = grpc.NewServer() datapb.RegisterDataNodeServer(s.grpcServer, s) addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10) @@ -63,6 +56,14 @@ func New(masterService dn.MasterServiceInterface, dataService dn.DataServiceInte return s, nil } +func (s *Server) SetMasterServiceInterface(ms dn.MasterServiceInterface) error { + return s.core.SetMasterServiceInterface(ms) +} + +func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { + return s.core.SetDataServiceInterface(ds) +} + func (s *Server) Init() error { err := s.core.Init() if err != nil { @@ -85,9 +86,7 @@ func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) } func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, s.core.WatchDmChannels(in) + return s.core.WatchDmChannels(in) } func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {