From c22b0a636e3231f45793937d3ffee84f2427d9b3 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 5 Aug 2024 11:18:16 +0800 Subject: [PATCH] fix: delete by expr failed at retry progress (#35241) issue: #35240 delete by expr shard the same err object between channels, so if one channel's query failed, it will fail all channel, which will break channel level retry policy, and make delete operation failed. Signed-off-by: Wei Liu --- internal/proxy/task_delete.go | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 6813739621..4e2e202325 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -249,7 +249,6 @@ type deleteRunner struct { ts uint64 lb LBPolicy count atomic.Int64 - err error // task queue queue *dmTaskQueue @@ -441,7 +440,11 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } taskCh := make(chan *deleteTask, 256) - go dr.receiveQueryResult(ctx, client, taskCh, partitionIDs) + var receiveErr error + go func() { + receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs) + close(taskCh) + }() var allQueryCnt int64 // wait all task finish for task := range taskCh { @@ -454,51 +457,43 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } // query or produce task failed - if dr.err != nil { - return dr.err + if receiveErr != nil { + return receiveErr } dr.allQueryCnt.Add(allQueryCnt) return nil } } -func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) { - defer func() { - close(taskCh) - }() - +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error { for { result, err := client.Recv() if err != nil { if err == io.EOF { log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID)) - return + return nil } - dr.err = err - return + return err } err = merr.Error(result.GetStatus()) if err != nil { - dr.err = err log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err)) - return + return err } if dr.limiter != nil { err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds())) if err != nil { - dr.err = err log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err)) - return + return err } } task, err := dr.produce(ctx, result.GetIds()) if err != nil { - dr.err = err log.Warn("produce delete task failed", zap.Error(err)) - return + return err } task.allQueryCnt = result.GetAllRetrieveCount()