diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 7f10632edb..120480db41 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -53,7 +53,7 @@ func newServerHandler(s *Server) *ServerHandler { // GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode. func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { - return s.InsertChannel == channel.Name + return s.InsertChannel == channel.Name && !s.GetIsFake() }) log.Info("GetDataVChanPositions", zap.Int64("collectionID", channel.CollectionID), @@ -101,7 +101,7 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { // cannot use GetSegmentsByChannel since dropped segments are needed here segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { - return s.InsertChannel == channel.Name + return s.InsertChannel == channel.Name && !s.GetIsFake() }) segmentInfos := make(map[int64]*SegmentInfo) indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index d41b10603b..d37dc1ebfa 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2411,6 +2411,47 @@ func TestGetRecoveryInfo(t *testing.T) { assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0]) }) + t.Run("with fake segments", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: newTestSchema(), + }) + + err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{ + ChannelName: "vchan1", + Timestamp: 0, + }) + require.NoError(t, err) + + seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) + seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) + seg2.IsFake = true + err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + assert.Nil(t, err) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 0, len(resp.GetBinlogs())) + assert.EqualValues(t, 1, len(resp.GetChannels())) + assert.NotNil(t, resp.GetChannels()[0].SeekPosition) + assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + }) + t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) closeTestServer(t, svr) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index a289583a31..ab350f37d5 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -647,8 +647,8 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { continue } - // Also skip bulk insert segments. - if segment.GetIsImporting() { + // Also skip bulk insert & fake segments. + if segment.GetIsImporting() || segment.GetIsFake() { continue } segment2InsertChannel[segment.ID] = segment.InsertChannel