mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Fix Fix replication txn data loss during chaos (#44963)
Only confirm CommitMsg for txn messages to prevent data loss. issue: https://github.com/milvus-io/milvus/issues/44962, https://github.com/milvus-io/milvus/issues/44123 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
21076196bf
commit
0c61d969bb
@ -125,7 +125,7 @@ func (p *ReplicateStreamServer) handleReplicateMessage(req *milvuspb.ReplicateRe
|
||||
// Append message to wal.
|
||||
_, err := streaming.WAL().Replicate().Append(p.streamServer.Context(), msg)
|
||||
if err == nil {
|
||||
p.sendReplicateResult(sourceTs)
|
||||
p.sendReplicateResult(sourceTs, msg)
|
||||
return nil
|
||||
}
|
||||
if status.AsStreamingError(err).IsIgnoredOperation() {
|
||||
@ -138,7 +138,11 @@ func (p *ReplicateStreamServer) handleReplicateMessage(req *milvuspb.ReplicateRe
|
||||
}
|
||||
|
||||
// sendReplicateResult sends the replicate result to client.
|
||||
func (p *ReplicateStreamServer) sendReplicateResult(sourceTimeTick uint64) {
|
||||
func (p *ReplicateStreamServer) sendReplicateResult(sourceTimeTick uint64, msg message.ReplicateMutableMessage) {
|
||||
if msg.TxnContext() != nil && msg.MessageType() != message.MessageTypeCommitTxn {
|
||||
// Only confirm the commit message of a transaction.
|
||||
return
|
||||
}
|
||||
resp := &milvuspb.ReplicateResponse{
|
||||
Response: &milvuspb.ReplicateResponse_ReplicateConfirmedMessageInfo{
|
||||
ReplicateConfirmedMessageInfo: &milvuspb.ReplicateConfirmedMessageInfo{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user