From 4e23ed9a18e55cc479a060be1d7661a7606f9ed1 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 10 Sep 2021 13:00:00 +0800 Subject: [PATCH] Revert apply physical channels from pool (d5f7e358 and 3b960969) (#7687) Signed-off-by: yudong.cai --- configs/advanced/channel.yaml | 1 - internal/datanode/data_sync_service.go | 5 +- internal/datanode/flow_graph_dd_node.go | 2 +- .../flow_graph_dmstream_input_node.go | 9 +- .../flow_graph_dmstream_input_node_test.go | 2 +- .../datanode/flow_graph_insert_buffer_node.go | 10 +- .../querynode/flow_graph_filter_dm_node.go | 6 +- internal/querynode/flow_graph_query_node.go | 1 - internal/rootcoord/dml_channels.go | 176 ++++++++---------- internal/rootcoord/dml_channels_test.go | 85 --------- internal/rootcoord/meta_table.go | 1 - internal/rootcoord/param_table.go | 14 -- internal/rootcoord/root_coord.go | 13 +- internal/rootcoord/root_coord_test.go | 7 +- internal/rootcoord/task.go | 8 +- internal/rootcoord/timeticksync.go | 66 +++---- internal/util/timerecord/time_recorder.go | 19 +- 17 files changed, 134 insertions(+), 291 deletions(-) delete mode 100644 internal/rootcoord/dml_channels_test.go diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index ee741eca99..2ce4e5de02 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -14,7 +14,6 @@ msgChannel: chanNamePrefix: rootCoordTimeTick: "rootcoord-timetick" rootCoordStatistics: "rootcoord-statistics" - rootCoordDml: "rootcoord-dml" search: "search" searchResult: "searchResult" proxyTimeTick: "proxyTimeTick" diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index c4b3b130b5..2f28660c46 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -156,11 +157,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.saveBinlog = saveBinlog + pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName()) var dmStreamNode Node = newDmInputNode( dsService.ctx, dsService.msFactory, - vchanInfo.CollectionID, - vchanInfo.GetChannelName(), + pchan, vchanInfo.GetSeekPosition(), ) var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 52df9ead7a..49161c7fd0 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -92,7 +92,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { zap.Uint64("Message endts", msg.EndTs()), zap.Uint64("FilterThreshold", FilterThreshold), ) - if ddn.filterFlushedSegmentInsertMessages(imsg) { + if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) { continue } } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 0258db958e..ad7d4b8103 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -13,24 +13,19 @@ package datanode import ( "context" - "strconv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" 
"github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - - // subName should be unique, since pchannelName is shared among several collections - consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) + consumeSubName := Params.MsgChannelSubName insertStream, _ := factory.NewTtMsgStream(ctx) - pchannelName := rootcoord.ToPhysicalChannel(chanName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName) diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 3935b15b30..25f598ddb5 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -84,5 +84,5 @@ func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error { func TestNewDmInputNode(t *testing.T) { ctx := context.Background() - newDmInputNode(ctx, &mockMsgStreamFactory{}, 0, "abc_adc", new(internalpb.MsgPosition)) + newDmInputNode(ctx, &mockMsgStreamFactory{}, "abc_adc", new(internalpb.MsgPosition)) } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index dfe8bd6e45..ae0b0e839d 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -185,13 +185,9 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(), iMsg.startPositions[0], iMsg.endPositions[0]) if err != nil { - log.Error("add segment wrong", - zap.Int64("segID", currentSegID), - zap.Int64("collID", collID), - zap.Int64("partID", partitionID), - zap.String("chanName", msg.GetChannelID()), - zap.Error(err)) + log.Error("add segment wrong", zap.Error(err)) } + } segNum := uniqueSeg[currentSegID] @@ -204,7 +200,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { err := ibNode.replica.updateStatistics(id, num) if err != nil { - log.Error("update Segment Row number wrong", zap.Int64("segID", id), zap.Error(err)) + log.Error("update Segment Row number wrong", zap.Error(err)) } } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index ba4cd25c67..9886628758 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // check if the collection from message is target collection if msg.CollectionID != fdmNode.collectionID { - //log.Debug("filter invalid insert message, collection is not the target collection", - // zap.Any("collectionID", msg.CollectionID), - // zap.Any("partitionID", msg.PartitionID)) + log.Debug("filter invalid insert message, collection is not the target collection", + zap.Any("collectionID", 
msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) return nil } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index f8d2a84b9f..1592238d3c 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -125,7 +125,6 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS log.Debug("query node flow graph consumes from pChannel", zap.Any("collectionID", q.collectionID), zap.Any("channel", channel), - zap.Any("subName", subName), ) return nil } diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index bb5874bbde..e376ffcba7 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -15,88 +15,87 @@ import ( "fmt" "sync" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" + "go.uber.org/zap" ) type dmlChannels struct { - core *Core - namePrefix string - capacity int64 - refcnt sync.Map - idx *atomic.Int64 - pool sync.Map - dml sync.Map + core *Core + lock sync.RWMutex + dml map[string]msgstream.MsgStream } -func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels { - d := &dmlChannels{ - core: c, - namePrefix: chanNamePrefix, - capacity: chanNum, - refcnt: sync.Map{}, - idx: atomic.NewInt64(0), - pool: sync.Map{}, - dml: sync.Map{}, +func newDMLChannels(c *Core) *dmlChannels { + return &dmlChannels{ + core: c, + lock: sync.RWMutex{}, + dml: make(map[string]msgstream.MsgStream), } - - var i int64 - for i = 0; i < chanNum; i++ { - name := fmt.Sprintf("%s_%d", d.namePrefix, i) - ms, err := c.msFactory.NewMsgStream(c.ctx) - if err != nil { - log.Error("add msgstream failed", zap.String("name", name), zap.Error(err)) - panic("add msgstream failed") - } - ms.AsProducer([]string{name}) - d.pool.Store(name, &ms) - } - log.Debug("init dml channels", zap.Int64("num", chanNum)) - return d -} - -func (d *dmlChannels) GetDmlMsgStreamName() string { - cnt := d.idx.Load() - name := fmt.Sprintf("%s_%d", d.namePrefix, cnt) - d.idx.Store((cnt + 1) % d.capacity) - return name -} - -// ListChannels lists all dml channel names -func (d *dmlChannels) ListChannels() []string { - chanNames := make([]string, 0) - d.refcnt.Range( - func(k, v interface{}) bool { - chanNames = append(chanNames, k.(string)) - return true - }) - return chanNames } // GetNumChannels get current dml channel count func (d *dmlChannels) GetNumChannels() int { - return len(d.ListChannels()) + d.lock.RLock() + defer d.lock.RUnlock() + return len(d.dml) +} + +// ListChannels lists all dml channel names +func (d *dmlChannels) ListChannels() []string { + d.lock.RLock() + defer d.lock.RUnlock() + + ret := make([]string, 0, len(d.dml)) + for n := range d.dml { + ret = append(ret, n) + } + return ret + +} + +// Produce produces msg pack into specified channel +func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + ds, ok := d.dml[name] + if !ok { + return fmt.Errorf("channel %s not exist", name) + } + if err := ds.Produce(pack); err != nil { + return err + } + return nil } // Broadcast broadcasts msg pack into specified channel -func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error { - for _, chanName := range chanNames { - // only in-use chanName exist in refcnt - if _, ok := d.refcnt.Load(chanName); ok { - v, _ := d.pool.Load(chanName) - if err 
:= (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil { - return err - } - } else { - log.Debug("broadcast to old version channel", zap.String("chanName", chanName)) - if ds, ok := d.dml.Load(chanName); ok { - if err := (*(ds.(*msgstream.MsgStream))).Broadcast(pack); err != nil { - return err - } - } +func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + ds, ok := d.dml[name] + if !ok { + return fmt.Errorf("channel %s not exist", name) + } + if err := ds.Broadcast(pack); err != nil { + return err + } + return nil +} + +// BroadcastAll invoke broadcast with provided msg pack in all channels specified +func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + for _, ch := range channels { + ds, ok := d.dml[ch] + if !ok { + return fmt.Errorf("channel %s not exist", ch) + } + if err := ds.Broadcast(pack); err != nil { + return err } } return nil @@ -104,46 +103,33 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err // AddProducerChannels add named channels as producer func (d *dmlChannels) AddProducerChannels(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + for _, name := range names { - if _, ok := d.pool.Load(name); ok { - var cnt int64 - if _, ok := d.refcnt.Load(name); !ok { - cnt = 1 - } else { - v, _ := d.refcnt.Load(name) - cnt = v.(int64) + 1 - } - d.refcnt.Store(name, cnt) - log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt)) - } else { - log.Debug("add old version channel name", zap.String("chanName", name)) + log.Debug("add dml channel", zap.String("channel name", name)) + _, ok := d.dml[name] + if !ok { ms, err := d.core.msFactory.NewMsgStream(d.core.ctx) if err != nil { - log.Error("add msgstream failed", zap.String("name", name), zap.Error(err)) - panic("add msgstream failed") + log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err)) + continue } ms.AsProducer([]string{name}) - d.dml.Store(name, &ms) + d.dml[name] = ms } } } // RemoveProducerChannels removes specified channels func (d *dmlChannels) RemoveProducerChannels(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + for _, name := range names { - if v, ok := d.refcnt.Load(name); ok { - cnt := v.(int64) - if cnt > 1 { - d.refcnt.Store(name, cnt-1) - } else { - d.refcnt.Delete(name) - } - } else { - log.Debug("remove old version channel name", zap.String("chanName", name)) - if ds, ok := d.dml.Load(name); ok { - (*(ds.(*msgstream.MsgStream))).Close() - d.dml.Delete(name) - } + if ds, ok := d.dml[name]; ok { + ds.Close() + delete(d.dml, name) } } } diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go deleted file mode 100644 index cfcc43149e..0000000000 --- a/internal/rootcoord/dml_channels_test.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. 
- -package rootcoord - -import ( - "context" - "testing" - - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/stretchr/testify/assert" -) - -func TestDmlChannels(t *testing.T) { - const ( - dmlChanPrefix = "rootcoord-dml" - totalDmlChannelNum = 2 - ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - factory := msgstream.NewPmsFactory() - Params.Init() - - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarAddress, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err := factory.SetParams(m) - assert.Nil(t, err) - - core, err := NewCore(ctx, factory) - assert.Nil(t, err) - - dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum) - chanNames := dml.ListChannels() - assert.Equal(t, 0, len(chanNames)) - - //randStr := funcutil.RandomString(8) - //assert.Panics(t, func() { dml.AddProducerChannels(randStr) }) - // - //err = dml.Broadcast([]string{randStr}, nil) - //assert.NotNil(t, err) - //assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr)) - - randStr := funcutil.RandomString(8) - dml.AddProducerChannels(randStr) - - err = dml.Broadcast([]string{randStr}, nil) - assert.Nil(t, err) - - dml.RemoveProducerChannels(randStr) - - // dml_xxx_0 => {chanName0, chanName2} - // dml_xxx_1 => {chanName1} - chanName0 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName0) - assert.Equal(t, 1, dml.GetNumChannels()) - - chanName1 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName1) - assert.Equal(t, 2, dml.GetNumChannels()) - - chanName2 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName2) - assert.Equal(t, 2, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 2, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName1) - assert.Equal(t, 1, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 0, dml.GetNumChannels()) -} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 8f7d9b804e..f3ac3bbea6 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -174,7 +174,6 @@ func (mt *metaTable) reloadFromKV() error { mt.indexID2Meta[meta.IndexID] = meta } - log.Debug("reload meta table from KV successfully") return nil } diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index bd7a621f85..8b499aacd5 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -69,9 +69,7 @@ func (p *ParamTable) Init() { p.initMsgChannelSubName() p.initTimeTickChannel() p.initStatisticsChannelName() - p.initDmlChannelName() - p.initDmlChannelNum() p.initMaxPartitionNum() p.initMinSegmentSizeToEnableIndex() p.initDefaultPartitionName() @@ -149,18 +147,6 @@ func (p *ParamTable) initStatisticsChannelName() { p.StatisticsChannel = channel } -func (p *ParamTable) initDmlChannelName() { - channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml") - if err != nil { - panic(err) - } - p.DmlChannelName = channel -} - -func (p *ParamTable) initDmlChannelNum() { - p.DmlChannelNum = p.ParseInt64("rootcoord.dmlChannelNum") -} - func (p *ParamTable) initMaxPartitionNum() { p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum") } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 679762746b..f9e61038f7 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -507,7 +507,7 @@ func (c *Core) setMsgStreams() 
error { CreateCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error { @@ -523,7 +523,7 @@ func (c *Core) setMsgStreams() error { DropCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error { @@ -539,7 +539,7 @@ func (c *Core) setMsgStreams() error { CreatePartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error { @@ -555,7 +555,7 @@ func (c *Core) setMsgStreams() error { DropPartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } return nil @@ -925,12 +925,9 @@ func (c *Core) Init() error { return } - c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum) - - // recover physical channels for all collections + c.dmlChannels = newDMLChannels(c) pc := c.MetaTable.ListCollectionPhysicalChannels() c.dmlChannels.AddProducerChannels(pc...) - log.Debug("recover all physical channels", zap.Any("chanNames", pc)) c.chanTimeTick = newTimeTickSync(c) c.chanTimeTick.AddProxy(c.session) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 11801471ed..9c2c1f58e4 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -316,7 +316,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 vchanNames := make([]string, t.ShardsNum) chanNames := make([]string, t.ShardsNum) for i := int32(0); i < t.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.dmlChannels.GetDmlMsgStreamName(), collID, i) + vchanNames[i] = fmt.Sprintf("%s_%d_%d_v%d", collName, collID, i, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -1613,10 +1613,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - cn0 := core.dmlChannels.GetDmlMsgStreamName() - cn1 := core.dmlChannels.GetDmlMsgStreamName() - cn2 := core.dmlChannels.GetDmlMsgStreamName() - core.dmlChannels.AddProducerChannels(cn0, cn1, cn2) + core.dmlChannels.AddProducerChannels("c0", "c1", "c2") msg0 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index ce19c198a0..96890a29ed 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -133,7 +133,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { vchanNames := make([]string, t.Req.ShardsNum) chanNames := make([]string, t.Req.ShardsNum) for i := int32(0); i < t.Req.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.dmlChannels.GetDmlMsgStreamName(), collID, i) + vchanNames[i] = fmt.Sprintf("%s_%d_%d_v%d", t.Req.CollectionName, collID, i, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -284,8 +284,10 @@ func (t 
*DropCollectionReqTask) Execute(ctx context.Context) error { t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) - // send tt into deleted channels to tell data_node to clear flowgragh - t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts) + for _, chanName := range collMeta.PhysicalChannelNames { + // send tt into deleted channels to tell data_node to clear flowgragh + t.core.chanTimeTick.SendChannelTimeTick(chanName, ts) + } // remove dml channel after send dd msg t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 9926ff801c..dbdf559a13 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -22,12 +22,16 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) +type ddlTimetickInfo struct { + ddlMinTs typeutil.Timestamp + ddlTsSet map[typeutil.Timestamp]struct{} +} + type timetickSync struct { core *Core lock sync.Mutex @@ -187,7 +191,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), - // zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) + // zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) t.sendToChannel() return nil @@ -221,53 +225,34 @@ func (t *timetickSync) StartWatch() { for { select { case <-t.core.ctx.Done(): - log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err())) + log.Debug("root coord context done", zap.Error(t.core.ctx.Err())) return - case proxyTimetick, ok := <-t.sendChan: + case ptt, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") return } // reduce each channel to get min timestamp - local := proxyTimetick[t.core.session.ServerID] - if len(local.in.ChannelNames) == 0 { - continue - } - - hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames)) - tr := timerecord.NewTimeRecorder(hdr) - wg := sync.WaitGroup{} - for _, chanName := range local.in.ChannelNames { - wg.Add(1) - go func(chanName string) { - mints := local.getTimetick(chanName) - for _, tt := range proxyTimetick { - ts := tt.getTimetick(chanName) - if ts < mints { - mints = ts - } + mtt := ptt[t.core.session.ServerID] + for _, chanName := range mtt.in.ChannelNames { + mints := mtt.getTimetick(chanName) + for _, tt := range ptt { + ts := tt.getTimetick(chanName) + if ts < mints { + mints = ts } - if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil { - log.Debug("SendTimeTickToChannel fail", zap.Error(err)) - } - wg.Done() - }(chanName) - } - wg.Wait() - span := tr.ElapseSpan() - // rootcoord send tt msg to all channels every 200ms by default - if span.Milliseconds() > 200 { - log.Warn("rootcoord send tt to all channels too slowly", - zap.Int("chanNum", len(local.in.ChannelNames)), - zap.Int64("span", span.Milliseconds())) + } + if err := t.SendChannelTimeTick(chanName, mints); err != nil { + log.Debug("SendChannelTimeTick fail", zap.Error(err)) + } } } } } -// SendTimeTickToChannel send each channel's 
min timetick to msg stream -func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { +// SendChannelTimeTick send each channel's min timetick to msg stream +func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ BeginTimestamp: ts, @@ -288,14 +273,11 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil { - return err - } - - for _, chanName := range chanNames { + err := t.core.dmlChannels.Broadcast(chanName, &msgPack) + if err == nil { metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } - return nil + return err } // GetProxyNum return the num of detected proxy node diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go index 69253b322c..e22d21ae3b 100644 --- a/internal/util/timerecord/time_recorder.go +++ b/internal/util/timerecord/time_recorder.go @@ -33,30 +33,19 @@ func NewTimeRecorder(header string) *TimeRecorder { } } -func (tr *TimeRecorder) RecordSpan() time.Duration { +// Record calculates the time span from previous Record call +func (tr *TimeRecorder) Record(msg string) time.Duration { curr := time.Now() span := curr.Sub(tr.last) tr.last = curr - return span -} - -func (tr *TimeRecorder) ElapseSpan() time.Duration { - curr := time.Now() - span := curr.Sub(tr.start) - tr.last = curr - return span -} - -// Record calculates the time span from previous Record call -func (tr *TimeRecorder) Record(msg string) time.Duration { - span := tr.RecordSpan() tr.printTimeRecord(msg, span) return span } // Elapse calculates the time span from the beginning of this TimeRecorder func (tr *TimeRecorder) Elapse(msg string) time.Duration { - span := tr.ElapseSpan() + curr := time.Now() + span := curr.Sub(tr.start) tr.printTimeRecord(msg, span) return span }
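
A minimal sketch of the per-collection DML channel flow this revert restores, as seen
from inside the rootcoord package. The *Core value c, the collection name/ID and the
shard index below are hypothetical placeholders; the calls are the ones defined in the
reverted task.go and dml_channels.go hunks above.

    // virtual channel name per shard, and the physical channel it maps to
    vchan := fmt.Sprintf("%s_%d_%d_v%d", collName, collID, shardIdx, shardIdx)
    pchan := ToPhysicalChannel(vchan)

    // one MsgStream per registered channel name, guarded by a mutex
    chans := newDMLChannels(c)
    chans.AddProducerChannels(pchan)                 // creates the stream via c.msFactory and AsProducer
    _ = chans.Broadcast(pchan, &msgstream.MsgPack{}) // returns an error if the name was never registered
    chans.RemoveProducerChannels(pchan)              // closes the stream and removes it from the map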