mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Set an empty segment if compaction deleted all inserts (#36045)
See also: #36038 pr: #36044 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
b34b035edc
commit
6dc7d2041f
@ -221,6 +221,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
|
||||
s.task.currentTs = currTs
|
||||
s.task.plan.CollectionTtl = int64(collTTL)
|
||||
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil)
|
||||
s.mockAlloc.EXPECT().AllocOne().Return(19531, nil)
|
||||
|
||||
kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter)
|
||||
s.Require().NoError(err)
|
||||
@ -237,7 +238,12 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
|
||||
|
||||
compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil)
|
||||
s.NoError(err)
|
||||
s.Equal(0, len(compactionSegments))
|
||||
s.Equal(1, len(compactionSegments))
|
||||
s.EqualValues(0, compactionSegments[0].GetNumOfRows())
|
||||
s.EqualValues(19531, compactionSegments[0].GetSegmentID())
|
||||
s.Empty(compactionSegments[0].GetDeltalogs())
|
||||
s.Empty(compactionSegments[0].GetInsertLogs())
|
||||
s.Empty(compactionSegments[0].GetField2StatslogPaths())
|
||||
}
|
||||
|
||||
func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
||||
@ -247,10 +253,11 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
||||
description string
|
||||
deletions map[interface{}]uint64
|
||||
expectedRes int
|
||||
leftNumRows int
|
||||
}{
|
||||
{"no deletion", nil, 1},
|
||||
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1},
|
||||
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0},
|
||||
{"no deletion", nil, 1, 1},
|
||||
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1},
|
||||
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0},
|
||||
}
|
||||
|
||||
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil)
|
||||
@ -258,7 +265,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
||||
s.Require().NoError(err)
|
||||
for _, test := range tests {
|
||||
s.Run(test.description, func() {
|
||||
if test.expectedRes > 0 {
|
||||
if test.leftNumRows > 0 {
|
||||
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(77777, 99999, nil).Once()
|
||||
}
|
||||
s.mockAlloc.EXPECT().AllocOne().Return(888888, nil).Maybe()
|
||||
@ -275,9 +282,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
||||
res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions)
|
||||
s.NoError(err)
|
||||
s.EqualValues(test.expectedRes, len(res))
|
||||
if test.expectedRes > 0 {
|
||||
s.EqualValues(1, res[0].GetNumOfRows())
|
||||
}
|
||||
s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +47,7 @@ type MultiSegmentWriter struct {
|
||||
// segID -> fieldID -> binlogs
|
||||
|
||||
res []*datapb.CompactionSegment
|
||||
// DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord
|
||||
}
|
||||
|
||||
func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator allocator.Allocator, plan *datapb.CompactionPlan, maxRows int64, partitionID, collectionID int64) *MultiSegmentWriter {
|
||||
@ -175,9 +176,27 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {
|
||||
return writer.Write(v)
|
||||
}
|
||||
|
||||
// Could return an empty list if every insert of the segment is deleted
|
||||
func (w *MultiSegmentWriter) appendEmptySegment() error {
|
||||
writer, err := w.getWriter()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.res = append(w.res, &datapb.CompactionSegment{
|
||||
SegmentID: writer.GetSegmentID(),
|
||||
NumOfRows: 0,
|
||||
Channel: w.channel,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// DONOT return an empty list if every insert of the segment is deleted,
|
||||
// append an empty segment instead
|
||||
func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
|
||||
if w.current == -1 {
|
||||
if err := w.appendEmptySegment(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w.res, nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user