diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index 63cc9f1cd3..b99fb4eecf 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -15,16 +15,12 @@ msgChannel: dataDefinition: "data-definition" masterTimeTick: "master-timetick" masterStatistics: "master-statistics" - insert: "insert" - delete: "delete" search: "search" searchResult: "searchResult" k2s: "k2s" proxyTimeTick: "proxyTimeTick" proxyServiceTimeTick: "proxyServiceTimeTick" - dataNodeTimeTick: "dataNodeTimeTick" queryTimeTick: "queryTimeTick" - dataNodeSegStatistics: "dataNodeSegStatistics" queryNodeStats: "query-node-stats" # cmd for loadIndex, flush, etc... cmd: "cmd" @@ -43,9 +39,6 @@ msgChannel: # default channel range [0, 1) channelRange: - insert: [0, 2] - delete: [0, 1] - dataDefinition: [0,1] k2s: [0, 1] search: [0, 1] searchResult: [0, 1] diff --git a/internal/dataservice/cluster_test.go b/internal/dataservice/cluster_test.go index 8549845153..afbe23e80d 100644 --- a/internal/dataservice/cluster_test.go +++ b/internal/dataservice/cluster_test.go @@ -20,10 +20,10 @@ import ( func TestDataNodeClusterRegister(t *testing.T) { Params.Init() - Params.DataNodeNum = 3 cluster := newDataNodeCluster() - ids := make([]int64, 0, Params.DataNodeNum) - for i := 0; i < Params.DataNodeNum; i++ { + dataNodeNum := 3 + ids := make([]int64, 0, dataNodeNum) + for i := 0; i < dataNodeNum; i++ { c := newMockDataNodeClient(int64(i)) err := c.Init() assert.Nil(t, err) @@ -40,18 +40,18 @@ func TestDataNodeClusterRegister(t *testing.T) { }) ids = append(ids, int64(i)) } - assert.EqualValues(t, Params.DataNodeNum, cluster.GetNumOfNodes()) + assert.EqualValues(t, dataNodeNum, cluster.GetNumOfNodes()) assert.EqualValues(t, ids, cluster.GetNodeIDs()) states, err := cluster.GetDataNodeStates(context.TODO()) assert.Nil(t, err) - assert.EqualValues(t, Params.DataNodeNum, len(states)) + assert.EqualValues(t, dataNodeNum, len(states)) for _, s := range states { assert.EqualValues(t, internalpb.StateCode_Healthy, s.StateCode) } cluster.ShutDownClients() states, err = cluster.GetDataNodeStates(context.TODO()) assert.Nil(t, err) - assert.EqualValues(t, Params.DataNodeNum, len(states)) + assert.EqualValues(t, dataNodeNum, len(states)) for _, s := range states { assert.EqualValues(t, internalpb.StateCode_Abnormal, s.StateCode) } @@ -59,7 +59,7 @@ func TestDataNodeClusterRegister(t *testing.T) { func TestWatchChannels(t *testing.T) { Params.Init() - Params.DataNodeNum = 3 + dataNodeNum := 3 cases := []struct { collectionID UniqueID channels []string @@ -73,7 +73,7 @@ func TestWatchChannels(t *testing.T) { cluster := newDataNodeCluster() for _, c := range cases { - for i := 0; i < Params.DataNodeNum; i++ { + for i := 0; i < dataNodeNum; i++ { c := newMockDataNodeClient(int64(i)) err := c.Init() assert.Nil(t, err) diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index f27da8de53..5044e2836f 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -42,7 +42,7 @@ func TestRegisterNode(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, Params.DataNodeNum, svr.cluster.GetNumOfNodes()) + assert.EqualValues(t, 1, svr.cluster.GetNumOfNodes()) assert.EqualValues(t, []int64{1000}, svr.cluster.GetNodeIDs()) }) @@ -342,7 +342,6 @@ func TestGetSegmentStates(t *testing.T) { func newTestServer(t *testing.T) *Server { Params.Init() - Params.DataNodeNum = 1 var err error factory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index f476280a3a..8bbb4f7d6f 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -49,14 +49,11 @@ type ParamTable struct { QueryNodeIDList []UniqueID ProxyID UniqueID TimeTickInterval time.Duration - InsertChannelNames []string - DeleteChannelNames []string K2SChannelNames []string SearchChannelNames []string SearchResultChannelNames []string ProxySubName string ProxyTimeTickChannelNames []string - DataDefinitionChannelNames []string MsgStreamInsertBufSize int64 MsgStreamSearchBufSize int64 MsgStreamSearchResultBufSize int64 @@ -148,14 +145,11 @@ func (pt *ParamTable) initParams() { pt.initQueryNodeIDList() pt.initQueryNodeNum() pt.initTimeTickInterval() - pt.initInsertChannelNames() - pt.initDeleteChannelNames() pt.initK2SChannelNames() pt.initSearchChannelNames() pt.initSearchResultChannelNames() pt.initProxySubName() pt.initProxyTimeTickChannelNames() - pt.initDataDefinitionChannelNames() pt.initMsgStreamInsertBufSize() pt.initMsgStreamSearchBufSize() pt.initMsgStreamSearchResultBufSize() @@ -213,43 +207,6 @@ func (pt *ParamTable) initTimeTickInterval() { pt.TimeTickInterval = time.Duration(interval) * time.Millisecond } -func (pt *ParamTable) initInsertChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.insert") - if err != nil { - panic(err) - } - prefix += "-" - iRangeStr, err := pt.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - - pt.InsertChannelNames = ret -} - -func (pt *ParamTable) initDeleteChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.delete") - if err != nil { - panic(err) - } - prefix += "-" - dRangeStr, err := pt.Load("msgChannel.channelRange.delete") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(dRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - pt.DeleteChannelNames = ret -} - func (pt *ParamTable) initK2SChannelNames() { prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s") if err != nil { @@ -321,15 +278,6 @@ func (pt *ParamTable) initProxyTimeTickChannelNames() { pt.ProxyTimeTickChannelNames = []string{prefix} } -func (pt *ParamTable) initDataDefinitionChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.dataDefinition") - if err != nil { - panic(err) - } - prefix += "-0" - pt.DataDefinitionChannelNames = []string{prefix} -} - func (pt *ParamTable) initMsgStreamInsertBufSize() { pt.MsgStreamInsertBufSize = pt.ParseInt64("proxyNode.msgStream.insert.bufSize") } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index b7c47fa372..b8887e79b7 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -59,9 +59,8 @@ type ProxyNode struct { tsoAllocator *allocator.TimestampAllocator segAssigner *SegIDAssigner - manipulationMsgStream msgstream.MsgStream - queryMsgStream msgstream.MsgStream - msFactory msgstream.Factory + queryMsgStream msgstream.MsgStream + msFactory msgstream.Factory // Add callback functions at different stages startCallbacks []func() @@ -194,15 +193,6 @@ func (node *ProxyNode) Init() error { node.segAssigner = segAssigner node.segAssigner.PeerID = Params.ProxyID - node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) - node.manipulationMsgStream.AsProducer(Params.InsertChannelNames) - log.Debug("proxynode", zap.Strings("proxynode AsProducer", Params.InsertChannelNames)) - repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { - return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true) - } - node.manipulationMsgStream.SetRepackFunc(repackFunc) - log.Debug("create manipulation message stream ...") - node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory) if err != nil { return err @@ -223,9 +213,6 @@ func (node *ProxyNode) Start() error { initGlobalInsertChannelsMap(node) log.Debug("init global insert channels map ...") - node.manipulationMsgStream.Start() - log.Debug("start manipulation message stream ...") - node.queryMsgStream.Start() log.Debug("start query message stream ...") @@ -265,7 +252,6 @@ func (node *ProxyNode) Stop() error { node.idAllocator.Close() node.segAssigner.Close() node.sched.Close() - node.manipulationMsgStream.Close() node.queryMsgStream.Close() node.tick.Close() diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 2275323531..dd6252b070 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -51,12 +51,6 @@ func TestDataSyncService_Start(t *testing.T) { } records = append(records, blob) } - - timeRange := TimeRange{ - timestampMin: 0, - timestampMax: math.MaxUint64, - } - // messages generate insertMessages := make([]msgstream.TsMsg, 0) for i := 0; i < msgLength; i++ { @@ -88,12 +82,6 @@ func TestDataSyncService_Start(t *testing.T) { insertMessages = append(insertMessages, msg) } - msgPack := msgstream.MsgPack{ - BeginTs: timeRange.timestampMin, - EndTs: timeRange.timestampMax, - Msgs: insertMessages, - } - // generate timeTick timeTickMsgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ @@ -117,7 +105,6 @@ func TestDataSyncService_Start(t *testing.T) { // pulsar produce const receiveBufSize = 1024 - insertChannels := Params.InsertChannelNames pulsarURL := Params.PulsarAddress msFactory := msgstream.NewPmsFactory() @@ -128,18 +115,6 @@ func TestDataSyncService_Start(t *testing.T) { err := msFactory.SetParams(m) assert.Nil(t, err) - insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) - insertStream.AsProducer(insertChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - - err = insertMsgStream.Produce(&msgPack) - assert.NoError(t, err) - - err = insertMsgStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) - // dataSync node.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID) go node.dataSyncServices[collectionID].start() diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 8aa888b52b..be5327e26c 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -958,11 +958,6 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, binary.LittleEndian.PutUint32(bs, 1) rawData = append(rawData, bs...) - timeRange := TimeRange{ - timestampMin: 0, - timestampMax: math.MaxUint64, - } - // messages generate insertMessages := make([]msgstream.TsMsg, 0) for i := 0; i < msgLength; i++ { @@ -993,12 +988,6 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, insertMessages = append(insertMessages, msg) } - msgPack := msgstream.MsgPack{ - BeginTs: timeRange.timestampMin, - EndTs: timeRange.timestampMax, - Msgs: insertMessages, - } - // generate timeTick timeTickMsgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ @@ -1022,8 +1011,6 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, // pulsar produce const receiveBufSize = 1024 - insertChannels := Params.InsertChannelNames - ddChannels := Params.DDChannelNames msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ @@ -1035,93 +1022,6 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, return err } - insertStream, _ := msFactory.NewMsgStream(ctx) - insertStream.AsProducer(insertChannels) - insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName) - - ddStream, _ := msFactory.NewMsgStream(ctx) - ddStream.AsProducer(ddChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - - err = insertMsgStream.Produce(&msgPack) - if err != nil { - return err - } - - err = insertMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - err = ddMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - - return nil -} - -func sentTimeTick(ctx context.Context) error { - timeTickMsgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 1500, - EndTimestamp: 2000, - HashValues: []uint32{0}, - } - timeTickResult := internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: math.MaxUint64, - SourceID: 0, - }, - } - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) - - // pulsar produce - const receiveBufSize = 1024 - insertChannels := Params.InsertChannelNames - ddChannels := Params.DDChannelNames - - msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": receiveBufSize, - "pulsarAddress": Params.PulsarAddress, - "pulsarBufSize": 1024} - err := msFactory.SetParams(m) - if err != nil { - return err - } - - insertStream, _ := msFactory.NewMsgStream(ctx) - insertStream.AsProducer(insertChannels) - insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName) - - ddStream, _ := msFactory.NewMsgStream(ctx) - ddStream.AsProducer(ddChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - - err = insertMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - err = ddMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } return nil } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 4acbcf0844..6eb9485143 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -47,13 +47,10 @@ type ParamTable struct { MinioBucketName string // dm - InsertChannelNames []string - InsertChannelRange []int InsertReceiveBufSize int64 InsertPulsarBufSize int64 // dd - DDChannelNames []string DDReceiveBufSize int64 DDPulsarBufSize int64 @@ -124,12 +121,9 @@ func (p *ParamTable) Init() { p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() - p.initInsertChannelNames() - p.initInsertChannelRange() p.initInsertReceiveBufSize() p.initInsertPulsarBufSize() - p.initDDChannelNames() p.initDDReceiveBufSize() p.initDDPulsarBufSize() @@ -229,14 +223,6 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } -func (p *ParamTable) initInsertChannelRange() { - insertChannelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",") -} - // advanced params // stats func (p *ParamTable) initStatsPublishInterval() { @@ -325,32 +311,6 @@ func (p *ParamTable) initGracefulTime() { p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") } -func (p *ParamTable) initInsertChannelNames() { - - prefix, err := p.Load("msgChannel.chanNamePrefix.insert") - if err != nil { - log.Error(err.Error()) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - sep := len(channelIDs) / p.QueryNodeNum - index := p.SliceIndex - if index == -1 { - panic("queryNodeID not Match with Config") - } - start := index * sep - p.InsertChannelNames = ret[start : start+sep] -} - func (p *ParamTable) initSearchChannelNames() { prefix, err := p.Load("msgChannel.chanNamePrefix.search") if err != nil { @@ -412,24 +372,6 @@ func (p *ParamTable) initStatsChannelName() { p.StatsChannelName = channels } -func (p *ParamTable) initDDChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") - if err != nil { - panic(err) - } - //prefix += "-" - //iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition") - //if err != nil { - // panic(err) - //} - //channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - //var ret []string - //for _, ID := range channelIDs { - // ret = append(ret, prefix+strconv.Itoa(ID)) - //} - p.DDChannelNames = []string{prefix} -} - func (p *ParamTable) initSliceIndex() { queryNodeID := p.QueryNodeID queryNodeIDList := p.QueryNodeIDList() diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index ff1e65322c..1509c54d23 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -66,11 +66,6 @@ func TestParamTable_minio(t *testing.T) { }) } -func TestParamTable_insertChannelRange(t *testing.T) { - channelRange := Params.InsertChannelRange - assert.Equal(t, 2, len(channelRange)) -} - func TestParamTable_statsServiceTimeInterval(t *testing.T) { interval := Params.StatsPublishInterval assert.Equal(t, 1000, interval) @@ -126,17 +121,6 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { assert.Equal(t, int32(1024), maxParallelism) } -func TestParamTable_insertChannelNames(t *testing.T) { - names := Params.InsertChannelNames - channelRange := Params.InsertChannelRange - num := channelRange[1] - channelRange[0] - num = num / Params.QueryNodeNum - assert.Equal(t, num, len(names)) - start := num * Params.SliceIndex - contains := strings.Contains(names[0], fmt.Sprintf("insert-%d", channelRange[start])) - assert.Equal(t, contains, true) -} - func TestParamTable_searchChannelNames(t *testing.T) { names := Params.SearchChannelNames assert.Equal(t, len(names), 1) @@ -165,9 +149,3 @@ func TestParamTable_metaRootPath(t *testing.T) { path := Params.MetaRootPath fmt.Println(path) } - -func TestParamTable_ddChannelName(t *testing.T) { - names := Params.DDChannelNames - contains := strings.Contains(names[0], "data-definition") - assert.Equal(t, contains, true) -} diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 84829aca71..26b12eaa1b 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -196,8 +196,6 @@ func makeNewChannelNames(names []string, suffix string) []string { func refreshChannelNames() { suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10) - Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix) - Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix) Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix) Params.StatsChannelName = Params.StatsChannelName + suffix diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index e1747beda1..7f59529c1e 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -138,44 +138,6 @@ func sendSearchRequest(ctx context.Context, DIM int) error { return err } -func sendTimeTick(ctx context.Context) error { - // init message stream - msFactory, err := newMessageStreamFactory() - if err != nil { - return err - } - - // generate timeTick - timeTickMsgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: Timestamp(20), - EndTimestamp: Timestamp(20), - HashValues: []uint32{0}, - } - timeTickResult := internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: Timestamp(20), - SourceID: 0, - }, - } - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) - - // produce timeTick message - insertChannels := Params.InsertChannelNames - insertStream, _ := msFactory.NewMsgStream(ctx) - insertStream.AsProducer(insertChannels) - insertStream.Start() - - err = insertStream.Broadcast(&timeTickMsgPack) - return err -} - func TestSearch_Search(t *testing.T) { const N = 10000 const DIM = 16 diff --git a/internal/util/paramtable/paramtable_test.go b/internal/util/paramtable/paramtable_test.go index c8caa90359..e00ff62352 100644 --- a/internal/util/paramtable/paramtable_test.go +++ b/internal/util/paramtable/paramtable_test.go @@ -110,7 +110,4 @@ func TestGlobalParamsTable_LoadYaml(t *testing.T) { assert.Nil(t, err) _, err = Params.Load("pulsar.port") assert.Nil(t, err) - _, err = Params.Load("msgChannel.channelRange.insert") - assert.Nil(t, err) - }