diff --git a/configs/advanced/root_coord.yaml b/configs/advanced/root_coord.yaml index 889a813eca..47596c4bea 100644 --- a/configs/advanced/root_coord.yaml +++ b/configs/advanced/root_coord.yaml @@ -10,7 +10,6 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. rootcoord: - dmlChannelNum: 64 maxPartitionNum: 4096 minSegmentSizeToEnableIndex: 1024 timeout: 3600 # time out, 5 seconds diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index c107a01f02..fbeb2947bb 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -19,6 +19,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -147,11 +148,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return nil } + pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName()) var dmStreamNode Node = newDmInputNode( dsService.ctx, dsService.msFactory, - vchanInfo.CollectionID, - vchanInfo.GetChannelName(), + pchan, vchanInfo.GetSeekPosition(), ) var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 561045de86..e9d6eeb882 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -92,22 +92,15 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } case commonpb.MsgType_Insert: log.Debug("DDNode with insert messages") - imsg := msg.(*msgstream.InsertMsg) if msg.EndTs() < FilterThreshold { log.Info("Filtering Insert Messages", zap.Uint64("Message endts", msg.EndTs()), zap.Uint64("FilterThreshold", FilterThreshold), ) - if ddn.filterFlushedSegmentInsertMessages(imsg) { + if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) { continue } } - if imsg.CollectionID != ddn.collectionID { - //log.Debug("filter invalid InsertMsg, collection mis-match", - // zap.Int64("msg collID", imsg.CollectionID), - // zap.Int64("ddn collID", ddn.collectionID)) - continue - } iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg)) } } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 0258db958e..ad7d4b8103 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -13,24 +13,19 @@ package datanode import ( "context" - "strconv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - - // subName should be unique, since pchannelName is shared among several collections - consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) + consumeSubName := Params.MsgChannelSubName insertStream, _ := factory.NewTtMsgStream(ctx) - pchannelName := rootcoord.ToPhysicalChannel(chanName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index fd3b0becb8..b52d7ac424 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -184,13 +184,9 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(), iMsg.startPositions[0], iMsg.endPositions[0]) if err != nil { - log.Error("add segment wrong", - zap.Int64("segID", currentSegID), - zap.Int64("collID", collID), - zap.Int64("partID", partitionID), - zap.String("chanName", msg.GetChannelID()), - zap.Error(err)) + log.Error("add segment wrong", zap.Error(err)) } + } segNum := uniqueSeg[currentSegID] @@ -203,7 +199,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { err := ibNode.replica.updateStatistics(id, num) if err != nil { - log.Error("update Segment Row number wrong", zap.Int64("segID", id), zap.Error(err)) + log.Error("update Segment Row number wrong", zap.Error(err)) } } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 24a213a38d..319649fb8e 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // check if the collection from message is target collection if msg.CollectionID != fdmNode.collectionID { - //log.Debug("filter invalid insert message, collection is not the target collection", - // zap.Any("collectionID", msg.CollectionID), - // zap.Any("partitionID", msg.PartitionID)) + log.Debug("filter invalid insert message, collection is not the target collection", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) return nil } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index f8d2a84b9f..1592238d3c 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -125,7 +125,6 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS log.Debug("query node flow graph consumes from pChannel", zap.Any("collectionID", q.collectionID), zap.Any("channel", channel), - zap.Any("subName", subName), ) return nil } diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 66454416a3..e376ffcba7 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -15,80 +15,86 @@ import ( "fmt" "sync" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/util/funcutil" + "go.uber.org/zap" ) type dmlChannels struct { - core *Core - namePrefix string - capacity int64 - refcnt sync.Map - idx *atomic.Int64 - pool sync.Map + core *Core + lock sync.RWMutex + dml map[string]msgstream.MsgStream } -func newDmlChannels(c *Core, chanNum int64) *dmlChannels { - d := &dmlChannels{ - core: c, - namePrefix: fmt.Sprintf("dml_%s", funcutil.RandomString(8)), - capacity: chanNum, - refcnt: sync.Map{}, - idx: atomic.NewInt64(0), - pool: sync.Map{}, +func newDMLChannels(c *Core) *dmlChannels { + return &dmlChannels{ + core: c, + lock: sync.RWMutex{}, + dml: make(map[string]msgstream.MsgStream), } - - var i int64 - for i = 0; i < chanNum; i++ { - name := fmt.Sprintf("%s_%d", d.namePrefix, i) - ms, err := c.msFactory.NewMsgStream(c.ctx) - if err != nil { - log.Error("add msgstream failed", zap.String("name", name), zap.Error(err)) - panic("add msgstream failed") - } - ms.AsProducer([]string{name}) - d.pool.Store(name, &ms) - } - log.Debug("init dml channels", zap.Int64("num", chanNum)) - return d -} - -func (d *dmlChannels) GetDmlMsgStreamName() string { - cnt := d.idx.Load() - name := fmt.Sprintf("%s_%d", d.namePrefix, cnt) - d.idx.Store((cnt + 1) % d.capacity) - return name -} - -// ListChannels lists all dml channel names -func (d *dmlChannels) ListChannels() []string { - chanNames := make([]string, 0) - d.refcnt.Range( - func(k, v interface{}) bool { - chanNames = append(chanNames, k.(string)) - return true - }) - return chanNames } // GetNumChannels get current dml channel count func (d *dmlChannels) GetNumChannels() int { - return len(d.ListChannels()) + d.lock.RLock() + defer d.lock.RUnlock() + return len(d.dml) +} + +// ListChannels lists all dml channel names +func (d *dmlChannels) ListChannels() []string { + d.lock.RLock() + defer d.lock.RUnlock() + + ret := make([]string, 0, len(d.dml)) + for n := range d.dml { + ret = append(ret, n) + } + return ret + +} + +// Produce produces msg pack into specified channel +func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + ds, ok := d.dml[name] + if !ok { + return fmt.Errorf("channel %s not exist", name) + } + if err := ds.Produce(pack); err != nil { + return err + } + return nil } // Broadcast broadcasts msg pack into specified channel -func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error { - for _, chanName := range chanNames { - // only in-use chanName exist in refcnt - if _, ok := d.refcnt.Load(chanName); !ok { - return fmt.Errorf("channel %s not exist", chanName) +func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + ds, ok := d.dml[name] + if !ok { + return fmt.Errorf("channel %s not exist", name) + } + if err := ds.Broadcast(pack); err != nil { + return err + } + return nil +} + +// BroadcastAll invoke broadcast with provided msg pack in all channels specified +func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + for _, ch := range channels { + ds, ok := d.dml[ch] + if !ok { + return fmt.Errorf("channel %s not exist", ch) } - v, _ := d.pool.Load(chanName) - if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil { + if err := ds.Broadcast(pack); err != nil { return err } } @@ -97,35 +103,33 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err // AddProducerChannels add named channels as producer func (d *dmlChannels) AddProducerChannels(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + for _, name := range names { - if _, ok := d.pool.Load(name); !ok { - log.Error("invalid channel name", zap.String("chanName", name)) - panic("invalid channel name") - } else { - var cnt int64 - if _, ok := d.refcnt.Load(name); !ok { - cnt = 1 - } else { - v, _ := d.refcnt.Load(name) - cnt = v.(int64) + 1 + log.Debug("add dml channel", zap.String("channel name", name)) + _, ok := d.dml[name] + if !ok { + ms, err := d.core.msFactory.NewMsgStream(d.core.ctx) + if err != nil { + log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err)) + continue } - d.refcnt.Store(name, cnt) - log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt)) + ms.AsProducer([]string{name}) + d.dml[name] = ms } } } // RemoveProducerChannels removes specified channels func (d *dmlChannels) RemoveProducerChannels(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + for _, name := range names { - v, ok := d.refcnt.Load(name) - if ok { - cnt := v.(int64) - if cnt > 1 { - d.refcnt.Store(name, cnt-1) - } else { - d.refcnt.Delete(name) - } + if ds, ok := d.dml[name]; ok { + ds.Close() + delete(d.dml, name) } } } diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go deleted file mode 100644 index 0c44e5eaa3..0000000000 --- a/internal/rootcoord/dml_channels_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - "testing" - - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/stretchr/testify/assert" -) - -func TestDmlChannels(t *testing.T) { - const ( - totalDmlChannelNum = 2 - ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - factory := msgstream.NewPmsFactory() - Params.Init() - - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarAddress, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err := factory.SetParams(m) - assert.Nil(t, err) - - core, err := NewCore(ctx, factory) - assert.Nil(t, err) - - dml := newDmlChannels(core, totalDmlChannelNum) - chanNames := dml.ListChannels() - assert.Equal(t, 0, len(chanNames)) - - randStr := funcutil.RandomString(8) - assert.Panics(t, func() { dml.AddProducerChannels(randStr) }) - - err = dml.Broadcast([]string{randStr}, nil) - assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr)) - - // dml_xxx_0 => {chanName0, chanName2} - // dml_xxx_1 => {chanName1} - chanName0 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName0) - assert.Equal(t, 1, dml.GetNumChannels()) - - chanName1 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName1) - assert.Equal(t, 2, dml.GetNumChannels()) - - chanName2 := dml.GetDmlMsgStreamName() - dml.AddProducerChannels(chanName2) - assert.Equal(t, 2, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 2, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName1) - assert.Equal(t, 1, dml.GetNumChannels()) - - dml.RemoveProducerChannels(chanName0) - assert.Equal(t, 0, dml.GetNumChannels()) -} diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 673ba117b3..12537e437d 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -40,7 +40,6 @@ type ParamTable struct { TimeTickChannel string StatisticsChannel string - DmlChannelNum int64 MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string @@ -73,7 +72,6 @@ func (p *ParamTable) Init() { p.initTimeTickChannel() p.initStatisticsChannelName() - p.initDmlChannelNum() p.initMaxPartitionNum() p.initMinSegmentSizeToEnableIndex() p.initDefaultPartitionName() @@ -163,10 +161,6 @@ func (p *ParamTable) initStatisticsChannelName() { p.StatisticsChannel = channel } -func (p *ParamTable) initDmlChannelNum() { - p.DmlChannelNum = p.ParseInt64("rootcoord.dmlChannelNum") -} - func (p *ParamTable) initMaxPartitionNum() { p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum") } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index f93a25e842..9a4ac02465 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -495,7 +495,7 @@ func (c *Core) setMsgStreams() error { CreateCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error { @@ -511,7 +511,7 @@ func (c *Core) setMsgStreams() error { DropCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error { @@ -527,7 +527,7 @@ func (c *Core) setMsgStreams() error { CreatePartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error { @@ -543,7 +543,7 @@ func (c *Core) setMsgStreams() error { DropPartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.Broadcast(channelNames, &msgPack) + return c.dmlChannels.BroadcastAll(channelNames, &msgPack) } return nil @@ -959,7 +959,7 @@ func (c *Core) Init() error { return } - c.dmlChannels = newDmlChannels(c, Params.DmlChannelNum) + c.dmlChannels = newDMLChannels(c) pc := c.MetaTable.ListCollectionPhysicalChannels() c.dmlChannels.AddProducerChannels(pc...) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 70f1772751..b10654cae5 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -410,7 +410,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - assert.Equal(t, 2, core.dmlChannels.GetNumChannels()) + assert.Equal(t, 2, len(core.dmlChannels.dml)) pChan := core.MetaTable.ListCollectionPhysicalChannels() dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName) @@ -1431,10 +1431,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - cn0 := core.dmlChannels.GetDmlMsgStreamName() - cn1 := core.dmlChannels.GetDmlMsgStreamName() - cn2 := core.dmlChannels.GetDmlMsgStreamName() - core.dmlChannels.AddProducerChannels(cn0, cn1, cn2) + core.dmlChannels.AddProducerChannels("c0", "c1", "c2") msg0 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index ad3df0e581..102fa7fb77 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -136,7 +136,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { vchanNames := make([]string, t.Req.ShardsNum) chanNames := make([]string, t.Req.ShardsNum) for i := int32(0); i < t.Req.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%dv", t.core.dmlChannels.GetDmlMsgStreamName(), collID) + vchanNames[i] = fmt.Sprintf("%s_%d_%d_v", t.Req.CollectionName, collID, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -286,8 +286,10 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) - // send tt into deleted channels to tell data_node to clear flowgragh - t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts) + for _, chanName := range collMeta.PhysicalChannelNames { + // send tt into deleted channels to tell data_node to clear flowgragh + t.core.chanTimeTick.SendChannelTimeTick(chanName, ts) + } // remove dml channel after send dd msg t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 9926ff801c..dbdf559a13 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -22,12 +22,16 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) +type ddlTimetickInfo struct { + ddlMinTs typeutil.Timestamp + ddlTsSet map[typeutil.Timestamp]struct{} +} + type timetickSync struct { core *Core lock sync.Mutex @@ -187,7 +191,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), - // zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) + // zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) t.sendToChannel() return nil @@ -221,53 +225,34 @@ func (t *timetickSync) StartWatch() { for { select { case <-t.core.ctx.Done(): - log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err())) + log.Debug("root coord context done", zap.Error(t.core.ctx.Err())) return - case proxyTimetick, ok := <-t.sendChan: + case ptt, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") return } // reduce each channel to get min timestamp - local := proxyTimetick[t.core.session.ServerID] - if len(local.in.ChannelNames) == 0 { - continue - } - - hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames)) - tr := timerecord.NewTimeRecorder(hdr) - wg := sync.WaitGroup{} - for _, chanName := range local.in.ChannelNames { - wg.Add(1) - go func(chanName string) { - mints := local.getTimetick(chanName) - for _, tt := range proxyTimetick { - ts := tt.getTimetick(chanName) - if ts < mints { - mints = ts - } + mtt := ptt[t.core.session.ServerID] + for _, chanName := range mtt.in.ChannelNames { + mints := mtt.getTimetick(chanName) + for _, tt := range ptt { + ts := tt.getTimetick(chanName) + if ts < mints { + mints = ts } - if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil { - log.Debug("SendTimeTickToChannel fail", zap.Error(err)) - } - wg.Done() - }(chanName) - } - wg.Wait() - span := tr.ElapseSpan() - // rootcoord send tt msg to all channels every 200ms by default - if span.Milliseconds() > 200 { - log.Warn("rootcoord send tt to all channels too slowly", - zap.Int("chanNum", len(local.in.ChannelNames)), - zap.Int64("span", span.Milliseconds())) + } + if err := t.SendChannelTimeTick(chanName, mints); err != nil { + log.Debug("SendChannelTimeTick fail", zap.Error(err)) + } } } } } -// SendTimeTickToChannel send each channel's min timetick to msg stream -func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { +// SendChannelTimeTick send each channel's min timetick to msg stream +func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ BeginTimestamp: ts, @@ -288,14 +273,11 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil { - return err - } - - for _, chanName := range chanNames { + err := t.core.dmlChannels.Broadcast(chanName, &msgPack) + if err == nil { metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } - return nil + return err } // GetProxyNum return the num of detected proxy node diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go index 69253b322c..e22d21ae3b 100644 --- a/internal/util/timerecord/time_recorder.go +++ b/internal/util/timerecord/time_recorder.go @@ -33,30 +33,19 @@ func NewTimeRecorder(header string) *TimeRecorder { } } -func (tr *TimeRecorder) RecordSpan() time.Duration { +// Record calculates the time span from previous Record call +func (tr *TimeRecorder) Record(msg string) time.Duration { curr := time.Now() span := curr.Sub(tr.last) tr.last = curr - return span -} - -func (tr *TimeRecorder) ElapseSpan() time.Duration { - curr := time.Now() - span := curr.Sub(tr.start) - tr.last = curr - return span -} - -// Record calculates the time span from previous Record call -func (tr *TimeRecorder) Record(msg string) time.Duration { - span := tr.RecordSpan() tr.printTimeRecord(msg, span) return span } // Elapse calculates the time span from the beginning of this TimeRecorder func (tr *TimeRecorder) Elapse(msg string) time.Duration { - span := tr.ElapseSpan() + curr := time.Now() + span := curr.Sub(tr.start) tr.printTimeRecord(msg, span) return span }