mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
We have been check ErrSegmentNotFound in insert_buffer_node in datanode, we should also check it in delete_node. issue: https://github.com/milvus-io/milvus/issues/27145 pr: https://github.com/milvus-io/milvus/pull/28371 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
a3aceb97a4
commit
a78ea4fea0
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -124,11 +125,24 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
||||
// no related delta data to flush, send empty buf to complete flush life-cycle
|
||||
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
|
||||
} else {
|
||||
segment := dn.channel.getSegment(segmentToFlush)
|
||||
// TODO, this has to be async, no need to block here
|
||||
err := retry.Do(dn.ctx, func() error {
|
||||
return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
|
||||
err := dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
|
||||
if err != nil && errors.Is(err, merr.ErrSegmentNotFound) {
|
||||
return retry.Unrecoverable(err)
|
||||
}
|
||||
return nil
|
||||
}, getFlowGraphRetryOpt())
|
||||
if err != nil {
|
||||
if errors.Is(err, merr.ErrSegmentNotFound) {
|
||||
if !segment.isValid() {
|
||||
log.Info("try to flush a compacted segment, ignore..",
|
||||
zap.Int64("segmentID", segmentToFlush),
|
||||
zap.Error(err))
|
||||
}
|
||||
continue
|
||||
}
|
||||
if merr.IsCanceledOrTimeout(err) {
|
||||
log.Warn("skip syncing delete data for context done", zap.Int64("segmentID", segmentToFlush))
|
||||
continue
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user