Treat small segment without index as sealed (#25237)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-07-02 19:50:23 +08:00 committed by GitHub
parent d3eacb2563
commit 597a4d9227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 25 deletions

View File

@ -123,6 +123,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
indexedIDs = make(typeutil.UniqueSet) indexedIDs = make(typeutil.UniqueSet)
unIndexedIDs = make(typeutil.UniqueSet) unIndexedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet) droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
) )
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID }) validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
@ -137,11 +138,16 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
continue continue
} }
segmentInfos[s.GetID()] = s segmentInfos[s.GetID()] = s
if s.GetState() == commonpb.SegmentState_Dropped { switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID()) droppedIDs.Insert(s.GetID())
} else if indexed.Contain(s.GetID()) { case !isFlushState(s.GetState()):
growingIDs.Insert(s.GetID())
case indexed.Contain(s.GetID()):
indexedIDs.Insert(s.GetID()) indexedIDs.Insert(s.GetID())
} else { case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segment as indexed
indexedIDs.Insert(s.GetID())
default:
unIndexedIDs.Insert(s.GetID()) unIndexedIDs.Insert(s.GetID())
} }
} }
@ -193,6 +199,16 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
for retrieveUnIndexed() { for retrieveUnIndexed() {
} }
for segId := range unIndexedIDs {
segInfo := segmentInfos[segId]
if segInfo.GetState() == commonpb.SegmentState_Dropped {
unIndexedIDs.Remove(segId)
indexedIDs.Insert(segId)
}
}
unIndexedIDs.Insert(growingIDs.Collect()...)
return &datapb.VchannelInfo{ return &datapb.VchannelInfo{
CollectionID: channel.CollectionID, CollectionID: channel.CollectionID,
ChannelName: channel.Name, ChannelName: channel.Name,

View File

@ -2039,6 +2039,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
MsgGroup: "", MsgGroup: "",
Timestamp: 0, Timestamp: 0,
}, },
NumOfRows: 2048,
} }
err = svr.meta.AddSegment(NewSegmentInfo(s1)) err = svr.meta.AddSegment(NewSegmentInfo(s1))
assert.NoError(t, err) assert.NoError(t, err)
@ -2119,9 +2120,8 @@ func TestGetQueryVChanPositions(t *testing.T) {
t.Run("get existed channel", func(t *testing.T) { t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) assert.ElementsMatch(t, []int64{1}, vchan.FlushedSegmentIds)
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{s2.ID, s3.ID}, vchan.UnflushedSegmentIds)
}) })
t.Run("empty collection", func(t *testing.T) { t.Run("empty collection", func(t *testing.T) {
@ -2211,14 +2211,15 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Timestamp: 1, Timestamp: 1,
}, },
CompactionFrom: []int64{1, 2}, // c, d CompactionFrom: []int64{1, 2}, // c, d
NumOfRows: 2048,
} }
err = svr.meta.AddSegment(NewSegmentInfo(e)) err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.NoError(t, err) assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
}) })
t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) { t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) {
@ -2296,14 +2297,15 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Timestamp: 1, Timestamp: 1,
}, },
CompactionFrom: []int64{1, 2}, // c, d CompactionFrom: []int64{1, 2}, // c, d
NumOfRows: 2048,
} }
err = svr.meta.AddSegment(NewSegmentInfo(e)) err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.NoError(t, err) assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
}) })
t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) { t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) {
@ -2376,6 +2378,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Timestamp: 1, Timestamp: 1,
}, },
CompactionFrom: []int64{1, 2}, // c, d CompactionFrom: []int64{1, 2}, // c, d
NumOfRows: 2048,
} }
err = svr.meta.AddSegment(NewSegmentInfo(e)) err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.NoError(t, err) assert.NoError(t, err)
@ -2919,12 +2922,12 @@ func TestGetRecoveryInfo(t *testing.T) {
}) })
assert.NoError(t, err) assert.NoError(t, err)
seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped) seg1 := createSegment(9, 0, 0, 2048, 30, "vchan1", commonpb.SegmentState_Dropped)
seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg2 := createSegment(10, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg3 := createSegment(11, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg3.CompactionFrom = []int64{9, 10} seg3.CompactionFrom = []int64{9, 10}
seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg4 := createSegment(12, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) seg5 := createSegment(13, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Flushed)
seg5.CompactionFrom = []int64{11, 12} seg5.CompactionFrom = []int64{11, 12}
err = svr.meta.AddSegment(NewSegmentInfo(seg1)) err = svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.NoError(t, err) assert.NoError(t, err)
@ -2977,8 +2980,8 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NotNil(t, resp.GetChannels()[0].SeekPosition) assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0) assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds()) assert.ElementsMatch(t, []UniqueID{}, resp.GetChannels()[0].GetUnflushedSegmentIds())
assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds()) assert.ElementsMatch(t, []UniqueID{9, 10, 12}, resp.GetChannels()[0].GetFlushedSegmentIds())
}) })
t.Run("with closed server", func(t *testing.T) { t.Run("with closed server", func(t *testing.T) {

View File

@ -515,12 +515,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
}) })
assert.NoError(t, err) assert.NoError(t, err)
seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped) seg1 := createSegment(9, 0, 0, 2048, 30, "vchan1", commonpb.SegmentState_Dropped)
seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg2 := createSegment(10, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg3 := createSegment(11, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg3.CompactionFrom = []int64{9, 10} seg3.CompactionFrom = []int64{9, 10}
seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) seg4 := createSegment(12, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped)
seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) seg5 := createSegment(13, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Flushed)
seg5.CompactionFrom = []int64{11, 12} seg5.CompactionFrom = []int64{11, 12}
err = svr.meta.AddSegment(NewSegmentInfo(seg1)) err = svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.NoError(t, err) assert.NoError(t, err)
@ -572,8 +572,8 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NotNil(t, resp.GetChannels()[0].SeekPosition) assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0) assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds()) assert.ElementsMatch(t, []UniqueID{}, resp.GetChannels()[0].GetUnflushedSegmentIds())
assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds()) assert.ElementsMatch(t, []UniqueID{9, 10, 12}, resp.GetChannels()[0].GetFlushedSegmentIds())
}) })
t.Run("with closed server", func(t *testing.T) { t.Run("with closed server", func(t *testing.T) {

View File

@ -134,6 +134,9 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
indexedSegments := make([]*SegmentInfo, 0) indexedSegments := make([]*SegmentInfo, 0)
for _, segment := range segments { for _, segment := range segments {
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue
}
segmentState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), vecFieldID[segment.GetCollectionID()]) segmentState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), vecFieldID[segment.GetCollectionID()])
if segmentState.state == commonpb.IndexState_Finished { if segmentState.state == commonpb.IndexState_Finished {
indexedSegments = append(indexedSegments, segment) indexedSegments = append(indexedSegments, segment)