From feb5db60f2d7d6a7a222cbfc35967d2fe23ae3ca Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Sun, 27 Jul 2025 23:14:55 +0800 Subject: [PATCH] fix: make flush save binlog paths idempotent (#43579) issue: #43574 Signed-off-by: chyezh --- internal/datacoord/server_test.go | 9 ++++++--- internal/datacoord/services.go | 5 +++++ internal/datacoord/services_test.go | 10 +++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 56a6d738d0..256e2ec3b5 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1385,7 +1385,7 @@ func TestGetRecoveryInfo(t *testing.T) { }) binlogReq := &datapb.SaveBinlogPathsRequest{ - SegmentID: 0, + SegmentID: 10089, CollectionID: 0, Field2BinlogPaths: []*datapb.FieldBinlog{ { @@ -1425,8 +1425,9 @@ func TestGetRecoveryInfo(t *testing.T) { }, }, }, + Flushed: true, } - segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed) + segment := createSegment(binlogReq.SegmentID, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Growing) err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) @@ -1447,6 +1448,8 @@ func TestGetRecoveryInfo(t *testing.T) { State: commonpb.IndexState_Finished, }) assert.NoError(t, err) + paramtable.Get().Save(Params.DataCoordCfg.EnableSortCompaction.Key, "false") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableSortCompaction.Key) sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq) assert.NoError(t, err) @@ -1460,7 +1463,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.EqualValues(t, 1, len(resp.GetBinlogs())) - assert.EqualValues(t, 0, resp.GetBinlogs()[0].GetSegmentID()) + assert.EqualValues(t, binlogReq.SegmentID, resp.GetBinlogs()[0].GetSegmentID()) assert.EqualValues(t, 1, len(resp.GetBinlogs()[0].GetFieldBinlogs())) assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID()) for _, binlog := range resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs() { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 6205d8eeda..8606c28b41 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -569,6 +569,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath return merr.Status(err), nil } + if segment.State == commonpb.SegmentState_Flushed && !req.Dropped { + log.Info("save to flushed segment, ignore this request") + return merr.Success(), nil + } + if segment.State == commonpb.SegmentState_Dropped { log.Info("save to dropped segment, ignore this request") return merr.Success(), nil diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 6c243702b5..54e861c0da 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1030,7 +1030,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { }) binlogReq := &datapb.SaveBinlogPathsRequest{ - SegmentID: 0, + SegmentID: 10087, CollectionID: 0, Field2BinlogPaths: []*datapb.FieldBinlog{ { @@ -1071,8 +1071,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { }, }, }, + Flushed: true, } - segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed) + segment := createSegment(binlogReq.SegmentID, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Growing) err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) @@ -1099,6 +1100,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.channelManager.Watch(context.Background(), &channelMeta{Name: "vchan1", CollectionID: 0}) assert.NoError(t, err) + paramtable.Get().Save(Params.DataCoordCfg.EnableSortCompaction.Key, "false") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableSortCompaction.Key) + sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq) assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, sResp.ErrorCode) @@ -1112,7 +1116,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NoError(t, merr.Error(resp.Status)) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.EqualValues(t, 1, len(resp.GetSegments())) - assert.EqualValues(t, 0, resp.GetSegments()[0].GetID()) + assert.EqualValues(t, binlogReq.SegmentID, resp.GetSegments()[0].GetID()) assert.EqualValues(t, 0, len(resp.GetSegments()[0].GetBinlogs())) }) t.Run("with dropped segments", func(t *testing.T) {