diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 49bb4d5c7d..599565eb16 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -248,7 +248,6 @@ type deleteRunner struct { ts uint64 lb LBPolicy count atomic.Int64 - err error // task queue queue *dmTaskQueue @@ -434,7 +433,11 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } taskCh := make(chan *deleteTask, 256) - go dr.receiveQueryResult(ctx, client, taskCh) + var receiveErr error + go func() { + receiveErr = dr.receiveQueryResult(ctx, client, taskCh) + close(taskCh) + }() var allQueryCnt int64 // wait all task finish for task := range taskCh { @@ -447,42 +450,35 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } // query or produce task failed - if dr.err != nil { - return dr.err + if receiveErr != nil { + return receiveErr } dr.allQueryCnt.Add(allQueryCnt) return nil } } -func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) { - defer func() { - close(taskCh) - }() - +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error { for { result, err := client.Recv() if err != nil { if err == io.EOF { log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID)) - return + return nil } - dr.err = err - return + return err } err = merr.Error(result.GetStatus()) if err != nil { - dr.err = err log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err)) - return + return err } task, err := dr.produce(ctx, result.GetIds()) if err != nil { - dr.err = err log.Warn("produce delete task failed", zap.Error(err)) - return + return err } task.allQueryCnt = result.GetAllRetrieveCount()