mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: data lost when wal balance (#42149)
issue: #42147 - error of sync task should be returned if error is returned to avoid checkpoint is push forward. - fix up node id checker of UpdateChannelCheckpoint in streaming. Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
3a74044149
commit
c7d6e3f19b
@ -1490,7 +1490,14 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
|
|||||||
// For compatibility with old client
|
// For compatibility with old client
|
||||||
if req.GetVChannel() != "" && req.GetPosition() != nil {
|
if req.GetVChannel() != "" && req.GetPosition() != nil {
|
||||||
channel := req.GetVChannel()
|
channel := req.GetVChannel()
|
||||||
if !s.channelManager.Match(nodeID, channel) {
|
if streamingutil.IsStreamingServiceEnabled() {
|
||||||
|
targetID, err := snmanager.StaticStreamingNodeManager.GetLatestWALLocated(ctx, channel)
|
||||||
|
if err != nil || targetID != nodeID {
|
||||||
|
err := merr.WrapErrChannelNotFound(channel, fmt.Sprintf("for node %d", nodeID))
|
||||||
|
log.Warn("failed to get latest wal allocated", zap.Error(err))
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
|
} else if !s.channelManager.Match(nodeID, channel) {
|
||||||
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
||||||
return merr.Status(merr.WrapErrChannelNotFound(channel, fmt.Sprintf("from node %d", nodeID))), nil
|
return merr.Status(merr.WrapErrChannelNotFound(channel, fmt.Sprintf("from node %d", nodeID))), nil
|
||||||
}
|
}
|
||||||
@ -1504,11 +1511,19 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
|
|||||||
|
|
||||||
checkpoints := lo.Filter(req.GetChannelCheckpoints(), func(cp *msgpb.MsgPosition, _ int) bool {
|
checkpoints := lo.Filter(req.GetChannelCheckpoints(), func(cp *msgpb.MsgPosition, _ int) bool {
|
||||||
channel := cp.GetChannelName()
|
channel := cp.GetChannelName()
|
||||||
matched := s.channelManager.Match(nodeID, channel)
|
if streamingutil.IsStreamingServiceEnabled() {
|
||||||
if !matched {
|
targetID, err := snmanager.StaticStreamingNodeManager.GetLatestWALLocated(ctx, channel)
|
||||||
|
if err != nil || targetID != nodeID {
|
||||||
|
err := merr.WrapErrChannelNotFound(channel, fmt.Sprintf("for node %d", nodeID))
|
||||||
|
log.Warn("failed to get latest wal allocated", zap.Error(err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
} else if !s.channelManager.Match(nodeID, channel) {
|
||||||
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
return matched
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
err := s.meta.UpdateChannelCheckpoints(ctx, checkpoints)
|
err := s.meta.UpdateChannelCheckpoints(ctx, checkpoints)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user