From d997dd20af65a8998725b68c14840f0249625ead Mon Sep 17 00:00:00 2001 From: sunby Date: Tue, 15 Jun 2021 19:23:55 +0800 Subject: [PATCH] Modify GetRecoveryInfo logic (#5785) To satisfy QueryNode's request, GetRecoveryInfo interface should return unflushed segments with start position. Because of using the same code for getting seek position in QueryNode and DataNode before, we add a flag to differentiate. Signed-off-by: sunby --- internal/dataservice/binlog_helper.go | 8 ++++++-- internal/dataservice/cluster.go | 2 +- internal/dataservice/datanode_helper.go | 4 ++-- internal/dataservice/grpc_services.go | 12 ++++++++++- internal/dataservice/server_test.go | 27 ++++++++++--------------- 5 files changed, 31 insertions(+), 22 deletions(-) diff --git a/internal/dataservice/binlog_helper.go b/internal/dataservice/binlog_helper.go index 4e0762aec0..7708a0203e 100644 --- a/internal/dataservice/binlog_helper.go +++ b/internal/dataservice/binlog_helper.go @@ -128,7 +128,7 @@ func (s *Server) getSegmentBinlogMeta(segmentID UniqueID) (metas []*datapb.Segme } // GetVChanPositions get vchannel latest postitions with provided dml channel names -func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error) { +func (s *Server) GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datapb.VchannelInfo, error) { if s.kvClient == nil { return nil, errNilKvClient } @@ -157,7 +157,11 @@ func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, e if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp { useUnflushedPosition = true - seekPosition = s.DmlPosition + if isAccurate { + seekPosition = s.DmlPosition + } else { + seekPosition = s.StartPosition + } } } diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index ca922b3dde..de73cade05 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -130,7 +130,7 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo { } log.Debug(logMsg) - vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes) + vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true) if err != nil { log.Warn("get vchannel position failed", zap.Error(err)) continue diff --git a/internal/dataservice/datanode_helper.go b/internal/dataservice/datanode_helper.go index 48e939b4f3..118b5c7378 100644 --- a/internal/dataservice/datanode_helper.go +++ b/internal/dataservice/datanode_helper.go @@ -22,14 +22,14 @@ type vchannel struct { // positionProvider provides vchannel pair related position pairs type positionProvider interface { - GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error) + GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datapb.VchannelInfo, error) GetDdlChannel() string } type dummyPosProvider struct{} //GetVChanPositions implements positionProvider -func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error) { +func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datapb.VchannelInfo, error) { pairs := make([]*datapb.VchannelInfo, len(vchans)) for _, vchan := range vchans { pairs = append(pairs, &datapb.VchannelInfo{ diff --git a/internal/dataservice/grpc_services.go b/internal/dataservice/grpc_services.go index b7bfe06c13..2ab262ded0 100644 --- a/internal/dataservice/grpc_services.go +++ b/internal/dataservice/grpc_services.go @@ -395,6 +395,16 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf segmentIDs := s.meta.GetSegmentsOfPartition(collectionID, partitionID) segment2Binlogs := make(map[UniqueID][]*datapb.FieldBinlog) for _, id := range segmentIDs { + segment, err := s.meta.GetSegment(id) + if err != nil { + log.Error("Get segment failed", zap.Int64("segmentID", id)) + resp.Status.Reason = err.Error() + return resp, nil + } + if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing { + continue + } + meta, err := s.getSegmentBinlogMeta(id) if err != nil { log.Error("Get segment binlog meta failed", zap.Int64("segmentID", id)) @@ -449,7 +459,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf }) } - channelInfos, err := s.GetVChanPositions(vchans) + channelInfos, err := s.GetVChanPositions(vchans, false) if err != nil { log.Error("Get channel positions failed", zap.Strings("channels", channels), diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index bb76f1410c..d5fbb58d54 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -689,7 +689,7 @@ func TestGetVChannelPos(t *testing.T) { CollectionID: 0, DmlChannel: "chx1", }, - }) + }, true) assert.Nil(t, err) assert.EqualValues(t, 1, len(pair)) assert.Empty(t, pair[0].UnflushedSegments) @@ -702,7 +702,7 @@ func TestGetVChannelPos(t *testing.T) { CollectionID: 0, DmlChannel: "ch1", }, - }) + }, true) assert.Nil(t, err) assert.EqualValues(t, 1, len(pair)) assert.EqualValues(t, 0, pair[0].CollectionID) @@ -749,6 +749,12 @@ func TestGetRecoveryInfo(t *testing.T) { MsgID: []byte{}, Timestamp: posTs, }, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "", + MsgID: []byte{}, + MsgGroup: "", + Timestamp: 0, + }, } } @@ -773,16 +779,13 @@ func TestGetRecoveryInfo(t *testing.T) { assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) }) - t.Run("test get smallest position of unflushed segments as seek position", func(t *testing.T) { + t.Run("test get recovery of unflushed segments ", func(t *testing.T) { seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing) err := svr.meta.AddSegment(seg1) assert.Nil(t, err) err = svr.meta.AddSegment(seg2) assert.Nil(t, err) - expectedCps := make(map[UniqueID]*datapb.SegmentInfo) - expectedCps[3] = seg1 - expectedCps[4] = seg2 req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, @@ -791,17 +794,9 @@ func TestGetRecoveryInfo(t *testing.T) { resp, err := svr.GetRecoveryInfo(context.TODO(), req) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 0, len(resp.GetBinlogs())) assert.EqualValues(t, 1, len(resp.GetChannels())) - assert.EqualValues(t, 2, len(resp.GetChannels()[0].GetUnflushedSegments())) - assert.ElementsMatch(t, []UniqueID{0, 1}, resp.GetChannels()[0].GetFlushedSegments()) - assert.EqualValues(t, 30, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) - cps := resp.GetChannels()[0].GetUnflushedSegments() - for _, cp := range cps { - seg, ok := expectedCps[cp.GetID()] - assert.True(t, ok) - assert.EqualValues(t, seg.GetDmlPosition().GetTimestamp(), cp.GetDmlPosition().GetTimestamp()) - assert.EqualValues(t, seg.GetNumOfRows(), cp.GetNumOfRows()) - } + assert.NotNil(t, resp.GetChannels()[0].SeekPosition) }) t.Run("test get binlogs", func(t *testing.T) {