diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 58f6fcc803..157cf79d1a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -214,7 +214,6 @@ msgChannel: rootCoordDelta: "rootcoord-delta" search: "search" searchResult: "searchResult" - proxyTimeTick: "proxyTimeTick" queryTimeTick: "queryTimeTick" queryNodeStats: "query-node-stats" # Cmd for loadIndex, flush, etc... diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 527539fe8c..72baf2c9b2 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -279,7 +279,7 @@ func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) { } func subscriptionGenerator(collectionID int64, nodeID int64) string { - return fmt.Sprintf("%s-%s-%d-%d", Params.DataNodeCfg.ClusterChannelPrefix, Params.DataNodeCfg.SubscriptionNamePrefix, nodeID, collectionID) + return fmt.Sprintf("%s-%d-%d", Params.DataNodeCfg.DataNodeSubName, nodeID, collectionID) } func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index a5a1dd9d59..283d4b5a83 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -447,11 +447,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { log.Error("DataCoord failed to create timetick channel", zap.Error(err)) return } - ttMsgStream.AsConsumerWithPosition([]string{Params.DataCoordCfg.TimeTickChannelName}, - Params.DataCoordCfg.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest) + ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick}, + Params.MsgChannelCfg.DataCoordSubName, mqclient.SubscriptionPositionLatest) log.Debug("DataCoord creates the timetick channel consumer", - zap.String("timeTickChannel", Params.DataCoordCfg.TimeTickChannelName), - zap.String("subscription", Params.DataCoordCfg.DataCoordSubscriptionName)) + zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick), + zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName)) ttMsgStream.Start() go func() { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 8f45bdcdef..f8768ca1f1 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -58,7 +58,7 @@ func TestGetSegmentInfoChannel(t *testing.T) { resp, err := svr.GetSegmentInfoChannel(context.TODO()) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, Params.DataCoordCfg.SegmentInfoChannelName, resp.Value) + assert.EqualValues(t, Params.MsgChannelCfg.DataCoordSegmentInfo, resp.Value) }) } @@ -245,7 +245,7 @@ func TestGetTimeTickChannel(t *testing.T) { resp, err := svr.GetTimeTickChannel(context.TODO()) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, Params.DataCoordCfg.TimeTickChannelName, resp.Value) + assert.EqualValues(t, Params.MsgChannelCfg.DataCoordTimeTick, resp.Value) } func TestGetSegmentStates(t *testing.T) { @@ -1062,7 +1062,7 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName}) + ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ @@ -1130,7 +1130,7 @@ func TestDataNodeTtChannel(t *testing.T) { }) ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName}) + ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ @@ -1212,7 +1212,7 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName}) + ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() node := &NodeInfo{ @@ -2232,7 +2232,7 @@ func TestGetFlushState(t *testing.T) { func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { Params.Init() - Params.DataCoordCfg.TimeTickChannelName = Params.DataCoordCfg.TimeTickChannelName + strconv.Itoa(rand.Int()) + Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) var err error factory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index d4777b070e..f4190cedff 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -48,7 +48,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - Value: Params.DataCoordCfg.TimeTickChannelName, + Value: Params.MsgChannelCfg.DataCoordTimeTick, }, nil } @@ -269,7 +269,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - Value: Params.DataCoordCfg.SegmentInfoChannelName, + Value: Params.MsgChannelCfg.DataCoordSegmentInfo, }, nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 3e9e343a87..1b9e6fa290 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -207,7 +207,7 @@ func (node *DataNode) initSession() error { // Init function does nothing now. func (node *DataNode) Init() error { log.Debug("DataNode Init", - zap.String("TimeTickChannelName", Params.DataNodeCfg.TimeTickChannelName), + zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick), ) if err := node.initSession(); err != nil { log.Error("DataNode init session failed", zap.Error(err)) @@ -227,7 +227,7 @@ func (node *DataNode) Init() error { return err } log.Debug("DataNode Init", - zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName)) + zap.String("MsgChannelSubName", Params.DataNodeCfg.DataNodeSubName)) return nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 7bd7bada42..ec3d376da1 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -54,7 +54,7 @@ func TestMain(t *testing.M) { Params.DataNodeCfg.InitAlias("datanode-alias-1") Params.Init() // change to specific channel for test - Params.DataNodeCfg.TimeTickChannelName = Params.DataNodeCfg.TimeTickChannelName + strconv.Itoa(rand.Int()) + Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) code := t.Run() os.Exit(code) } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index a0a8c5e88e..fd6d182ee5 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -284,7 +284,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI return nil } pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName) - deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.DataNodeCfg.DmlChannelName, Params.DataNodeCfg.DeltaChannelName) + deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) if err != nil { log.Error(err.Error()) return nil diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index b1536d83ff..ffa585668c 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -34,7 +34,7 @@ import ( func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) { // subName should be unique, since pchannelName is shared among several collections // consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) - consumeSubName := fmt.Sprintf("%s-%d", Params.DataNodeCfg.MsgChannelSubName, dmNodeConfig.collectionID) + consumeSubName := fmt.Sprintf("%s-%d", Params.DataNodeCfg.DataNodeSubName, dmNodeConfig.collectionID) insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx) if err != nil { return nil, err diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 63ce319026..dc18c550a4 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -715,8 +715,8 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM if err != nil { return nil, err } - wTt.AsProducer([]string{Params.DataNodeCfg.TimeTickChannelName}) - log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.DataNodeCfg.TimeTickChannelName)) + wTt.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) + log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick)) var wTtMsgStream msgstream.MsgStream = wTt wTtMsgStream.Start() diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 4b4c66b3b3..f23ec14ff5 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -82,9 +82,9 @@ func TestGrpcService(t *testing.T) { rootcoord.Params.Init() rootcoord.Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) rootcoord.Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) - rootcoord.Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("msgChannel%d", randVal) - rootcoord.Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("timeTick%d", randVal) - rootcoord.Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("stateChannel%d", randVal) + rootcoord.Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal) + rootcoord.Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal) + rootcoord.Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal) rootcoord.Params.RootCoordCfg.MaxPartitionNum = 64 rootcoord.Params.CommonCfg.DefaultPartitionName = "_default" diff --git a/internal/querycoord/channel_unsubscribe.go b/internal/querycoord/channel_unsubscribe.go index 8ada8686f4..cfadb9f52e 100644 --- a/internal/querycoord/channel_unsubscribe.go +++ b/internal/querycoord/channel_unsubscribe.go @@ -132,7 +132,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { nodeID := channelInfo.NodeID for _, collectionChannels := range channelInfo.CollectionChannels { collectionID := collectionChannels.CollectionID - subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, nodeID) + subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, nodeID) err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels) if err != nil { log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID)) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index 493830872e..5c6d01be9e 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -71,7 +71,7 @@ func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.QueryCoordCfg.TimeTickChannelName, + Value: Params.MsgChannelCfg.QueryCoordTimeTick, }, nil } @@ -83,7 +83,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.QueryCoordCfg.StatsChannelName, + Value: Params.MsgChannelCfg.QueryNodeStats, }, nil } diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index fd106d6aad..e177ae9fd9 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -837,8 +837,8 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh // all collection use the same query channel colIDForAssignChannel := UniqueID(0) - searchPrefix := Params.QueryCoordCfg.SearchChannelPrefix - searchResultPrefix := Params.QueryCoordCfg.SearchResultChannelPrefix + searchPrefix := Params.MsgChannelCfg.QueryCoordSearch + searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel)) diff --git a/internal/querycoord/metrics_info.go b/internal/querycoord/metrics_info.go index 858ff2dcfe..76c41f0d94 100644 --- a/internal/querycoord/metrics_info.go +++ b/internal/querycoord/metrics_info.go @@ -58,8 +58,8 @@ func getSystemInfoMetrics( ID: qc.session.ServerID, }, SystemConfigurations: metricsinfo.QueryCoordConfiguration{ - SearchChannelPrefix: Params.QueryCoordCfg.SearchChannelPrefix, - SearchResultChannelPrefix: Params.QueryCoordCfg.SearchResultChannelPrefix, + SearchChannelPrefix: Params.MsgChannelCfg.QueryCoordSearch, + SearchResultChannelPrefix: Params.MsgChannelCfg.QueryCoordSearchResult, }, }, ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0), diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index ed633f1cf4..2335843545 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -273,8 +273,8 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, rand.Seed(time.Now().UnixNano()) queryChannels := make([]*queryChannelInfo, 0) channelID := len(queryChannels) - searchPrefix := Params.QueryCoordCfg.SearchChannelPrefix - searchResultPrefix := Params.QueryCoordCfg.SearchResultChannelPrefix + searchPrefix := Params.MsgChannelCfg.QueryCoordSearch + searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10) diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go index 26f41a74cf..0ad7d775e7 100644 --- a/internal/querycoord/query_coord_test.go +++ b/internal/querycoord/query_coord_test.go @@ -46,11 +46,11 @@ func setup() { func refreshParams() { rand.Seed(time.Now().UnixNano()) suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10) - Params.QueryCoordCfg.StatsChannelName = Params.QueryCoordCfg.StatsChannelName + suffix - Params.QueryCoordCfg.TimeTickChannelName = Params.QueryCoordCfg.TimeTickChannelName + suffix + Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + suffix + Params.MsgChannelCfg.QueryCoordTimeTick = Params.MsgChannelCfg.QueryCoordTimeTick + suffix Params.BaseParams.MetaRootPath = Params.BaseParams.MetaRootPath + suffix - Params.QueryCoordCfg.DmlChannelPrefix = "Dml" - Params.QueryCoordCfg.DeltaChannelPrefix = "delta" + Params.MsgChannelCfg.RootCoordDml = "Dml" + Params.MsgChannelCfg.RootCoordDelta = "delta" GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo) } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index f86008f921..494f0a2fed 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -2081,7 +2081,7 @@ func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int { } func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.QueryCoordCfg.DmlChannelPrefix, Params.QueryCoordCfg.DeltaChannelPrefix) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) if err != nil { return nil, err } diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index b8cb38a10e..f74eef0480 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -71,7 +71,7 @@ func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.String ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.QueryNodeCfg.QueryTimeTickChannelName, + Value: Params.MsgChannelCfg.QueryCoordTimeTick, }, nil } @@ -83,7 +83,7 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.QueryNodeCfg.StatsChannelName, + Value: Params.MsgChannelCfg.QueryNodeStats, }, nil } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index ea60b44fd1..abe75b44ca 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -234,7 +234,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) { func TestMain(m *testing.M) { setup() - Params.QueryNodeCfg.StatsChannelName = Params.QueryNodeCfg.StatsChannelName + strconv.Itoa(rand.Int()) + Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + strconv.Itoa(rand.Int()) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index 52d5ab9283..b70a15ab1f 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -55,7 +55,7 @@ func (sService *statsService) start() { sleepTimeInterval := Params.QueryNodeCfg.StatsPublishInterval // start pulsar - producerChannels := []string{Params.QueryNodeCfg.StatsChannelName} + producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats} statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx) statsStream.AsProducer(producerChannels) diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 1f37fde4bd..e183c6a2f8 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -50,7 +50,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { const receiveBufSize = 1024 // start pulsar - producerChannels := []string{Params.QueryNodeCfg.StatsChannelName} + producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats} msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/querynode/task.go b/internal/querynode/task.go index d18c4863b1..a5a0ce03a2 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -157,7 +157,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { return err } consumeChannels := []string{r.req.QueryChannel} - consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName) if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 { @@ -301,7 +301,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } }() - consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // group channels by to seeking or consuming channel2SeekPosition := make(map[string]*internalpb.MsgPosition) @@ -525,7 +525,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { } channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels) - consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // channels as consumer for _, channel := range vDeltaChannels { fg := channel2FlowGraph[channel] diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 05e1cb1e7f..0cba1c3ca3 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -435,17 +435,17 @@ func (c *Core) setMsgStreams() error { if Params.PulsarCfg.Address == "" { return fmt.Errorf("pulsar address is empty") } - if Params.RootCoordCfg.MsgChannelSubName == "" { - return fmt.Errorf("msgChannelSubName is empty") + if Params.MsgChannelCfg.RootCoordSubName == "" { + return fmt.Errorf("RootCoordSubName is empty") } // rootcoord time tick channel - if Params.RootCoordCfg.TimeTickChannel == "" { + if Params.MsgChannelCfg.RootCoordTimeTick == "" { return fmt.Errorf("timeTickChannel is empty") } timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) - timeTickStream.AsProducer([]string{Params.RootCoordCfg.TimeTickChannel}) - log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.RootCoordCfg.TimeTickChannel)) + timeTickStream.AsProducer([]string{Params.MsgChannelCfg.RootCoordTimeTick}) + log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.MsgChannelCfg.RootCoordTimeTick)) c.SendTimeTick = func(t typeutil.Timestamp, reason string) error { msgPack := ms.MsgPack{} @@ -1177,7 +1177,7 @@ func (c *Core) Start() error { } log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID)) - log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.RootCoordCfg.TimeTickChannel)) + log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.MsgChannelCfg.RootCoordTimeTick)) c.startOnce.Do(func() { if err := c.proxyManager.WatchProxy(); err != nil { @@ -1252,7 +1252,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.RootCoordCfg.TimeTickChannel, + Value: Params.MsgChannelCfg.RootCoordTimeTick, }, nil } @@ -1263,7 +1263,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.RootCoordCfg.StatisticsChannel, + Value: Params.MsgChannelCfg.RootCoordStatistics, }, nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 208ca2a4f1..54933af8f7 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -559,13 +559,13 @@ func TestRootCoord(t *testing.T) { core, err := NewCore(ctx, coreFactory) assert.Nil(t, err) randVal := rand.Int() - Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) - Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) - Params.RootCoordCfg.DmlChannelName = fmt.Sprintf("rootcoord-dml-test-%d", randVal) - Params.RootCoordCfg.DeltaChannelName = fmt.Sprintf("rootcoord-delta-test-%d", randVal) + Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.MsgChannelCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal) + Params.MsgChannelCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal) etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) assert.NoError(t, err) @@ -626,7 +626,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := tmpFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName) + timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) timeTickStream.Start() dmlStream, _ := tmpFactory.NewMsgStream(ctx) @@ -723,7 +723,7 @@ func TestRootCoord(t *testing.T) { createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) - dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.RootCoordCfg.MsgChannelSubName) + dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName) dmlStream.Start() pChanMap := core.MetaTable.ListCollectionPhysicalChannels() @@ -2311,11 +2311,11 @@ func TestRootCoord2(t *testing.T) { randVal := rand.Int() - Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) - Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2361,7 +2361,7 @@ func TestRootCoord2(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName) + timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) @@ -2404,7 +2404,7 @@ func TestRootCoord2(t *testing.T) { collInfo, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) dmlStream, _ := msFactory.NewMsgStream(ctx) - dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.RootCoordCfg.MsgChannelSubName) + dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName) dmlStream.Start() msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) @@ -2589,11 +2589,11 @@ func TestCheckFlushedSegments(t *testing.T) { assert.Nil(t, err) randVal := rand.Int() - Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) - Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2642,7 +2642,7 @@ func TestCheckFlushedSegments(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName) + timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) @@ -2755,11 +2755,11 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { core, err := NewCore(ctx, msFactory) assert.Nil(t, err) randVal := rand.Int() - Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) - Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2809,7 +2809,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName) + timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 05d6028797..fbee5d2dbb 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -147,7 +147,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { chanNames[i] = ToPhysicalChannel(vchanNames[i]) deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName() - deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName) + deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) if err1 != nil || deltaChanName != deltaChanNames[i] { return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i]) } @@ -361,7 +361,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // remove delta channels deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames)) for i, chanName := range collMeta.PhysicalChannelNames { - if deltaChanNames[i], err = ConvertChannelName(chanName, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName); err != nil { + if deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta); err != nil { return err } } diff --git a/internal/rootcoord/timestamp_test.go b/internal/rootcoord/timestamp_test.go index 326ed9317a..6124f45224 100644 --- a/internal/rootcoord/timestamp_test.go +++ b/internal/rootcoord/timestamp_test.go @@ -88,11 +88,11 @@ func BenchmarkAllocTimestamp(b *testing.B) { randVal := rand.Int() - Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) - Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) + Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal) + Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal) Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) - Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) err = core.SetDataCoord(ctx, &tbd{}) assert.Nil(b, err) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index f5a489ca73..ba19e89848 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -87,9 +87,9 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { // initialize dml channels used for insert - dmlChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DmlChannelNum) + dmlChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum) // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels - deltaChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DeltaChannelName, Params.RootCoordCfg.DmlChannelNum) + deltaChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum) // recover physical channels for all collections for collID, chanNames := range chanMap { @@ -99,7 +99,7 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact var err error deltaChanNames := make([]string, len(chanNames)) for i, chanName := range chanNames { - deltaChanNames[i], err = ConvertChannelName(chanName, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName) + deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) if err != nil { log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName)) panic("invalid dml channel name " + chanName) diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index 3a697d9711..3d56a74374 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -45,8 +45,8 @@ func TestTimetickSync(t *testing.T) { //} Params.RootCoordCfg.DmlChannelNum = 2 - Params.RootCoordCfg.DmlChannelName = "rootcoord-dml" - Params.RootCoordCfg.DeltaChannelName = "rootcoord-delta" + Params.MsgChannelCfg.RootCoordDml = "rootcoord-dml" + Params.MsgChannelCfg.RootCoordDelta = "rootcoord-delta" ttSync := newTimeTickSync(ctx, sourceID, factory, nil) var wg sync.WaitGroup diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 1d17751763..29d9ca7a80 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -55,9 +55,9 @@ type GlobalParamTable struct { RocksmqCfg rocksmqConfig MinioCfg minioConfig - CommonCfg commonConfig - KnowhereCfg knowhereConfig - //MsgChannelCfg msgChannelConfig + CommonCfg commonConfig + KnowhereCfg knowhereConfig + MsgChannelCfg msgChannelConfig RootCoordCfg rootCoordConfig ProxyCfg proxyConfig @@ -86,7 +86,7 @@ func (p *GlobalParamTable) Init() { p.CommonCfg.init(&p.BaseParams) p.KnowhereCfg.init(&p.BaseParams) - //p.MsgChannelCfg.init(&p.BaseParams) + p.MsgChannelCfg.init(&p.BaseParams) p.RootCoordCfg.init(&p.BaseParams) p.ProxyCfg.init(&p.BaseParams) @@ -286,155 +286,129 @@ func (p *knowhereConfig) initSimdType() { /////////////////////////////////////////////////////////////////////////////// // --- msgChannel --- -//type msgChannelConfig struct { -// BaseParams *BaseParamTable -// -// ClusterPrefix string -// RootCoordTimeTick string -// RootCoordStatistics string -// RootCoordDml string -// RootCoordDelta string -// QueryCoordSearch string -// QueryCoordSearchResult string -// ProxyTimeTick string -// QueryTimeTick string -// QueryNodeStats string -// Cmd string -// DataCoordInsertChannel string -// DataCoordStatistic string -// DataCoordTimeTick string -// DataCoordSegmentInfo string -// -// SkipQueryChannelRecovery string -// -// RootCoordSubNamePrefix string -// ProxySubNamePrefix string -// QueryNodeSubNamePrefix string -// DataNodeSubNamePrefix string -// DataCoordSubNamePrefix string -//} -// -//func (p *msgChannelConfig) init(bp *BaseParamTable) { -// p.BaseParams = bp -// -// // must init cluster prefix first -// p.initClusterPrefix() -// p.initRootCoordTimeTick() -// p.initRootCoordStatistics() -// p.initRootCoordDml() -// p.initRootCoordDelta() -// p.initQueryCoordSearch() -// p.initQueryCoordSearchResult() -// p.initProxyTimeTick() -// p.initQueryTimeTick() -// p.initQueryNodeStats() -// p.initMsgChannelCmd() -// p.initDataCoordInsertChannel() -// p.initDataCoordStatistic() -// p.initDataCoordTimeTick() -// p.initDataCoordSegmentInfo() -// -// p.initRootCoordSubNamePrefix() -// p.initProxySubNamePrefix() -// p.initQueryNodeSubNamePrefix() -// p.initDataNodeSubNamePrefix() -// p.initDataCoordSubNamePrefix() -//} -// -//func (p *msgChannelConfig) initClusterPrefix() { -// config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") -// if err != nil { -// panic(err) -// } -// p.ClusterPrefix = config -//} -// -//func (p *msgChannelConfig) initChanNamePrefix(cfg string) string { -// value, err := p.BaseParams.Load(cfg) -// if err != nil { -// panic(err) -// } -// s := []string{p.ClusterPrefix, value} -// return strings.Join(s, "-") -//} -// -//// --- msgChannel.chanNamePrefix --- -//func (p *msgChannelConfig) initRootCoordTimeTick() { -// p.RootCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordTimeTick") -//} -// -//func (p *msgChannelConfig) initRootCoordStatistics() { -// p.RootCoordStatistics = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordStatistics") -//} -// -//func (p *msgChannelConfig) initRootCoordDml() { -// p.RootCoordDml = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDml") -//} -// -//func (p *msgChannelConfig) initRootCoordDelta() { -// p.RootCoordDelta = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDelta") -//} -// -//func (p *msgChannelConfig) initQueryCoordSearch() { -// p.QueryCoordSearch = p.initChanNamePrefix("msgChannel.chanNamePrefix.search") -//} -// -//func (p *msgChannelConfig) initQueryCoordSearchResult() { -// p.QueryCoordSearchResult = p.initChanNamePrefix("msgChannel.chanNamePrefix.searchResult") -//} -// -//func (p *msgChannelConfig) initProxyTimeTick() { -// p.ProxyTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.proxyTimeTick") -//} -// -//func (p *msgChannelConfig) initQueryTimeTick() { -// p.QueryTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryTimeTick") -//} -// -//func (p *msgChannelConfig) initQueryNodeStats() { -// p.QueryNodeStats = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryNodeStats") -//} -// -//func (p *msgChannelConfig) initMsgChannelCmd() { -// p.Cmd = p.initChanNamePrefix("msgChannel.chanNamePrefix.cmd") -//} -// -//func (p *msgChannelConfig) initDataCoordInsertChannel() { -// p.DataCoordInsertChannel = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordInsertChannel") -//} -// -//func (p *msgChannelConfig) initDataCoordStatistic() { -// p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic") -//} -// -//func (p *msgChannelConfig) initDataCoordTimeTick() { -// p.DataCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordTimeTick") -//} -// -//func (p *msgChannelConfig) initDataCoordSegmentInfo() { -// p.DataCoordSegmentInfo = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordSegmentInfo") -//} -// -//// --- msgChannel.subNamePrefix --- -//func (p *msgChannelConfig) initRootCoordSubNamePrefix() { -// p.RootCoordSubNamePrefix = p.initChanNamePrefix("msgChannel.subNamePrefix.rootCoordSubNamePrefix") -//} -// -//func (p *msgChannelConfig) initProxySubNamePrefix() { -// p.ProxySubNamePrefix = p.initChanNamePrefix("msgChannel.subNamePrefix.proxySubNamePrefix") -//} -// -//func (p *msgChannelConfig) initQueryNodeSubNamePrefix() { -// p.QueryNodeSubNamePrefix = p.initChanNamePrefix("msgChannel.subNamePrefix.queryNodeSubNamePrefix") -//} -// -//func (p *msgChannelConfig) initDataNodeSubNamePrefix() { -// p.DataNodeSubNamePrefix = p.initChanNamePrefix("msgChannel.subNamePrefix.dataNodeSubNamePrefix") -//} -// -//func (p *msgChannelConfig) initDataCoordSubNamePrefix() { -// p.DataCoordSubNamePrefix = p.initChanNamePrefix("msgChannel.subNamePrefix.dataCoordSubNamePrefix") -//} +type msgChannelConfig struct { + BaseParams *BaseParamTable + + ClusterPrefix string + + RootCoordTimeTick string + RootCoordStatistics string + RootCoordDml string + RootCoordDelta string + RootCoordSubName string + + QueryCoordSearch string + QueryCoordSearchResult string + QueryCoordTimeTick string + QueryNodeStats string + + DataCoordInsert string + DataCoordStatistic string + DataCoordTimeTick string + DataCoordSegmentInfo string + DataCoordSubName string +} + +func (p *msgChannelConfig) init(bp *BaseParamTable) { + p.BaseParams = bp + + // must init cluster prefix first + p.initClusterPrefix() + + p.initRootCoordTimeTick() + p.initRootCoordStatistics() + p.initRootCoordDml() + p.initRootCoordDelta() + p.initRootCoordSubName() + + p.initQueryCoordSearch() + p.initQueryCoordSearchResult() + p.initQueryCoordTimeTick() + p.initQueryNodeStats() + + p.initDataCoordInsert() + p.initDataCoordStatistic() + p.initDataCoordTimeTick() + p.initDataCoordSegmentInfo() + p.initDataCoordSubName() +} + +func (p *msgChannelConfig) initClusterPrefix() { + str, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") + if err != nil { + panic(err) + } + p.ClusterPrefix = str +} + +func (p *msgChannelConfig) initChanNamePrefix(cfg string) string { + value, err := p.BaseParams.Load(cfg) + if err != nil { + panic(err) + } + s := []string{p.ClusterPrefix, value} + return strings.Join(s, "-") +} + +// --- rootcoord --- +func (p *msgChannelConfig) initRootCoordTimeTick() { + p.RootCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordTimeTick") +} + +func (p *msgChannelConfig) initRootCoordStatistics() { + p.RootCoordStatistics = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordStatistics") +} + +func (p *msgChannelConfig) initRootCoordDml() { + p.RootCoordDml = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDml") +} + +func (p *msgChannelConfig) initRootCoordDelta() { + p.RootCoordDelta = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDelta") +} + +func (p *msgChannelConfig) initRootCoordSubName() { + p.RootCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.rootCoordSubNamePrefix") +} + +// --- querycoord --- +func (p *msgChannelConfig) initQueryCoordSearch() { + p.QueryCoordSearch = p.initChanNamePrefix("msgChannel.chanNamePrefix.search") +} + +func (p *msgChannelConfig) initQueryCoordSearchResult() { + p.QueryCoordSearchResult = p.initChanNamePrefix("msgChannel.chanNamePrefix.searchResult") +} + +func (p *msgChannelConfig) initQueryCoordTimeTick() { + p.QueryCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryTimeTick") +} + +// --- querynode --- +func (p *msgChannelConfig) initQueryNodeStats() { + p.QueryNodeStats = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryNodeStats") +} + +// --- datacoord --- +func (p *msgChannelConfig) initDataCoordInsert() { + p.DataCoordInsert = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordInsertChannel") +} + +func (p *msgChannelConfig) initDataCoordStatistic() { + p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic") +} + +func (p *msgChannelConfig) initDataCoordTimeTick() { + p.DataCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordTimeTick") +} + +func (p *msgChannelConfig) initDataCoordSegmentInfo() { + p.DataCoordSegmentInfo = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordSegmentInfo") +} + +func (p *msgChannelConfig) initDataCoordSubName() { + p.DataCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataCoordSubNamePrefix") +} /////////////////////////////////////////////////////////////////////////////// // --- rootcoord --- @@ -444,13 +418,6 @@ type rootCoordConfig struct { Address string Port int - ClusterChannelPrefix string - MsgChannelSubName string - TimeTickChannel string - StatisticsChannel string - DmlChannelName string - DeltaChannelName string - DmlChannelNum int64 MaxPartitionNum int64 MinSegmentSizeToEnableIndex int64 @@ -462,72 +429,11 @@ type rootCoordConfig struct { func (p *rootCoordConfig) init(bp *BaseParamTable) { p.BaseParams = bp - // Has to init global msgchannel prefix before other channel names - p.initClusterMsgChannelPrefix() - p.initMsgChannelSubName() - p.initTimeTickChannel() - p.initStatisticsChannelName() - p.initDmlChannelName() - p.initDeltaChannelName() - p.initDmlChannelNum() p.initMaxPartitionNum() p.initMinSegmentSizeToEnableIndex() } -func (p *rootCoordConfig) initClusterMsgChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") - if err != nil { - panic(err) - } - p.ClusterChannelPrefix = config -} - -func (p *rootCoordConfig) initMsgChannelSubName() { - config, err := p.BaseParams.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.MsgChannelSubName = strings.Join(s, "-") -} - -func (p *rootCoordConfig) initTimeTickChannel() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordTimeTick") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.TimeTickChannel = strings.Join(s, "-") -} - -func (p *rootCoordConfig) initStatisticsChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordStatistics") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.StatisticsChannel = strings.Join(s, "-") -} - -func (p *rootCoordConfig) initDmlChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.DmlChannelName = strings.Join(s, "-") -} - -func (p *rootCoordConfig) initDeltaChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDelta") - if err != nil { - config = "rootcoord-delta" - } - s := []string{p.ClusterChannelPrefix, config} - p.DeltaChannelName = strings.Join(s, "-") -} - func (p *rootCoordConfig) initDmlChannelNum() { p.DmlChannelNum = p.BaseParams.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256) } @@ -563,9 +469,7 @@ type proxyConfig struct { BufFlagCleanupInterval time.Duration // --- Channels --- - ClusterChannelPrefix string - ProxyTimeTickChannelNames []string - ProxySubName string + ProxySubName string // required from QueryCoord SearchResultChannelNames []string @@ -582,10 +486,8 @@ func (p *proxyConfig) init(bp *BaseParamTable) { p.initTimeTickInterval() - // Has to init global msgchannel prefix before other channel names - p.initClusterMsgChannelPrefix() p.initProxySubName() - p.initProxyTimeTickChannelNames() + p.initMsgStreamTimeTickBufSize() p.initMaxNameLength() p.initMaxFieldNum() @@ -612,33 +514,19 @@ func (p *proxyConfig) initTimeTickInterval() { p.TimeTickInterval = time.Duration(interval) * time.Millisecond } -func (p *proxyConfig) initClusterMsgChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") - if err != nil { - panic(err) - } - p.ClusterChannelPrefix = config -} - func (p *proxyConfig) initProxySubName() { - config, err := p.BaseParams.Load("msgChannel.subNamePrefix.proxySubNamePrefix") + cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - s := []string{p.ClusterChannelPrefix, config, strconv.FormatInt(p.ProxyID, 10)} + subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.proxySubNamePrefix") + if err != nil { + panic(err) + } + s := []string{cluster, subname, strconv.FormatInt(p.ProxyID, 10)} p.ProxySubName = strings.Join(s, "-") } -func (p *proxyConfig) initProxyTimeTickChannelNames() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.proxyTimeTick") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config, "0"} - prefix := strings.Join(s, "-") - p.ProxyTimeTickChannelNames = []string{prefix} -} - func (p *proxyConfig) initMsgStreamTimeTickBufSize() { p.MsgStreamTimeTickBufSize = p.BaseParams.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512) } @@ -704,23 +592,9 @@ type queryCoordConfig struct { Port int QueryCoordID UniqueID - // stats - StatsChannelName string - - // timetick - TimeTickChannelName string - - // channels - ClusterChannelPrefix string - SearchChannelPrefix string - SearchResultChannelPrefix string - CreatedTime time.Time UpdatedTime time.Time - DmlChannelPrefix string - DeltaChannelPrefix string - //---- Handoff --- AutoHandoff bool @@ -734,19 +608,9 @@ type queryCoordConfig struct { func (p *queryCoordConfig) init(bp *BaseParamTable) { p.BaseParams = bp - // --- Channels --- - p.initClusterMsgChannelPrefix() - p.initSearchChannelPrefix() - p.initSearchResultChannelPrefix() - p.initStatsChannelName() - p.initTimeTickChannelName() - //---- Handoff --- p.initAutoHandoff() - p.initDmlChannelName() - p.initDeltaChannelName() - //---- Balance --- p.initAutoBalance() p.initOverloadedMemoryThresholdPercentage() @@ -754,51 +618,6 @@ func (p *queryCoordConfig) init(bp *BaseParamTable) { p.initMemoryUsageMaxDifferencePercentage() } -func (p *queryCoordConfig) initClusterMsgChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") - if err != nil { - panic(err) - } - p.ClusterChannelPrefix = config -} - -func (p *queryCoordConfig) initSearchChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.search") - if err != nil { - log.Error(err.Error()) - } - - s := []string{p.ClusterChannelPrefix, config} - p.SearchChannelPrefix = strings.Join(s, "-") -} - -func (p *queryCoordConfig) initSearchResultChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.searchResult") - if err != nil { - log.Error(err.Error()) - } - s := []string{p.ClusterChannelPrefix, config} - p.SearchResultChannelPrefix = strings.Join(s, "-") -} - -func (p *queryCoordConfig) initStatsChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.queryNodeStats") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.StatsChannelName = strings.Join(s, "-") -} - -func (p *queryCoordConfig) initTimeTickChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.queryTimeTick") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.TimeTickChannelName = strings.Join(s, "-") -} - func (p *queryCoordConfig) initAutoHandoff() { handoff, err := p.BaseParams.Load("queryCoord.autoHandoff") if err != nil { @@ -846,24 +665,6 @@ func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() { p.MemoryUsageMaxDifferencePercentage = float64(diffPercentage) / 100 } -func (p *queryCoordConfig) initDmlChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml") - if err != nil { - config = "rootcoord-dml" - } - s := []string{p.ClusterChannelPrefix, config} - p.DmlChannelPrefix = strings.Join(s, "-") -} - -func (p *queryCoordConfig) initDeltaChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDelta") - if err != nil { - config = "rootcoord-delta" - } - s := []string{p.ClusterChannelPrefix, config} - p.DeltaChannelPrefix = strings.Join(s, "-") -} - /////////////////////////////////////////////////////////////////////////////// // --- querynode --- type queryNodeConfig struct { @@ -877,10 +678,7 @@ type queryNodeConfig struct { CacheSize int64 // deprecated // channel prefix - ClusterChannelPrefix string - QueryTimeTickChannelName string - StatsChannelName string - MsgChannelSubName string + QueryNodeSubName string FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 @@ -928,11 +726,7 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) { p.initSearchPulsarBufSize() p.initSearchResultReceiveBufSize() - // Has to init global msgchannel prefix before other channel names - p.initClusterMsgChannelPrefix() - p.initQueryTimeTickChannelName() - p.initStatsChannelName() - p.initMsgChannelSubName() + p.initQueryNodeSubName() p.initStatsPublishInterval() @@ -948,7 +742,7 @@ func (p *queryNodeConfig) InitAlias(alias string) { // Refresh is called after session init func (p *queryNodeConfig) Refresh() { - p.initMsgChannelSubName() + p.initQueryNodeSubName() } func (p *queryNodeConfig) initCacheSize() { @@ -1001,40 +795,18 @@ func (p *queryNodeConfig) initSearchResultReceiveBufSize() { } // ------------------------ channel names -func (p *queryNodeConfig) initClusterMsgChannelPrefix() { - name, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") +func (p *queryNodeConfig) initQueryNodeSubName() { + cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - p.ClusterChannelPrefix = name -} - -func (p *queryNodeConfig) initQueryTimeTickChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.queryTimeTick") - if err != nil { - log.Warn(err.Error()) - } - s := []string{p.ClusterChannelPrefix, config} - p.QueryTimeTickChannelName = strings.Join(s, "-") -} - -func (p *queryNodeConfig) initMsgChannelSubName() { - namePrefix, err := p.BaseParams.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") + subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") if err != nil { log.Warn(err.Error()) } - s := []string{p.ClusterChannelPrefix, namePrefix} - p.MsgChannelSubName = strings.Join(s, "-") -} - -func (p *queryNodeConfig) initStatsChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.queryNodeStats") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.StatsChannelName = strings.Join(s, "-") + s := []string{cluster, subname} + p.QueryNodeSubName = strings.Join(s, "-") } func (p *queryNodeConfig) initGracefulTime() { @@ -1074,13 +846,6 @@ type dataCoordConfig struct { SegmentSealProportion float64 SegAssignmentExpiration int64 - // --- Channels --- - ClusterChannelPrefix string - InsertChannelPrefixName string - TimeTickChannelName string - SegmentInfoChannelName string - DataCoordSubscriptionName string - CreatedTime time.Time UpdatedTime time.Time @@ -1103,13 +868,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initSegmentSealProportion() p.initSegAssignmentExpiration() - // Has to init global msgchannel prefix before other channel names - p.initClusterMsgChannelPrefix() - p.initInsertChannelPrefixName() - p.initTimeTickChannelName() - p.initSegmentInfoChannelName() - p.initDataCoordSubscriptionName() - p.initEnableCompaction() p.initEnableAutoCompaction() @@ -1131,50 +889,6 @@ func (p *dataCoordConfig) initSegAssignmentExpiration() { p.SegAssignmentExpiration = p.BaseParams.ParseInt64WithDefault("dataCoord.segment.assignmentExpiration", 2000) } -func (p *dataCoordConfig) initClusterMsgChannelPrefix() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") - if err != nil { - panic(err) - } - p.ClusterChannelPrefix = config -} - -func (p *dataCoordConfig) initInsertChannelPrefixName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.InsertChannelPrefixName = strings.Join(s, "-") -} - -func (p *dataCoordConfig) initTimeTickChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.TimeTickChannelName = strings.Join(s, "-") -} - -func (p *dataCoordConfig) initSegmentInfoChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.SegmentInfoChannelName = strings.Join(s, "-") -} - -func (p *dataCoordConfig) initDataCoordSubscriptionName() { - config, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.DataCoordSubscriptionName = strings.Join(s, "-") -} - func (p *dataCoordConfig) initChannelWatchPrefix() { // WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path. // This will be removed after we reconstruct our config module. @@ -1186,7 +900,6 @@ func (p *dataCoordConfig) initEnableCompaction() { } // -- GC -- - func (p *dataCoordConfig) initEnableGarbageCollection() { p.EnableGarbageCollection = p.BaseParams.ParseBool("dataCoord.enableGarbageCollection", false) } @@ -1228,26 +941,14 @@ type dataNodeConfig struct { DeleteBinlogRootPath string Alias string // Different datanode in one machine - // Channel Name - DmlChannelName string - DeltaChannelName string - - // Cluster channels - ClusterChannelPrefix string - - // Timetick channel - TimeTickChannelName string - // Channel subscribition name - - MsgChannelSubName string + DataNodeSubName string // etcd ChannelWatchSubPath string CreatedTime time.Time UpdatedTime time.Time - - SubscriptionNamePrefix string } func (p *dataNodeConfig) init(bp *BaseParamTable) { @@ -1260,21 +961,13 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) { p.initStatsBinlogRootPath() p.initDeleteBinlogRootPath() - // Must init global msgchannel prefix before other channel names - p.initClusterMsgChannelPrefix() - p.initTimeTickChannelName() - p.initMsgChannelSubName() - + p.initDataNodeSubName() p.initChannelWatchPath() - - p.initDmlChannelName() - p.initDeltaChannelName() - p.initSubscriptionNamePrefix() } // Refresh is called after session init func (p *dataNodeConfig) Refresh() { - p.initMsgChannelSubName() + p.initDataNodeSubName() } // InitAlias init this DataNode alias @@ -1319,61 +1012,23 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() { p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log") } -func (p *dataNodeConfig) initClusterMsgChannelPrefix() { - name, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") +func (p *dataNodeConfig) initDataNodeSubName() { + cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - p.ClusterChannelPrefix = name -} - -func (p *dataNodeConfig) initTimeTickChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") + subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") if err != nil { panic(err) } - s := []string{p.ClusterChannelPrefix, config} - p.TimeTickChannelName = strings.Join(s, "-") -} - -func (p *dataNodeConfig) initMsgChannelSubName() { - config, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config, strconv.FormatInt(p.NodeID, 10)} - p.MsgChannelSubName = strings.Join(s, "-") + s := []string{cluster, subname, strconv.FormatInt(p.NodeID, 10)} + p.DataNodeSubName = strings.Join(s, "-") } func (p *dataNodeConfig) initChannelWatchPath() { p.ChannelWatchSubPath = "channelwatch" } -func (p *dataNodeConfig) initDmlChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml") - if err != nil { - panic(err) - } - s := []string{p.ClusterChannelPrefix, config} - p.DmlChannelName = strings.Join(s, "-") -} - -func (p *dataNodeConfig) initDeltaChannelName() { - config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDelta") - if err != nil { - config = "rootcoord-delta" - } - s := []string{p.ClusterChannelPrefix, config} - p.DeltaChannelName = strings.Join(s, "-") -} - -func (p *dataNodeConfig) initSubscriptionNamePrefix() { - prefix, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") - if err == nil { - p.SubscriptionNamePrefix = prefix - } -} - /////////////////////////////////////////////////////////////////////////////// // --- indexcoord --- type indexCoordConfig struct { diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 9645a21caa..186046fa3e 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -86,24 +86,56 @@ func TestGlobalParamTable(t *testing.T) { t.Logf("knowhere simd type = %s", Params.SimdType) }) + t.Run("test knowhereConfig", func(t *testing.T) { + Params := GlobalParams.MsgChannelCfg + + // -- rootcoord -- + assert.Equal(t, Params.RootCoordTimeTick, "by-dev-rootcoord-timetick") + t.Logf("rootcoord timetick channel = %s", Params.RootCoordTimeTick) + + assert.Equal(t, Params.RootCoordStatistics, "by-dev-rootcoord-statistics") + t.Logf("rootcoord statistics channel = %s", Params.RootCoordStatistics) + + assert.Equal(t, Params.RootCoordDml, "by-dev-rootcoord-dml") + t.Logf("rootcoord dml channel = %s", Params.RootCoordDml) + + assert.Equal(t, Params.RootCoordDelta, "by-dev-rootcoord-delta") + t.Logf("rootcoord delta channel = %s", Params.RootCoordDelta) + + assert.Equal(t, Params.RootCoordSubName, "by-dev-rootCoord") + t.Logf("rootcoord subname = %s", Params.RootCoordSubName) + + // -- querycoord -- + assert.Equal(t, Params.QueryCoordSearch, "by-dev-search") + t.Logf("querycoord search channel = %s", Params.QueryCoordSearch) + + assert.Equal(t, Params.QueryCoordSearchResult, "by-dev-searchResult") + t.Logf("querycoord search result channel = %s", Params.QueryCoordSearchResult) + + assert.Equal(t, Params.QueryCoordTimeTick, "by-dev-queryTimeTick") + t.Logf("querycoord timetick channel = %s", Params.QueryCoordTimeTick) + + // -- querynode -- + assert.Equal(t, Params.QueryNodeStats, "by-dev-query-node-stats") + t.Logf("querynode stats channel = %s", Params.QueryNodeStats) + + // -- datacoord -- + assert.Equal(t, Params.DataCoordInsert, "by-dev-insert-channel-") + t.Logf("datacoord insert channel = %s", Params.DataCoordInsert) + + assert.Equal(t, Params.DataCoordTimeTick, "by-dev-datacoord-timetick-channel") + t.Logf("datacoord timetick channel = %s", Params.DataCoordTimeTick) + + assert.Equal(t, Params.DataCoordSegmentInfo, "by-dev-segment-info-channel") + t.Logf("datacoord segment info channel = %s", Params.DataCoordSegmentInfo) + + assert.Equal(t, Params.DataCoordSubName, "by-dev-dataCoord") + t.Logf("datacoord subname = %s", Params.DataCoordSubName) + }) + t.Run("test rootCoordConfig", func(t *testing.T) { Params := GlobalParams.RootCoordCfg - assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord") - t.Logf("msg channel sub name = %s", Params.MsgChannelSubName) - - assert.Equal(t, Params.TimeTickChannel, "by-dev-rootcoord-timetick") - t.Logf("master time tick channel = %s", Params.TimeTickChannel) - - assert.Equal(t, Params.StatisticsChannel, "by-dev-rootcoord-statistics") - t.Logf("master statistics channel = %s", Params.StatisticsChannel) - - assert.Equal(t, Params.DmlChannelName, "by-dev-rootcoord-dml") - t.Logf("dml channel = %s", Params.DmlChannelName) - - assert.Equal(t, Params.DeltaChannelName, "by-dev-rootcoord-delta") - t.Logf("delta channel = %s", Params.DeltaChannelName) - assert.NotEqual(t, Params.MaxPartitionNum, 0) t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum) @@ -124,9 +156,6 @@ func TestGlobalParamTable(t *testing.T) { assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0") t.Logf("ProxySubName: %s", Params.ProxySubName) - assert.Equal(t, Params.ProxyTimeTickChannelNames, []string{"by-dev-proxyTimeTick-0"}) - t.Logf("ProxyTimeTickChannelNames: %v", Params.ProxyTimeTickChannelNames) - t.Logf("MsgStreamTimeTickBufSize: %d", Params.MsgStreamTimeTickBufSize) t.Logf("MaxNameLength: %d", Params.MaxNameLength) @@ -180,19 +209,7 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test queryCoordConfig", func(t *testing.T) { - Params := GlobalParams.QueryCoordCfg - - assert.Equal(t, Params.SearchChannelPrefix, "by-dev-search") - t.Logf("QueryCoord search channel = %s", Params.SearchChannelPrefix) - - assert.Equal(t, Params.SearchResultChannelPrefix, "by-dev-searchResult") - t.Logf("QueryCoord search result channel = %s", Params.SearchResultChannelPrefix) - - assert.Equal(t, Params.StatsChannelName, "by-dev-query-node-stats") - t.Logf("QueryCoord stats channel = %s", Params.StatsChannelName) - - assert.Equal(t, Params.TimeTickChannelName, "by-dev-queryTimeTick") - t.Logf("QueryCoord time tick channel = %s", Params.TimeTickChannelName) + //Params := GlobalParams.QueryCoordCfg }) t.Run("test queryNodeConfig", func(t *testing.T) { @@ -228,38 +245,20 @@ func TestGlobalParamTable(t *testing.T) { assert.Equal(t, int32(1024), maxParallelism) Params.QueryNodeID = 3 - Params.initMsgChannelSubName() - name := Params.MsgChannelSubName + Params.initQueryNodeSubName() + name := Params.QueryNodeSubName assert.Equal(t, name, "by-dev-queryNode") - - name = Params.StatsChannelName - assert.Equal(t, name, "by-dev-query-node-stats") - - ch := Params.QueryTimeTickChannelName - assert.Equal(t, ch, "by-dev-queryTimeTick") }) t.Run("test dataCoordConfig", func(t *testing.T) { - Params := GlobalParams.DataCoordCfg - - assert.Equal(t, Params.InsertChannelPrefixName, "by-dev-insert-channel-") - t.Logf("DataCoord insert channel = %s", Params.InsertChannelPrefixName) - - assert.Equal(t, Params.TimeTickChannelName, "by-dev-datacoord-timetick-channel") - t.Logf("DataCoord timetick channel = %s", Params.TimeTickChannelName) - - assert.Equal(t, Params.SegmentInfoChannelName, "by-dev-segment-info-channel") - t.Logf("DataCoord segment info channel = %s", Params.SegmentInfoChannelName) - - assert.Equal(t, Params.DataCoordSubscriptionName, "by-dev-dataCoord") - t.Logf("DataCoord subscription channel = %s", Params.DataCoordSubscriptionName) + //Params := GlobalParams.DataCoordCfg }) t.Run("test dataNodeConfig", func(t *testing.T) { Params := GlobalParams.DataNodeCfg Params.NodeID = 2 - Params.initMsgChannelSubName() + Params.Refresh() id := Params.NodeID log.Println("NodeID:", id) @@ -279,17 +278,9 @@ func TestGlobalParamTable(t *testing.T) { path1 := Params.InsertBinlogRootPath log.Println("InsertBinlogRootPath:", path1) - path1 = Params.ClusterChannelPrefix - assert.Equal(t, path1, "by-dev") - log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix) - - name := Params.TimeTickChannelName - assert.Equal(t, name, "by-dev-datacoord-timetick-channel") - log.Println("TimeTickChannelName:", name) - - name = Params.MsgChannelSubName + name := Params.DataNodeSubName assert.Equal(t, name, "by-dev-dataNode-2") - log.Println("MsgChannelSubName:", name) + log.Println("DataNodeSubName:", name) Params.CreatedTime = time.Now() log.Println("CreatedTime: ", Params.CreatedTime)