From a78ea4fea0640a23d22fbd11ea6bb3047e1501da Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Wed, 29 Nov 2023 17:50:27 +0800
Subject: [PATCH] fix: Check ErrSegmentNotFound in delete node (#28371) (#28638)

We already check ErrSegmentNotFound in insert_buffer_node in datanode;
we should also check it in delete_node.

issue: https://github.com/milvus-io/milvus/issues/27145

pr: https://github.com/milvus-io/milvus/pull/28371

Signed-off-by: bigsheeper
---
 internal/datanode/flow_graph_delete_node.go | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index faccda3a53..6c629945ae 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"reflect"
 
+	"github.com/cockroachdb/errors"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
@@ -124,11 +125,25 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
 				// no related delta data to flush, send empty buf to complete flush life-cycle
 				dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
 			} else {
+				segment := dn.channel.getSegment(segmentToFlush)
 				// TODO, this has to be async, no need to block here
 				err := retry.Do(dn.ctx, func() error {
-					return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
+					err := dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
+					if err != nil && errors.Is(err, merr.ErrSegmentNotFound) {
+						return retry.Unrecoverable(err)
+					}
+					// Return any other result unchanged; returning nil here would report a failed flush as success.
+					return err
 				}, getFlowGraphRetryOpt())
 				if err != nil {
+					if errors.Is(err, merr.ErrSegmentNotFound) {
+						if !segment.isValid() {
+							log.Info("try to flush a compacted segment, ignore..",
+								zap.Int64("segmentID", segmentToFlush),
+								zap.Error(err))
+						}
+						continue
+					}
 					if merr.IsCanceledOrTimeout(err) {
 						log.Warn("skip syncing delete data for context done", zap.Int64("segmentID", segmentToFlush))
 						continue