From 285c3f63e3cdfc0baf8d1d6956eb9e7f76f24c2c Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 22 Oct 2021 14:35:13 +0800 Subject: [PATCH] Trim segmentinfo binlog for VChaninfo usage (#10425) Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 20 ++++++++++++++++++-- internal/datacoord/server_test.go | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 9afa0764b9..27eedda7ae 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -683,7 +683,7 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr var useUnflushedPosition bool for _, s := range segments { if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed { - flushed = append(flushed, s.SegmentInfo) + flushed = append(flushed, trimSegmentInfo(s.SegmentInfo)) if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) { seekPosition = s.DmlPosition } @@ -694,7 +694,7 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr continue } - unflushed = append(unflushed, s.SegmentInfo) + unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo)) if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp { useUnflushedPosition = true @@ -728,3 +728,19 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr UnflushedSegments: unflushed, } } + +// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil +func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: info.ID, + CollectionID: info.CollectionID, + PartitionID: info.PartitionID, + InsertChannel: info.InsertChannel, + NumOfRows: info.NumOfRows, + State: info.State, + MaxRowNum: info.MaxRowNum, + LastExpireTime: info.LastExpireTime, + StartPosition: info.StartPosition, + DmlPosition: info.DmlPosition, + } +} diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 6dcc8322db..9472211b7f 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1196,7 +1196,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.EqualValues(t, 1, len(resp.GetChannels())) assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegments())) - assert.ElementsMatch(t, []*datapb.SegmentInfo{seg1, seg2}, resp.GetChannels()[0].GetFlushedSegments()) + assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments()) assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) })