diff --git a/internal/metrics/datanode_metrics.go b/internal/metrics/datanode_metrics.go index 13f0e74ca7..0e351d2ed7 100644 --- a/internal/metrics/datanode_metrics.go +++ b/internal/metrics/datanode_metrics.go @@ -197,6 +197,17 @@ var ( Help: "forward delete message time taken", Buckets: buckets, // unit: ms }, []string{nodeIDLabelName}) + + DataNodeMsgDispatcherTtLag = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "msg_dispatcher_tt_lag_ms", + Help: "time.Now() sub dispatcher's current consume time", + }, []string{ + nodeIDLabelName, + channelNameLabelName, + }) ) // RegisterDataNode registers DataNode metrics @@ -217,6 +228,7 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeProduceTimeTickLag) registry.MustRegister(DataNodeConsumeBytesCount) registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken) + registry.MustRegister(DataNodeMsgDispatcherTtLag) } func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) { diff --git a/internal/metrics/querynode_metrics.go b/internal/metrics/querynode_metrics.go index 3f863f4bb5..e8b403cc7f 100644 --- a/internal/metrics/querynode_metrics.go +++ b/internal/metrics/querynode_metrics.go @@ -343,6 +343,17 @@ var ( Name: "execute_bytes_counter", Help: "", }, []string{nodeIDLabelName, msgTypeLabelName}) + + QueryNodeMsgDispatcherTtLag = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "msg_dispatcher_tt_lag_ms", + Help: "time.Now() sub dispatcher's current consume time", + }, []string{ + nodeIDLabelName, + channelNameLabelName, + }) ) // RegisterQueryNode registers QueryNode metrics @@ -376,6 +387,7 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeExecuteCounter) registry.MustRegister(QueryNodeConsumerMsgCount) registry.MustRegister(QueryNodeConsumeTimeTickLag) + registry.MustRegister(QueryNodeMsgDispatcherTtLag) } func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) { diff --git a/internal/mq/msgdispatcher/client.go b/internal/mq/msgdispatcher/client.go index 3f00ec332b..0396fd7c1e 100644 --- a/internal/mq/msgdispatcher/client.go +++ b/internal/mq/msgdispatcher/client.go @@ -17,6 +17,8 @@ package msgdispatcher import ( + "sync" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" @@ -24,7 +26,6 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/typeutil" ) type ( @@ -41,17 +42,18 @@ type Client interface { var _ Client = (*client)(nil) type client struct { - role string - nodeID int64 - managers *typeutil.ConcurrentMap[string, DispatcherManager] // pchannel -> DispatcherManager - factory msgstream.Factory + role string + nodeID int64 + managerMu sync.Mutex + managers map[string]DispatcherManager // pchannel -> DispatcherManager + factory msgstream.Factory } func NewClient(factory msgstream.Factory, role string, nodeID int64) Client { return &client{ role: role, nodeID: nodeID, - managers: typeutil.NewConcurrentMap[string, DispatcherManager](), + managers: make(map[string]DispatcherManager), factory: factory, } } @@ -60,18 +62,20 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) pchannel := funcutil.ToPhysicalChannel(vchannel) - managers, ok := c.managers.Get(pchannel) + c.managerMu.Lock() + defer c.managerMu.Unlock() + manager, ok := c.managers[pchannel] if !ok { - managers = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory) - go managers.Run() - old, exist := c.managers.GetOrInsert(pchannel, managers) - if exist { - managers.Close() - managers = old - } + manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory) + c.managers[pchannel] = manager + go manager.Run() } - ch, err := managers.Add(vchannel, pos, subPos) + ch, err := manager.Add(vchannel, pos, subPos) if err != nil { + if manager.Num() == 0 { + manager.Close() + delete(c.managers, pchannel) + } log.Error("register failed", zap.Error(err)) return nil, err } @@ -81,11 +85,13 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg func (c *client) Deregister(vchannel string) { pchannel := funcutil.ToPhysicalChannel(vchannel) - if managers, ok := c.managers.Get(pchannel); ok { - managers.Remove(vchannel) - if managers.Num() == 0 { - managers.Close() - c.managers.GetAndRemove(pchannel) + c.managerMu.Lock() + defer c.managerMu.Unlock() + if manager, ok := c.managers[pchannel]; ok { + manager.Remove(vchannel) + if manager.Num() == 0 { + manager.Close() + delete(c.managers, pchannel) } log.Info("deregister done", zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) diff --git a/internal/mq/msgdispatcher/client_test.go b/internal/mq/msgdispatcher/client_test.go index 2388eaac4e..d052ca7560 100644 --- a/internal/mq/msgdispatcher/client_test.go +++ b/internal/mq/msgdispatcher/client_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -39,20 +40,25 @@ func TestClient(t *testing.T) { } func TestClient_Concurrency(t *testing.T) { - client := NewClient(newMockFactory(), typeutil.ProxyRole, 1) - assert.NotNil(t, client) + client1 := NewClient(newMockFactory(), typeutil.ProxyRole, 1) + assert.NotNil(t, client1) wg := &sync.WaitGroup{} - for i := 0; i < 100; i++ { + const total = 100 + deregisterCount := atomic.NewInt32(0) + for i := 0; i < total; i++ { vchannel := fmt.Sprintf("mock-vchannel-%d-%d", i, rand.Int()) wg.Add(1) go func() { - for j := 0; j < 10; j++ { - _, err := client.Register(vchannel, nil, mqwrapper.SubscriptionPositionUnknown) - assert.NoError(t, err) - client.Deregister(vchannel) + _, err := client1.Register(vchannel, nil, mqwrapper.SubscriptionPositionUnknown) + assert.NoError(t, err) + for j := 0; j < rand.Intn(2); j++ { + client1.Deregister(vchannel) + deregisterCount.Inc() } wg.Done() }() } wg.Wait() + expected := int(total - deregisterCount.Load()) + assert.Equal(t, expected, len(client1.(*client).managers)) } diff --git a/internal/mq/msgdispatcher/dispatcher.go b/internal/mq/msgdispatcher/dispatcher.go index 2f1dcc12b9..d6be51b404 100644 --- a/internal/mq/msgdispatcher/dispatcher.go +++ b/internal/mq/msgdispatcher/dispatcher.go @@ -25,6 +25,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -54,6 +55,9 @@ func (s signal) String() string { } type Dispatcher struct { + ctx context.Context + cancel context.CancelFunc + done chan struct{} wg sync.WaitGroup once sync.Once @@ -158,16 +162,20 @@ func (d *Dispatcher) Handle(signal signal) { log.Info("get signal") switch signal { case start: + d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(1) go d.work() case pause: d.done <- struct{}{} + d.cancel() d.wg.Wait() case resume: + d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(1) go d.work() case terminate: d.done <- struct{}{} + d.cancel() d.wg.Wait() d.once.Do(func() { d.stream.Close() @@ -192,39 +200,24 @@ func (d *Dispatcher) work() { } d.curTs.Store(pack.EndPositions[0].GetTimestamp()) - // init packs for all targets, even though there's no msg in pack, - // but we still need to dispatch time ticks to the targets. - targetPacks := make(map[string]*MsgPack) - for vchannel := range d.targets { - targetPacks[vchannel] = &MsgPack{ - BeginTs: pack.BeginTs, - EndTs: pack.EndTs, - Msgs: make([]msgstream.TsMsg, 0), - StartPositions: pack.StartPositions, - EndPositions: pack.EndPositions, - } - } - - // group messages by vchannel - for _, msg := range pack.Msgs { - if msg.VChannel() == "" { - // for non-dml msg, such as CreateCollection, DropCollection, ... - // we need to dispatch it to all the vchannels. - for k := range targetPacks { - targetPacks[k].Msgs = append(targetPacks[k].Msgs, msg) - } - continue - } - if _, ok := targetPacks[msg.VChannel()]; !ok { - continue - } - targetPacks[msg.VChannel()].Msgs = append(targetPacks[msg.VChannel()].Msgs, msg) - } - - // dispatch messages, split target if block + targetPacks := d.groupingMsgs(pack) for vchannel, p := range targetPacks { - t := d.targets[vchannel] - if err := t.send(p); err != nil { + var err error + var t = d.targets[vchannel] + if d.isMain { + // for main dispatcher, split target if err occurs + err = t.send(p) + } else { + // for solo dispatcher, only 1 target exists, we should + // keep retrying if err occurs, unless it paused or terminated. + for { + err = t.send(p) + if err == nil || !funcutil.CheckCtxValid(d.ctx) { + break + } + } + } + if err != nil { t.pos = pack.StartPositions[0] d.lagTargets.LoadOrStore(t.vchannel, t) d.nonBlockingNotify() @@ -236,6 +229,43 @@ func (d *Dispatcher) work() { } } +func (d *Dispatcher) groupingMsgs(pack *MsgPack) map[string]*MsgPack { + // init packs for all targets, even though there's no msg in pack, + // but we still need to dispatch time ticks to the targets. + targetPacks := make(map[string]*MsgPack) + for vchannel := range d.targets { + targetPacks[vchannel] = &MsgPack{ + BeginTs: pack.BeginTs, + EndTs: pack.EndTs, + Msgs: make([]msgstream.TsMsg, 0), + StartPositions: pack.StartPositions, + EndPositions: pack.EndPositions, + } + } + // group messages by vchannel + for _, msg := range pack.Msgs { + var vchannel string + switch msg.Type() { + case commonpb.MsgType_Insert: + vchannel = msg.(*msgstream.InsertMsg).GetShardName() + case commonpb.MsgType_Delete: + vchannel = msg.(*msgstream.DeleteMsg).GetShardName() + } + if vchannel == "" { + // for non-dml msg, such as CreateCollection, DropCollection, ... + // we need to dispatch it to all the vchannels. + for k := range targetPacks { + targetPacks[k].Msgs = append(targetPacks[k].Msgs, msg) + } + continue + } + if _, ok := targetPacks[vchannel]; ok { + targetPacks[vchannel].Msgs = append(targetPacks[vchannel].Msgs, msg) + } + } + return targetPacks +} + func (d *Dispatcher) nonBlockingNotify() { select { case d.lagNotifyChan <- struct{}{}: diff --git a/internal/mq/msgdispatcher/manager.go b/internal/mq/msgdispatcher/manager.go index 5e3125b66e..78cd974440 100644 --- a/internal/mq/msgdispatcher/manager.go +++ b/internal/mq/msgdispatcher/manager.go @@ -25,9 +25,13 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/prometheus/client_golang/prometheus" ) var ( @@ -125,6 +129,7 @@ func (c *dispatcherManager) Remove(vchannel string) { c.soloDispatchers[vchannel].Handle(terminate) c.soloDispatchers[vchannel].CloseTarget(vchannel) delete(c.soloDispatchers, vchannel) + c.deleteMetric(vchannel) log.Info("remove soloDispatcher done") } c.lagTargets.Delete(vchannel) @@ -150,14 +155,18 @@ func (c *dispatcherManager) Run() { log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel)) log.Info("dispatcherManager is running...") - ticker := time.NewTicker(CheckPeriod) - defer ticker.Stop() + ticker1 := time.NewTicker(10 * time.Second) + ticker2 := time.NewTicker(CheckPeriod) + defer ticker1.Stop() + defer ticker2.Stop() for { select { case <-c.closeChan: log.Info("dispatcherManager exited") return - case <-ticker.C: + case <-ticker1.C: + c.uploadMetric() + case <-ticker2.C: c.tryMerge() case <-c.lagNotifyChan: c.mu.Lock() @@ -206,6 +215,7 @@ func (c *dispatcherManager) tryMerge() { } c.soloDispatchers[vchannel].Handle(terminate) delete(c.soloDispatchers, vchannel) + c.deleteMetric(vchannel) } c.mainDispatcher.Handle(resume) log.Info("merge done", zap.Any("vchannel", candidates)) @@ -220,6 +230,7 @@ func (c *dispatcherManager) split(t *target) { if _, ok := c.soloDispatchers[t.vchannel]; ok { c.soloDispatchers[t.vchannel].Handle(terminate) delete(c.soloDispatchers, t.vchannel) + c.deleteMetric(t.vchannel) } var newSolo *Dispatcher @@ -238,3 +249,42 @@ func (c *dispatcherManager) split(t *target) { newSolo.Handle(start) log.Info("split done") } + +// deleteMetric remove specific prometheus metric, +// Lock/RLock is required before calling this method. +func (c *dispatcherManager) deleteMetric(channel string) { + nodeIDStr := fmt.Sprintf("%d", c.nodeID) + if c.role == typeutil.DataNodeRole { + metrics.DataNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel) + return + } + if c.role == typeutil.QueryNodeRole { + metrics.QueryNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel) + } +} + +func (c *dispatcherManager) uploadMetric() { + c.mu.RLock() + defer c.mu.RUnlock() + nodeIDStr := fmt.Sprintf("%d", c.nodeID) + fn := func(gauge *prometheus.GaugeVec) { + if c.mainDispatcher == nil { + return + } + // for main dispatcher, use pchannel as channel label + gauge.WithLabelValues(nodeIDStr, c.pchannel).Set( + float64(time.Since(tsoutil.PhysicalTime(c.mainDispatcher.CurTs())).Milliseconds())) + // for solo dispatchers, use vchannel as channel label + for vchannel, dispatcher := range c.soloDispatchers { + gauge.WithLabelValues(nodeIDStr, vchannel).Set( + float64(time.Since(tsoutil.PhysicalTime(dispatcher.CurTs())).Milliseconds())) + } + } + if c.role == typeutil.DataNodeRole { + fn(metrics.DataNodeMsgDispatcherTtLag) + return + } + if c.role == typeutil.QueryNodeRole { + fn(metrics.QueryNodeMsgDispatcherTtLag) + } +} diff --git a/internal/mq/msgdispatcher/manager_test.go b/internal/mq/msgdispatcher/manager_test.go index 271454a6fe..ec95712422 100644 --- a/internal/mq/msgdispatcher/manager_test.go +++ b/internal/mq/msgdispatcher/manager_test.go @@ -45,12 +45,14 @@ func TestManager(t *testing.T) { r := rand.Intn(10) + 1 for j := 0; j < r; j++ { offset++ - _, err := c.Add(fmt.Sprintf("mock_vchannel_%d", offset), nil, mqwrapper.SubscriptionPositionUnknown) + t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) + _, err := c.Add(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, offset, c.Num()) } for j := 0; j < rand.Intn(r); j++ { - c.Remove(fmt.Sprintf("mock_vchannel_%d", offset)) + t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) + c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) offset-- assert.Equal(t, offset, c.Num()) } @@ -93,8 +95,9 @@ func TestManager(t *testing.T) { CheckPeriod = 10 * time.Millisecond go c.Run() - time.Sleep(15 * time.Millisecond) - assert.Equal(t, 1, c.Num()) // expected merged + assert.Eventually(t, func() bool { + return c.Num() == 1 // expected merged + }, 300*time.Millisecond, 10*time.Millisecond) assert.NotPanics(t, func() { c.Close() @@ -140,14 +143,13 @@ func (suite *SimulationSuite) SetupTest() { suite.producer = producer suite.manager = NewDispatcherManager(suite.pchannel, typeutil.DataNodeRole, 0, suite.factory) - CheckPeriod = 10 * time.Millisecond go suite.manager.Run() } func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) { defer wg.Done() - const timeTickCount = 200 + const timeTickCount = 100 var uniqueMsgID int64 vchannelKeys := reflect.ValueOf(suite.vchannels).MapKeys() @@ -205,7 +207,7 @@ func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup select { case <-ctx.Done(): return - case <-time.After(2000 * time.Millisecond): // no message to consume + case <-time.After(5000 * time.Millisecond): // no message to consume return case pack := <-suite.vchannels[vchannel].output: assert.Greater(suite.T(), pack.EndTs, lastTs) @@ -245,7 +247,7 @@ func (suite *SimulationSuite) produceTimeTickOnly(ctx context.Context) { } func (suite *SimulationSuite) TestDispatchToVchannels() { - const vchannelNum = 20 + const vchannelNum = 10 suite.vchannels = make(map[string]*vchannelHelper, vchannelNum) for i := 0; i < vchannelNum; i++ { vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i) @@ -275,9 +277,9 @@ func (suite *SimulationSuite) TestMerge() { ctx, cancel := context.WithCancel(context.Background()) go suite.produceTimeTickOnly(ctx) - const vchannelNum = 20 + const vchannelNum = 10 suite.vchannels = make(map[string]*vchannelHelper, vchannelNum) - positions, err := getSeekPositions(suite.factory, suite.pchannel, 200) + positions, err := getSeekPositions(suite.factory, suite.pchannel, 100) assert.NoError(suite.T(), err) for i := 0; i < vchannelNum; i++ { @@ -306,27 +308,20 @@ func (suite *SimulationSuite) TestSplit() { ctx, cancel := context.WithCancel(context.Background()) go suite.produceTimeTickOnly(ctx) - const vchannelNum = 10 + const ( + vchannelNum = 10 + splitNum = 3 + ) suite.vchannels = make(map[string]*vchannelHelper, vchannelNum) - DefaultTargetChanSize = 10 MaxTolerantLag = 500 * time.Millisecond + DefaultTargetChanSize = 65536 for i := 0; i < vchannelNum; i++ { - vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i) - output, err := suite.manager.Add(vchannel, nil, mqwrapper.SubscriptionPositionEarliest) - assert.NoError(suite.T(), err) - suite.vchannels[vchannel] = &vchannelHelper{output: output} - } - - const splitNum = 3 - wg := &sync.WaitGroup{} - counter := 0 - for vchannel := range suite.vchannels { - wg.Add(1) - go suite.consumeMsg(ctx, wg, vchannel) - counter++ - if counter >= len(suite.vchannels)-splitNum { - break + if i >= vchannelNum-splitNum { + DefaultTargetChanSize = 10 } + vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i) + _, err := suite.manager.Add(vchannel, nil, mqwrapper.SubscriptionPositionEarliest) + assert.NoError(suite.T(), err) } suite.Eventually(func() bool { @@ -335,7 +330,6 @@ func (suite *SimulationSuite) TestSplit() { }, 10*time.Second, 100*time.Millisecond) cancel() - wg.Wait() } func (suite *SimulationSuite) TearDownTest() { diff --git a/internal/mq/msgstream/msg.go b/internal/mq/msgstream/msg.go index 8906d39ea4..4a0c11ce2b 100644 --- a/internal/mq/msgstream/msg.go +++ b/internal/mq/msgstream/msg.go @@ -53,7 +53,6 @@ type TsMsg interface { Unmarshal(MarshalType) (TsMsg, error) Position() *MsgPosition SetPosition(*MsgPosition) - VChannel() string } // BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream @@ -63,7 +62,6 @@ type BaseMsg struct { EndTimestamp Timestamp HashValues []uint32 MsgPosition *MsgPosition - Vchannel string } // TraceCtx returns the context of opentracing @@ -101,10 +99,6 @@ func (bm *BaseMsg) SetPosition(position *MsgPosition) { bm.MsgPosition = position } -func (bm *BaseMsg) VChannel() string { - return bm.Vchannel -} - func convertToByteArray(input interface{}) ([]byte, error) { switch output := input.(type) { case []byte: @@ -176,7 +170,6 @@ func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) { insertMsg.BeginTimestamp = timestamp } } - insertMsg.Vchannel = insertMsg.ShardName return insertMsg, nil } @@ -285,7 +278,6 @@ func (it *InsertMsg) IndexMsg(index int) *InsertMsg { Ctx: it.TraceCtx(), BeginTimestamp: it.BeginTimestamp, EndTimestamp: it.EndTimestamp, - Vchannel: it.Vchannel, HashValues: it.HashValues, MsgPosition: it.MsgPosition, }, @@ -369,7 +361,6 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { deleteMsg.BeginTimestamp = timestamp } } - deleteMsg.Vchannel = deleteRequest.ShardName return deleteMsg, nil } diff --git a/internal/proxy/msg_pack.go b/internal/proxy/msg_pack.go index c0d5de4b34..b365fe68b5 100644 --- a/internal/proxy/msg_pack.go +++ b/internal/proxy/msg_pack.go @@ -148,7 +148,6 @@ func assignSegmentID(ctx context.Context, insertMsg *msgstream.InsertMsg, result msg.HashValues = append(msg.HashValues, insertMsg.HashValues[offset]) msg.Timestamps = append(msg.Timestamps, insertMsg.Timestamps[offset]) msg.RowIDs = append(msg.RowIDs, insertMsg.RowIDs[offset]) - msg.BaseMsg.Vchannel = channelName msg.NumRows++ requestSize += curRowMessageSize } diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 1edaa7c911..352da65868 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -299,7 +299,6 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { typeutil.AppendIDs(curMsg.PrimaryKeys, dt.deleteMsg.PrimaryKeys, index) curMsg.NumRows++ curMsg.ShardName = vchannel - curMsg.Vchannel = vchannel } // send delete request to log broker diff --git a/internal/util/flowgraph/message_test.go b/internal/util/flowgraph/message_test.go index 9292b6f735..8dc3269eff 100644 --- a/internal/util/flowgraph/message_test.go +++ b/internal/util/flowgraph/message_test.go @@ -76,10 +76,6 @@ func (bm *MockMsg) SetPosition(position *MsgPosition) { } -func (bm *MockMsg) VChannel() string { - return "" -} - func Test_GenerateMsgStreamMsg(t *testing.T) { messages := make([]msgstream.TsMsg, 1) messages[0] = &MockMsg{ diff --git a/internal/util/retry/retry.go b/internal/util/retry/retry.go index 1aea74d3a6..81cf8b3ace 100644 --- a/internal/util/retry/retry.go +++ b/internal/util/retry/retry.go @@ -36,7 +36,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error { for i := uint(0); i < c.attempts; i++ { if err := fn(); err != nil { if i%10 == 0 { - log.Debug("retry func failed", zap.Uint("retry time", i), zap.Error(err)) + log.Error("retry func failed", zap.Uint("retry time", i), zap.Error(err)) } el = append(el, err)