mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Filter fake segments for recovery info (#20493)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a82c235599
commit
c21497fbb4
@ -53,7 +53,7 @@ func newServerHandler(s *Server) *ServerHandler {
|
|||||||
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
|
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
|
||||||
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
|
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
|
||||||
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
|
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
|
||||||
return s.InsertChannel == channel.Name
|
return s.InsertChannel == channel.Name && !s.GetIsFake()
|
||||||
})
|
})
|
||||||
log.Info("GetDataVChanPositions",
|
log.Info("GetDataVChanPositions",
|
||||||
zap.Int64("collectionID", channel.CollectionID),
|
zap.Int64("collectionID", channel.CollectionID),
|
||||||
@ -101,7 +101,7 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
|
|||||||
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
|
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
|
||||||
// cannot use GetSegmentsByChannel since dropped segments are needed here
|
// cannot use GetSegmentsByChannel since dropped segments are needed here
|
||||||
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
|
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
|
||||||
return s.InsertChannel == channel.Name
|
return s.InsertChannel == channel.Name && !s.GetIsFake()
|
||||||
})
|
})
|
||||||
segmentInfos := make(map[int64]*SegmentInfo)
|
segmentInfos := make(map[int64]*SegmentInfo)
|
||||||
indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
|
indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
|
||||||
|
|||||||
@ -2411,6 +2411,47 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||||||
assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0])
|
assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("with fake segments", func(t *testing.T) {
|
||||||
|
svr := newTestServer(t, nil)
|
||||||
|
defer closeTestServer(t, svr)
|
||||||
|
|
||||||
|
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
|
||||||
|
return newMockRootCoordService(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
svr.meta.AddCollection(&collectionInfo{
|
||||||
|
ID: 0,
|
||||||
|
Schema: newTestSchema(),
|
||||||
|
})
|
||||||
|
|
||||||
|
err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
|
||||||
|
ChannelName: "vchan1",
|
||||||
|
Timestamp: 0,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing)
|
||||||
|
seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
|
||||||
|
seg2.IsFake = true
|
||||||
|
err = svr.meta.AddSegment(NewSegmentInfo(seg1))
|
||||||
|
assert.Nil(t, err)
|
||||||
|
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
|
||||||
|
assert.Nil(t, err)
|
||||||
|
svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil)
|
||||||
|
|
||||||
|
req := &datapb.GetRecoveryInfoRequest{
|
||||||
|
CollectionID: 0,
|
||||||
|
PartitionID: 0,
|
||||||
|
}
|
||||||
|
resp, err := svr.GetRecoveryInfo(context.TODO(), req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
|
assert.EqualValues(t, 0, len(resp.GetBinlogs()))
|
||||||
|
assert.EqualValues(t, 1, len(resp.GetChannels()))
|
||||||
|
assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
|
||||||
|
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("with closed server", func(t *testing.T) {
|
t.Run("with closed server", func(t *testing.T) {
|
||||||
svr := newTestServer(t, nil)
|
svr := newTestServer(t, nil)
|
||||||
closeTestServer(t, svr)
|
closeTestServer(t, svr)
|
||||||
|
|||||||
@ -647,8 +647,8 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
|
|||||||
if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped {
|
if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Also skip bulk insert segments.
|
// Also skip bulk insert & fake segments.
|
||||||
if segment.GetIsImporting() {
|
if segment.GetIsImporting() || segment.GetIsFake() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
segment2InsertChannel[segment.ID] = segment.InsertChannel
|
segment2InsertChannel[segment.ID] = segment.InsertChannel
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user