fix: make flush save binlog paths idempotent (#43579)

issue: #43574

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-07-27 23:14:55 +08:00 committed by GitHub
parent faeb7fd410
commit feb5db60f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 6 deletions

View File

@ -1385,7 +1385,7 @@ func TestGetRecoveryInfo(t *testing.T) {
})
binlogReq := &datapb.SaveBinlogPathsRequest{
SegmentID: 0,
SegmentID: 10089,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
@ -1425,8 +1425,9 @@ func TestGetRecoveryInfo(t *testing.T) {
},
},
},
Flushed: true,
}
segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
segment := createSegment(binlogReq.SegmentID, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Growing)
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
@ -1447,6 +1448,8 @@ func TestGetRecoveryInfo(t *testing.T) {
State: commonpb.IndexState_Finished,
})
assert.NoError(t, err)
paramtable.Get().Save(Params.DataCoordCfg.EnableSortCompaction.Key, "false")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableSortCompaction.Key)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
assert.NoError(t, err)
@ -1460,7 +1463,7 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.EqualValues(t, 1, len(resp.GetBinlogs()))
assert.EqualValues(t, 0, resp.GetBinlogs()[0].GetSegmentID())
assert.EqualValues(t, binlogReq.SegmentID, resp.GetBinlogs()[0].GetSegmentID())
assert.EqualValues(t, 1, len(resp.GetBinlogs()[0].GetFieldBinlogs()))
assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID())
for _, binlog := range resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs() {

View File

@ -569,6 +569,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return merr.Status(err), nil
}
if segment.State == commonpb.SegmentState_Flushed && !req.Dropped {
log.Info("save to flushed segment, ignore this request")
return merr.Success(), nil
}
if segment.State == commonpb.SegmentState_Dropped {
log.Info("save to dropped segment, ignore this request")
return merr.Success(), nil

View File

@ -1030,7 +1030,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
})
binlogReq := &datapb.SaveBinlogPathsRequest{
SegmentID: 0,
SegmentID: 10087,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
@ -1071,8 +1071,9 @@ func TestGetRecoveryInfoV2(t *testing.T) {
},
},
},
Flushed: true,
}
segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
segment := createSegment(binlogReq.SegmentID, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Growing)
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
@ -1099,6 +1100,9 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.channelManager.Watch(context.Background(), &channelMeta{Name: "vchan1", CollectionID: 0})
assert.NoError(t, err)
paramtable.Get().Save(Params.DataCoordCfg.EnableSortCompaction.Key, "false")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableSortCompaction.Key)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, sResp.ErrorCode)
@ -1112,7 +1116,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NoError(t, merr.Error(resp.Status))
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.EqualValues(t, 1, len(resp.GetSegments()))
assert.EqualValues(t, 0, resp.GetSegments()[0].GetID())
assert.EqualValues(t, binlogReq.SegmentID, resp.GetSegments()[0].GetID())
assert.EqualValues(t, 0, len(resp.GetSegments()[0].GetBinlogs()))
})
t.Run("with dropped segments", func(t *testing.T) {