From c7d6e3f19b36e4a9e461e1159b4e76ddc7bf115d Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 29 May 2025 17:32:29 +0800 Subject: [PATCH] fix: data lost when wal balance (#42149) issue: #42147 - error of sync task should be returned if error is returned to avoid checkpoint is push forward. - fix up node id checker of UpdateChannelCheckpoint in streaming. Signed-off-by: chyezh --- internal/datacoord/services.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index e0a1aedbe0..bdd5016d8b 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1490,7 +1490,14 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update // For compatibility with old client if req.GetVChannel() != "" && req.GetPosition() != nil { channel := req.GetVChannel() - if !s.channelManager.Match(nodeID, channel) { + if streamingutil.IsStreamingServiceEnabled() { + targetID, err := snmanager.StaticStreamingNodeManager.GetLatestWALLocated(ctx, channel) + if err != nil || targetID != nodeID { + err := merr.WrapErrChannelNotFound(channel, fmt.Sprintf("for node %d", nodeID)) + log.Warn("failed to get latest wal allocated", zap.Error(err)) + return merr.Status(err), nil + } + } else if !s.channelManager.Match(nodeID, channel) { log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) return merr.Status(merr.WrapErrChannelNotFound(channel, fmt.Sprintf("from node %d", nodeID))), nil } @@ -1504,11 +1511,19 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update checkpoints := lo.Filter(req.GetChannelCheckpoints(), func(cp *msgpb.MsgPosition, _ int) bool { channel := cp.GetChannelName() - matched := s.channelManager.Match(nodeID, channel) - if !matched { + if streamingutil.IsStreamingServiceEnabled() { + targetID, err := snmanager.StaticStreamingNodeManager.GetLatestWALLocated(ctx, channel) + if err != nil || targetID != nodeID { + err := merr.WrapErrChannelNotFound(channel, fmt.Sprintf("for node %d", nodeID)) + log.Warn("failed to get latest wal allocated", zap.Error(err)) + return false + } + return true + } else if !s.channelManager.Match(nodeID, channel) { log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) + return false } - return matched + return true }) err := s.meta.UpdateChannelCheckpoints(ctx, checkpoints)